This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b170df  KAFKA-6657: Add StreamsConfig prefix for different consumers 
(#4805)
1b170df is described below

commit 1b170df31c7304f6d4c938b5e0c2a09ae1e9189d
Author: Boyang Chen <[email protected]>
AuthorDate: Wed May 2 13:24:15 2018 -0700

    KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)
    
    This pull request is for JIRA 6657, for KIP-276.
    
    Added unit tests for the new getGlobalConsumerConfigs API and made sure the 
existing restore consumer tests pass.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax 
<[email protected]>, Guozhang Wang <[email protected]>
---
 docs/streams/developer-guide/config-streams.html   |  20 ++-
 .../apache/kafka/streams/KafkaClientSupplier.java  |  20 ++-
 .../org/apache/kafka/streams/KafkaStreams.java     |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    | 144 +++++++++++++++++++--
 .../internals/DefaultKafkaClientSupplier.java      |  11 +-
 .../streams/processor/internals/StreamThread.java  |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |  96 ++++++++++++--
 .../org/apache/kafka/test/MockClientSupplier.java  |   4 +
 8 files changed, 266 insertions(+), 33 deletions(-)
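
The override precedence that this commit documents for the new prefixes
(specific prefix, then consumer., then the unprefixed name) can be sketched
without any Kafka dependency. The class and the lookup helper below are
hypothetical and exist only for illustration; they are not how StreamsConfig
resolves configs internally, and they leave out the special handling of
bootstrap.servers, group.id, client.id and auto.offset.reset shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the documented lookup order; NOT the StreamsConfig implementation.
final class PrefixPrecedenceSketch {
    // Prefix strings copied from the constants added in this commit.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Hypothetical helper: returns the value one consumer type would see for `name`,
    // checking the most specific key first and the unprefixed key last.
    static Object lookup(final Map<String, Object> originals,
                         final String specificPrefix,
                         final String name) {
        final String[] candidates = {specificPrefix + name, CONSUMER_PREFIX + name, name};
        for (final String key : candidates) {
            if (originals.containsKey(key)) {
                return originals.get(key);
            }
        }
        return null; // not configured at any level
    }

    public static void main(final String[] args) {
        final Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", "1000");
        props.put("consumer.max.poll.records", "5");
        props.put("restore.consumer.max.poll.records", "50");

        // The restore consumer has its own override; the other two fall back to consumer.
        System.out.println(lookup(props, RESTORE_CONSUMER_PREFIX, "max.poll.records")); // 50
        System.out.println(lookup(props, MAIN_CONSUMER_PREFIX, "max.poll.records"));    // 5
        System.out.println(lookup(props, GLOBAL_CONSUMER_PREFIX, "max.poll.records"));  // 5
    }
}
```

The values mirror the ones used in the new StreamsConfigTest cases ("5" for
the general consumer prefix, "50" for the specific one).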

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index fd0cbca..e3cae22 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -516,7 +516,7 @@
           <h4><a class="toc-backref" href="#id17">Naming</a><a 
class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
           <p>Some consumer and producer configuration parameters use the same 
parameter name. For example, <code class="docutils literal"><span 
class="pre">send.buffer.bytes</span></code> and
             <code class="docutils literal"><span 
class="pre">receive.buffer.bytes</span></code> are used to configure TCP 
buffers; <code class="docutils literal"><span 
class="pre">request.timeout.ms</span></code> and <code class="docutils 
literal"><span class="pre">retry.backoff.ms</span></code> control retries
-            for client request. You can avoid duplicate names by prefix 
parameter names with <code class="docutils literal"><span 
class="pre">consumer.</span></code> or <code class="docutils literal"><span 
class="pre">producer</span></code> (e.g., <code class="docutils literal"><span 
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils 
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+            for client requests. You can avoid duplicate names by prefixing 
parameter names with <code class="docutils literal"><span 
class="pre">consumer.</span></code> or <code class="docutils literal"><span 
class="pre">producer.</span></code> (e.g., <code class="docutils literal"><span 
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils 
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
           <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
 <span class="c1">// same value for consumer and producer</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">,</span> <span 
class="s">&quot;value&quot;</span><span class="o">);</span>
@@ -527,6 +527,24 @@
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">consumerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;consumer-value&quot;</span><span class="o">);</span>
 <span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">producerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;producer-value&quot;</span><span class="o">);</span>
 </pre></div>
+          <p>You could further separate consumer configuration by adding 
different prefixes:</p>
+          <ul class="simple">
+            <li><code class="docutils literal"><span 
class="pre">main.consumer.</span></code> for main consumer which is the default 
consumer of stream source.</li>
+            <li><code class="docutils literal"><span 
class="pre">restore.consumer.</span></code> for restore consumer which is in 
charge of state store recovery.</li>
+            <li><code class="docutils literal"><span 
class="pre">global.consumer.</span></code> for global consumer which is used in 
global KTable construction.</li>
+          </ul>
+          <p>For example, if you only want to set restore consumer config 
without touching other consumers' settings, you could simply use <code 
class="docutils literal"><span class="pre">restore.consumer.</span></code> to 
set the config.</p>
+          <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">Properties</span> <span 
class="n">streamsSettings</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// same config value for all consumer types</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;consumer.PARAMETER_NAME&quot;</span><span class="o">,</span> 
<span class="s">&quot;general-consumer-value&quot;</span><span 
class="o">);</span>
+<span class="c1">// set a different restore consumer config. This would make 
restore consumer take restore-consumer-value,</span>
+<span class="c1">// while main consumer and global consumer stay with 
general-consumer-value</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="s">&quot;restore.consumer.PARAMETER_NAME&quot;</span><span 
class="o">,</span> <span 
class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
+<span class="c1">// alternatively, you can use</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span 
class="na">put</span><span class="o">(</span><span 
class="n">StreamsConfig</span><span class="o">.</span><span 
class="na">restoreConsumerPrefix</span><span class="o">(</span><span 
class="s">&quot;PARAMETER_NAME&quot;</span><span class="o">),</span> <span 
class="s">&quot;restore-consumer-value&quot;</span><span class="o">);</span>
+</pre></div>
+          </div>
+          <p>The same applies to <code class="docutils literal"><span 
class="pre">main.consumer.</span></code> and <code class="docutils 
literal"><span class="pre">global.consumer.</span></code>, if you only want to 
specify the config for one consumer type.</p>
           </div>
         </div>
         <div class="section" id="default-values">
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 8a6ec05..888edf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.Map;
@@ -32,7 +33,7 @@ public interface KafkaClientSupplier {
     /**
      * Create an {@link AdminClient} which is used for internal topic 
management.
      *
-     * @param config Supplied by the {@link StreamsConfig} given to the {@link 
KafkaStreams}
+     * @param config Supplied by the {@link java.util.Properties} given to the 
{@link KafkaStreams}
      * @return an instance of {@link AdminClient}
      */
     AdminClient getAdminClient(final Map<String, Object> config);
@@ -41,7 +42,7 @@ public interface KafkaClientSupplier {
      * Create a {@link Producer} which is used to write records to sink topics.
      *
      * @param config {@link StreamsConfig#getProducerConfigs(String) producer 
config} which is supplied by the
-     *               {@link StreamsConfig} given to the {@link KafkaStreams} 
instance
+     *               {@link java.util.Properties} given to the {@link 
KafkaStreams} instance
      * @return an instance of Kafka producer
      */
     Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
@@ -49,8 +50,8 @@ public interface KafkaClientSupplier {
     /**
      * Create a {@link Consumer} which is used to read records of source 
topics.
      *
-     * @param config {@link StreamsConfig#getConsumerConfigs(String, String) 
consumer config} which is
-     *               supplied by the {@link StreamsConfig} given to the {@link 
KafkaStreams} instance
+     * @param config {@link StreamsConfig#getMainConsumerConfigs(String, 
String) consumer config} which is
+     *               supplied by the {@link java.util.Properties} given to the 
{@link KafkaStreams} instance
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
@@ -59,8 +60,17 @@ public interface KafkaClientSupplier {
      * Create a {@link Consumer} which is used to read records to restore 
{@link StateStore}s.
      *
      * @param config {@link StreamsConfig#getRestoreConsumerConfigs(String) 
restore consumer config} which is supplied
-     *               by the {@link StreamsConfig} given to the {@link 
KafkaStreams}
+     *               by the {@link java.util.Properties} given to the {@link 
KafkaStreams}
      * @return an instance of Kafka consumer
      */
     Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> 
config);
+
+    /**
+     * Create a {@link Consumer} which is used to consume records for {@link 
GlobalKTable}.
+     *
+     * @param config {@link StreamsConfig#getGlobalConsumerConfigs(String) 
global consumer config} which is supplied
+     *               by the {@link java.util.Properties} given to the {@link 
KafkaStreams}
+     * @return an instance of Kafka consumer
+     */
+    Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> 
config);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 776dde7..66a8934 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -708,7 +708,7 @@ public class KafkaStreams {
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
-                                                        
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + 
"-global")),
+                                                        
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
                                                         stateDirectory,
                                                         cacheSizePerThread,
                                                         metrics,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 653243c..18dc891 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -155,6 +155,33 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CONSUMER_PREFIX = "consumer.";
 
     /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the 
main consumer client from
+     * the general consumer client configs. The override precedence is the 
following (from highest to lowest precedence):
+     * 1. main.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the 
restore consumer client from
+     * the general consumer client configs. The override precedence is the 
following (from highest to lowest precedence):
+     * 1. restore.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
+
+    /**
+     * Prefix used to override {@link KafkaConsumer consumer} configs for the 
global consumer client from
+     * the general consumer client configs. The override precedence is the 
following (from highest to lowest precedence):
+     * 1. global.consumer.[config-name]
+     * 2. consumer.[config-name]
+     * 3. [config-name]
+     */
+    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
+
+    /**
      * Prefix used to isolate {@link KafkaProducer producer} configs from 
other client configs.
      * It is recommended to use {@link #producerPrefix(String)} to add this 
prefix to {@link ProducerConfig producer
      * properties}.
@@ -640,6 +667,39 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Prefix a property with {@link #MAIN_CONSUMER_PREFIX}. This is used to 
isolate {@link ConsumerConfig main consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String mainConsumerPrefix(final String consumerProp) {
+        return MAIN_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #RESTORE_CONSUMER_PREFIX}. This is used 
to isolate {@link ConsumerConfig restore consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String restoreConsumerPrefix(final String consumerProp) {
+        return RESTORE_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
+     * Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to 
isolate {@link ConsumerConfig global consumer configs}
+     * from other client configs.
+     *
+     * @param consumerProp the consumer property to be masked
+     * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
+     */
+    public static String globalConsumerPrefix(final String consumerProp) {
+        return GLOBAL_CONSUMER_PREFIX + consumerProp;
+    }
+
+    /**
      * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to 
isolate {@link ProducerConfig producer configs}
      * from other client configs.
      *
@@ -771,10 +831,37 @@ public class StreamsConfig extends AbstractConfig {
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
+     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, 
String)}
      */
+    @Deprecated
     public Map<String, Object> getConsumerConfigs(final String groupId,
                                                   final String clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        return getMainConsumerConfigs(groupId, clientId);
+    }
+
+    /**
+     * Get the configs to the {@link KafkaConsumer main consumer}.
+     * Properties using the prefix {@link #MAIN_CONSUMER_PREFIX} will be used 
in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the 
non-prefixed versions
+     * (read the override precedence ordering in {@link #MAIN_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     * If not specified by {@link #MAIN_CONSUMER_PREFIX}, main consumer will 
share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param groupId      consumer groupId
+     * @param clientId     clientId
+     * @return Map of the consumer configuration.
+     */
+    public Map<String, Object> getMainConsumerConfigs(final String groupId,
+                                                      final String clientId) {
+        Map<String, Object> consumerProps = getCommonConsumerConfigs();
+
+        // Get main consumer override configs
+        Map<String, Object> mainConsumerProps = 
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
+            consumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // add client id with stream client id prefix, and group id
         consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -821,23 +908,64 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * Get the configs for the {@link KafkaConsumer restore-consumer}.
-     * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in 
favor over their non-prefixed versions
+     * Properties using the prefix {@link #RESTORE_CONSUMER_PREFIX} will be 
used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the 
non-prefixed versions
+     * (read the override precedence ordering in {@link 
#RESTORE_CONSUMER_PREFIX})
      * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
      * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     * If not specified by {@link #RESTORE_CONSUMER_PREFIX}, restore consumer 
will share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
      *
      * @param clientId clientId
-     * @return Map of the consumer configuration.
+     * @return Map of the restore consumer configuration.
      */
     public Map<String, Object> getRestoreConsumerConfigs(final String 
clientId) {
-        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get restore consumer override configs
+        Map<String, Object> restoreConsumerProps = 
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) 
{
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
 
         // no need to set group id for a restore consumer
-        consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
         // add client id with stream client id prefix
-        consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-restore-consumer");
-        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-restore-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 
-        return consumerProps;
+        return baseConsumerProps;
+    }
+
+    /**
+     * Get the configs for the {@link KafkaConsumer global consumer}.
+     * Properties using the prefix {@link #GLOBAL_CONSUMER_PREFIX} will be 
used in favor over
+     * the properties prefixed with {@link #CONSUMER_PREFIX} and the 
non-prefixed versions
+     * (read the override precedence ordering in {@link 
#GLOBAL_CONSUMER_PREFIX})
+     * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} 
where we always use the non-prefixed
+     * version as we only support reading/writing from/to the same Kafka 
Cluster.
+     * If not specified by {@link #GLOBAL_CONSUMER_PREFIX}, global consumer 
will share the general consumer configs
+     * prefixed by {@link #CONSUMER_PREFIX}.
+     *
+     * @param clientId clientId
+     * @return Map of the global consumer configuration.
+     */
+    public Map<String, Object> getGlobalConsumerConfigs(final String clientId) 
{
+        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+        // Get global consumer override configs
+        Map<String, Object> globalConsumerProps = 
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+        for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
+            baseConsumerProps.put(entry.getKey(), entry.getValue());
+        }
+
+        // no need to set group id for a global consumer
+        baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+        // add client id with stream client id prefix
+        baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-global-consumer");
+        baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        return baseConsumerProps;
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 6f01e2f..69331b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -35,17 +35,22 @@ public class DefaultKafkaClientSupplier implements 
KafkaClientSupplier {
     }
 
     @Override
-    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+    public Producer<byte[], byte[]> getProducer(final Map<String, Object> 
config) {
         return new KafkaProducer<>(config, new ByteArraySerializer(), new 
ByteArraySerializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+    public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> 
config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
     }
 
     @Override
-    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> 
config) {
+    public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, 
Object> config) {
+        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+    }
+
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, 
Object> config) {
         return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e4ad138..cc5a07f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread {
 
         log.info("Creating consumer client");
         final String applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final Map<String, Object> consumerConfigs = 
config.getConsumerConfigs(applicationId, threadClientId);
+        final Map<String, Object> consumerConfigs = 
config.getMainConsumerConfigs(applicationId, threadClientId);
         
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR,
 taskManager);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || 
!builder.earliestResetTopicsPattern().pattern().equals("")) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ac82c04..e991b6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -96,7 +96,7 @@ public class StreamsConfigTest {
     public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = 
streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = 
streamsConfig.getMainConsumerConfigs(groupId, clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), 
clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), 
groupId);
         
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@@ -115,7 +115,7 @@ public class StreamsConfigTest {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = 
streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = 
streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(42, 
returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assertEquals(1, 
returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@@ -135,12 +135,22 @@ public class StreamsConfigTest {
 
         final String groupId = "example-application";
         final String clientId = "client";
-        final Map<String, Object> returnedProps = 
streamsConfig.getConsumerConfigs(groupId, clientId);
+        final Map<String, Object> returnedProps = 
streamsConfig.getMainConsumerConfigs(groupId, clientId);
 
         assertEquals(20, 
returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
     }
 
     @Test
+    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
+        
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"5");
+        
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
 "50");
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = 
streamsConfig.getMainConsumerConfigs(groupId, clientId);
+        assertEquals("50", 
returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
     public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = 
streamsConfig.getRestoreConsumerConfigs(clientId);
@@ -185,7 +195,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), 
"earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 
1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, 
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -202,7 +212,7 @@ public class StreamsConfigTest {
     public void 
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
     }
 
@@ -238,7 +248,7 @@ public class StreamsConfigTest {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("earliest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals(1, 
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -265,7 +275,7 @@ public class StreamsConfigTest {
     public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put("custom.property.host", "host");
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> restoreConsumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = 
streamsConfig.getAdminConfigs("clientId");
@@ -282,7 +292,7 @@ public class StreamsConfigTest {
         props.put(consumerPrefix("custom.property.host"), "host1");
         props.put(producerPrefix("custom.property.host"), "host2");
         props.put(adminClientPrefix("custom.property.host"), "host3");
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> restoreConsumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
         final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
         final Map<String, Object> adminConfigs = 
streamsConfig.getAdminConfigs("clientId");
@@ -319,7 +329,7 @@ public class StreamsConfigTest {
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest");
         
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertEquals("latest", 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", 
consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -344,7 +354,7 @@ public class StreamsConfigTest {
     public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
         assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
@@ -357,9 +367,65 @@ public class StreamsConfigTest {
     }
 
     @Test
+    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigs() {
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
+        assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedGlobalConsumerConfigs() {
+        props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put(consumerPrefix("interceptor.statsd.host"), "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+    }
+
+    @Test
+    public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
+        props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
+        assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+    }
+
+    @Test
+    public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
+        assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    }
+
+    @Test
+    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
+        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
+        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
+        final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
+        assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+    }
+
+    @Test
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
     }
 
@@ -388,7 +454,9 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+        String isoLevel = (String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        String name = READ_COMMITTED.name();
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -396,7 +464,7 @@ public class StreamsConfigTest {
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientrId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
@@ -423,7 +491,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
 
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
 
         assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 1ec28fa..d3430f2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -83,4 +83,8 @@ public class MockClientSupplier implements KafkaClientSupplier {
         return restoreConsumer;
     }
 
+    @Override
+    public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
+        return restoreConsumer;
+    }
 }

