This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1b170df KAFKA-6657: Add StreamsConfig prefix for different consumers
(#4805)
1b170df is described below
commit 1b170df31c7304f6d4c938b5e0c2a09ae1e9189d
Author: Boyang Chen <[email protected]>
AuthorDate: Wed May 2 13:24:15 2018 -0700
KAFKA-6657: Add StreamsConfig prefix for different consumers (#4805)
This pull request addresses KAFKA-6657 and implements KIP-276.
Added unit tests for the new getGlobalConsumerConfigs API and made sure
the existing restore consumer tests still pass.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>, Guozhang Wang <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 20 ++-
.../apache/kafka/streams/KafkaClientSupplier.java | 20 ++-
.../org/apache/kafka/streams/KafkaStreams.java | 2 +-
.../org/apache/kafka/streams/StreamsConfig.java | 144 +++++++++++++++++++--
.../internals/DefaultKafkaClientSupplier.java | 11 +-
.../streams/processor/internals/StreamThread.java | 2 +-
.../apache/kafka/streams/StreamsConfigTest.java | 96 ++++++++++++--
.../org/apache/kafka/test/MockClientSupplier.java | 4 +
8 files changed, 266 insertions(+), 33 deletions(-)
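
The override precedence this commit documents (type-specific prefix first, then `consumer.`, then the unprefixed name) can be sketched in plain Java as below. This is only an illustration under stated assumptions: the class `ConsumerPrefixSketch` and its `resolve` helper are hypothetical and are not part of Kafka. It also ignores the special cases handled by the real `StreamsConfig`, such as `bootstrap.servers` always using the unprefixed value.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only; the real lookup lives in StreamsConfig.
class ConsumerPrefixSketch {
    static final String CONSUMER_PREFIX = "consumer.";
    static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    // Hypothetical helper: resolve one config name for one consumer type.
    // Precedence, highest first: <type>.consumer.<name>, consumer.<name>, <name>.
    static Object resolve(final Map<String, Object> settings,
                          final String typePrefix,
                          final String name) {
        if (settings.containsKey(typePrefix + name)) {
            return settings.get(typePrefix + name);
        }
        if (settings.containsKey(CONSUMER_PREFIX + name)) {
            return settings.get(CONSUMER_PREFIX + name);
        }
        return settings.get(name);
    }

    public static void main(final String[] args) {
        final Map<String, Object> settings = new HashMap<>();
        settings.put("max.poll.records", "1000");                 // applies to all clients
        settings.put("consumer.max.poll.records", "5");           // applies to all consumer types
        settings.put("restore.consumer.max.poll.records", "50");  // restore consumer only

        System.out.println(resolve(settings, MAIN_CONSUMER_PREFIX, "max.poll.records"));    // prints 5
        System.out.println(resolve(settings, RESTORE_CONSUMER_PREFIX, "max.poll.records")); // prints 50
        System.out.println(resolve(settings, GLOBAL_CONSUMER_PREFIX, "max.poll.records"));  // prints 5
    }
}
```

Only the restore consumer picks up the value 50; the main and global consumers fall back to the general `consumer.` value, which mirrors the new tests in StreamsConfigTest.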
diff --git a/docs/streams/developer-guide/config-streams.html
b/docs/streams/developer-guide/config-streams.html
index fd0cbca..e3cae22 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -516,7 +516,7 @@
<h4><a class="toc-backref" href="#id17">Naming</a><a
class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
<p>Some consumer and producer configuration parameters use the same
parameter name. For example, <code class="docutils literal"><span
class="pre">send.buffer.bytes</span></code> and
<code class="docutils literal"><span
class="pre">receive.buffer.bytes</span></code> are used to configure TCP
buffers; <code class="docutils literal"><span
class="pre">request.timeout.ms</span></code> and <code class="docutils
literal"><span class="pre">retry.backoff.ms</span></code> control retries
- for client request. You can avoid duplicate names by prefix
parameter names with <code class="docutils literal"><span
class="pre">consumer.</span></code> or <code class="docutils literal"><span
class="pre">producer</span></code> (e.g., <code class="docutils literal"><span
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
+ for client requests. You can avoid duplicate names by prefixing
parameter names with <code class="docutils literal"><span
class="pre">consumer.</span></code> or <code class="docutils literal"><span
class="pre">producer.</span></code> (e.g., <code class="docutils literal"><span
class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils
literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
<div class="highlight-java"><div
class="highlight"><pre><span></span><span class="n">Properties</span> <span
class="n">streamsSettings</span> <span class="o">=</span> <span
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// same value for consumer and producer</span>
<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="s">"PARAMETER_NAME"</span><span class="o">,</span> <span
class="s">"value"</span><span class="o">);</span>
@@ -527,6 +527,24 @@
<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="n">StreamsConfig</span><span class="o">.</span><span
class="na">consumerPrefix</span><span class="o">(</span><span
class="s">"PARAMETER_NAME"</span><span class="o">),</span> <span
class="s">"consumer-value"</span><span class="o">);</span>
<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="n">StreamsConfig</span><span class="o">.</span><span
class="na">producerPrefix</span><span class="o">(</span><span
class="s">"PARAMETER_NAME"</span><span class="o">),</span> <span
class="s">"producer-value"</span><span class="o">);</span>
</pre></div>
+ <p>You can further separate consumer configuration by adding
different prefixes:</p>
+ <ul class="simple">
+ <li><code class="docutils literal"><span
class="pre">main.consumer.</span></code> for the main consumer, which is the default
consumer of the stream source.</li>
+ <li><code class="docutils literal"><span
class="pre">restore.consumer.</span></code> for the restore consumer, which is in
charge of state store recovery.</li>
+ <li><code class="docutils literal"><span
class="pre">global.consumer.</span></code> for the global consumer, which is used in
global KTable construction.</li>
+ </ul>
+ <p>For example, if you only want to set the restore consumer config
without touching the other consumers' settings, you can use <code
class="docutils literal"><span class="pre">restore.consumer.</span></code> to
set the config.</p>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="n">Properties</span> <span
class="n">streamsSettings</span> <span class="o">=</span> <span
class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
+<span class="c1">// same config value for all consumer types</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="s">"consumer.PARAMETER_NAME"</span><span class="o">,</span>
<span class="s">"general-consumer-value"</span><span
class="o">);</span>
+<span class="c1">// set a different restore consumer config. This would make
restore consumer take restore-consumer-value,</span>
+<span class="c1">// while main consumer and global consumer stay with
general-consumer-value</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="s">"restore.consumer.PARAMETER_NAME"</span><span
class="o">,</span> <span
class="s">"restore-consumer-value"</span><span class="o">);</span>
+<span class="c1">// alternatively, you can use</span>
+<span class="n">streamsSettings</span><span class="o">.</span><span
class="na">put</span><span class="o">(</span><span
class="n">StreamsConfig</span><span class="o">.</span><span
class="na">restoreConsumerPrefix</span><span class="o">(</span><span
class="s">"PARAMETER_NAME"</span><span class="o">),</span> <span
class="s">"restore-consumer-value"</span><span class="o">);</span>
+</pre></div>
+ </div>
+ <p>The same applies to <code class="docutils literal"><span
class="pre">main.consumer.</span></code> and <code class="docutils
literal"><span class="pre">global.consumer.</span></code>, if you only want to
override the config of one consumer type.</p>
</div>
</div>
<div class="section" id="default-values">
diff --git
a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 8a6ec05..888edf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.processor.StateStore;
import java.util.Map;
@@ -32,7 +33,7 @@ public interface KafkaClientSupplier {
/**
* Create an {@link AdminClient} which is used for internal topic
management.
*
- * @param config Supplied by the {@link StreamsConfig} given to the {@link
KafkaStreams}
+ * @param config Supplied by the {@link java.util.Properties} given to the
{@link KafkaStreams}
* @return an instance of {@link AdminClient}
*/
AdminClient getAdminClient(final Map<String, Object> config);
@@ -41,7 +42,7 @@ public interface KafkaClientSupplier {
* Create a {@link Producer} which is used to write records to sink topics.
*
* @param config {@link StreamsConfig#getProducerConfigs(String) producer
config} which is supplied by the
- * {@link StreamsConfig} given to the {@link KafkaStreams}
instance
+ * {@link java.util.Properties} given to the {@link
KafkaStreams} instance
* @return an instance of Kafka producer
*/
Producer<byte[], byte[]> getProducer(final Map<String, Object> config);
@@ -49,8 +50,8 @@ public interface KafkaClientSupplier {
/**
* Create a {@link Consumer} which is used to read records of source
topics.
*
- * @param config {@link StreamsConfig#getConsumerConfigs(String, String)
consumer config} which is
- * supplied by the {@link StreamsConfig} given to the {@link
KafkaStreams} instance
+ * @param config {@link StreamsConfig#getMainConsumerConfigs(String,
String) consumer config} which is
+ * supplied by the {@link java.util.Properties} given to the
{@link KafkaStreams} instance
* @return an instance of Kafka consumer
*/
Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
@@ -59,8 +60,17 @@ public interface KafkaClientSupplier {
* Create a {@link Consumer} which is used to read records to restore
{@link StateStore}s.
*
* @param config {@link StreamsConfig#getRestoreConsumerConfigs(String)
restore consumer config} which is supplied
- * by the {@link StreamsConfig} given to the {@link
KafkaStreams}
+ * by the {@link java.util.Properties} given to the {@link
KafkaStreams}
* @return an instance of Kafka consumer
*/
Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object>
config);
+
+ /**
+ * Create a {@link Consumer} which is used to consume records for {@link
GlobalKTable}.
+ *
+ * @param config {@link StreamsConfig#getGlobalConsumerConfigs(String)
global consumer config} which is supplied
+ * by the {@link java.util.Properties} given to the {@link
KafkaStreams}
+ * @return an instance of Kafka consumer
+ */
+ Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object>
config);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 776dde7..66a8934 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -708,7 +708,7 @@ public class KafkaStreams {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
-
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId +
"-global")),
+
clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)),
stateDirectory,
cacheSizePerThread,
metrics,
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 653243c..18dc891 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -155,6 +155,33 @@ public class StreamsConfig extends AbstractConfig {
public static final String CONSUMER_PREFIX = "consumer.";
/**
+ * Prefix used to override {@link KafkaConsumer consumer} configs for the
main consumer client from
+ * the general consumer client configs. The override precedence is the
following (from highest to lowest precedence):
+ * 1. main.consumer.[config-name]
+ * 2. consumer.[config-name]
+ * 3. [config-name]
+ */
+ public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
+
+ /**
+ * Prefix used to override {@link KafkaConsumer consumer} configs for the
restore consumer client from
+ * the general consumer client configs. The override precedence is the
following (from highest to lowest precedence):
+ * 1. restore.consumer.[config-name]
+ * 2. consumer.[config-name]
+ * 3. [config-name]
+ */
+ public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
+
+ /**
+ * Prefix used to override {@link KafkaConsumer consumer} configs for the
global consumer client from
+ * the general consumer client configs. The override precedence is the
following (from highest to lowest precedence):
+ * 1. global.consumer.[config-name]
+ * 2. consumer.[config-name]
+ * 3. [config-name]
+ */
+ public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
+
+ /**
* Prefix used to isolate {@link KafkaProducer producer} configs from
other client configs.
* It is recommended to use {@link #producerPrefix(String)} to add this
prefix to {@link ProducerConfig producer
* properties}.
@@ -640,6 +667,39 @@ public class StreamsConfig extends AbstractConfig {
}
/**
+ * Prefix a property with {@link #MAIN_CONSUMER_PREFIX}. This is used to
isolate {@link ConsumerConfig main consumer configs}
+ * from other client configs.
+ *
+ * @param consumerProp the consumer property to be masked
+ * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
+ */
+ public static String mainConsumerPrefix(final String consumerProp) {
+ return MAIN_CONSUMER_PREFIX + consumerProp;
+ }
+
+ /**
+ * Prefix a property with {@link #RESTORE_CONSUMER_PREFIX}. This is used
to isolate {@link ConsumerConfig restore consumer configs}
+ * from other client configs.
+ *
+ * @param consumerProp the consumer property to be masked
+ * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
+ */
+ public static String restoreConsumerPrefix(final String consumerProp) {
+ return RESTORE_CONSUMER_PREFIX + consumerProp;
+ }
+
+ /**
+ * Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to
isolate {@link ConsumerConfig global consumer configs}
+ * from other client configs.
+ *
+ * @param consumerProp the consumer property to be masked
+ * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
+ */
+ public static String globalConsumerPrefix(final String consumerProp) {
+ return GLOBAL_CONSUMER_PREFIX + consumerProp;
+ }
+
+ /**
* Prefix a property with {@link #PRODUCER_PREFIX}. This is used to
isolate {@link ProducerConfig producer configs}
* from other client configs.
*
@@ -771,10 +831,37 @@ public class StreamsConfig extends AbstractConfig {
* @param groupId consumer groupId
* @param clientId clientId
* @return Map of the consumer configuration.
+ * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String,
String)}
*/
+ @Deprecated
public Map<String, Object> getConsumerConfigs(final String groupId,
final String clientId) {
- final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+ return getMainConsumerConfigs(groupId, clientId);
+ }
+
+ /**
+ * Get the configs for the {@link KafkaConsumer main consumer}.
+ * Properties using the prefix {@link #MAIN_CONSUMER_PREFIX} will be used
in favor over
+ * the properties prefixed with {@link #CONSUMER_PREFIX} and the
non-prefixed versions
+ * (read the override precedence ordering in {@link #MAIN_CONSUMER_PREFIX})
+ * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
where we always use the non-prefixed
+ * version as we only support reading/writing from/to the same Kafka
Cluster.
+ * If not specified by {@link #MAIN_CONSUMER_PREFIX}, main consumer will
share the general consumer configs
+ * prefixed by {@link #CONSUMER_PREFIX}.
+ *
+ * @param groupId consumer groupId
+ * @param clientId clientId
+ * @return Map of the consumer configuration.
+ */
+ public Map<String, Object> getMainConsumerConfigs(final String groupId,
+ final String clientId) {
+ Map<String, Object> consumerProps = getCommonConsumerConfigs();
+
+ // Get main consumer override configs
+ Map<String, Object> mainConsumerProps =
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+ for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
+ consumerProps.put(entry.getKey(), entry.getValue());
+ }
// add client id with stream client id prefix, and group id
consumerProps.put(APPLICATION_ID_CONFIG, groupId);
@@ -821,23 +908,64 @@ public class StreamsConfig extends AbstractConfig {
/**
* Get the configs for the {@link KafkaConsumer restore-consumer}.
- * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in
favor over their non-prefixed versions
+ * Properties using the prefix {@link #RESTORE_CONSUMER_PREFIX} will be
used in favor over
+ * the properties prefixed with {@link #CONSUMER_PREFIX} and the
non-prefixed versions
+ * (read the override precedence ordering in {@link
#RESTORE_CONSUMER_PREFIX})
* except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
where we always use the non-prefixed
* version as we only support reading/writing from/to the same Kafka
Cluster.
+ * If not specified by {@link #RESTORE_CONSUMER_PREFIX}, restore consumer
will share the general consumer configs
+ * prefixed by {@link #CONSUMER_PREFIX}.
*
* @param clientId clientId
- * @return Map of the consumer configuration.
+ * @return Map of the restore consumer configuration.
*/
public Map<String, Object> getRestoreConsumerConfigs(final String
clientId) {
- final Map<String, Object> consumerProps = getCommonConsumerConfigs();
+ Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+ // Get restore consumer override configs
+ Map<String, Object> restoreConsumerProps =
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+ for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet())
{
+ baseConsumerProps.put(entry.getKey(), entry.getValue());
+ }
// no need to set group id for a restore consumer
- consumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+ baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
// add client id with stream client id prefix
- consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
"-restore-consumer");
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
"-restore-consumer");
+ baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
- return consumerProps;
+ return baseConsumerProps;
+ }
+
+ /**
+ * Get the configs for the {@link KafkaConsumer global consumer}.
+ * Properties using the prefix {@link #GLOBAL_CONSUMER_PREFIX} will be
used in favor over
+ * the properties prefixed with {@link #CONSUMER_PREFIX} and the
non-prefixed versions
+ * (read the override precedence ordering in {@link
#GLOBAL_CONSUMER_PREFIX})
+ * except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG}
where we always use the non-prefixed
+ * version as we only support reading/writing from/to the same Kafka
Cluster.
+ * If not specified by {@link #GLOBAL_CONSUMER_PREFIX}, global consumer
will share the general consumer configs
+ * prefixed by {@link #CONSUMER_PREFIX}.
+ *
+ * @param clientId clientId
+ * @return Map of the global consumer configuration.
+ */
+ public Map<String, Object> getGlobalConsumerConfigs(final String clientId)
{
+ Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+
+ // Get global consumer override configs
+ Map<String, Object> globalConsumerProps =
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+ for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
+ baseConsumerProps.put(entry.getKey(), entry.getValue());
+ }
+
+ // no need to set group id for a global consumer
+ baseConsumerProps.remove(ConsumerConfig.GROUP_ID_CONFIG);
+ // add client id with stream client id prefix
+ baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
"-global-consumer");
+ baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+ return baseConsumerProps;
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 6f01e2f..69331b4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -35,17 +35,22 @@ public class DefaultKafkaClientSupplier implements
KafkaClientSupplier {
}
@Override
- public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
+ public Producer<byte[], byte[]> getProducer(final Map<String, Object>
config) {
return new KafkaProducer<>(config, new ByteArraySerializer(), new
ByteArraySerializer());
}
@Override
- public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
+ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object>
config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new
ByteArrayDeserializer());
}
@Override
- public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object>
config) {
+ public Consumer<byte[], byte[]> getRestoreConsumer(final Map<String,
Object> config) {
+ return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new
ByteArrayDeserializer());
+ }
+
+ @Override
+ public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String,
Object> config) {
return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new
ByteArrayDeserializer());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e4ad138..cc5a07f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread {
log.info("Creating consumer client");
final String applicationId =
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- final Map<String, Object> consumerConfigs =
config.getConsumerConfigs(applicationId, threadClientId);
+ final Map<String, Object> consumerConfigs =
config.getMainConsumerConfigs(applicationId, threadClientId);
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR,
taskManager);
String originalReset = null;
if (!builder.latestResetTopicsPattern().pattern().equals("") ||
!builder.earliestResetTopicsPattern().pattern().equals("")) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ac82c04..e991b6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -96,7 +96,7 @@ public class StreamsConfigTest {
public void testGetConsumerConfigs() {
final String groupId = "example-application";
final String clientId = "client";
- final Map<String, Object> returnedProps =
streamsConfig.getConsumerConfigs(groupId, clientId);
+ final Map<String, Object> returnedProps =
streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG),
clientId + "-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG),
groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
@@ -115,7 +115,7 @@ public class StreamsConfigTest {
final String groupId = "example-application";
final String clientId = "client";
- final Map<String, Object> returnedProps =
streamsConfig.getConsumerConfigs(groupId, clientId);
+ final Map<String, Object> returnedProps =
streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(42,
returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
assertEquals(1,
returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
@@ -135,12 +135,22 @@ public class StreamsConfigTest {
final String groupId = "example-application";
final String clientId = "client";
- final Map<String, Object> returnedProps =
streamsConfig.getConsumerConfigs(groupId, clientId);
+ final Map<String, Object> returnedProps =
streamsConfig.getMainConsumerConfigs(groupId, clientId);
assertEquals(20,
returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
}
@Test
+ public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"5");
+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"50");
+ final String groupId = "example-application";
+ final String clientId = "client";
+ final Map<String, Object> returnedProps =
streamsConfig.getMainConsumerConfigs(groupId, clientId);
+ assertEquals("50",
returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test
public void testGetRestoreConsumerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps =
streamsConfig.getRestoreConsumerConfigs(clientId);
@@ -185,7 +195,7 @@ public class StreamsConfigTest {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG),
1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("earliest",
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals(1,
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@@ -202,7 +212,7 @@ public class StreamsConfigTest {
public void
shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
}
@@ -238,7 +248,7 @@ public class StreamsConfigTest {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("earliest",
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals(1,
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@@ -265,7 +275,7 @@ public class StreamsConfigTest {
public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put("custom.property.host", "host");
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs =
streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs =
streamsConfig.getAdminConfigs("clientId");
@@ -282,7 +292,7 @@ public class StreamsConfigTest {
props.put(consumerPrefix("custom.property.host"), "host1");
props.put(producerPrefix("custom.property.host"), "host2");
props.put(adminClientPrefix("custom.property.host"), "host3");
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> restoreConsumerConfigs =
streamsConfig.getRestoreConsumerConfigs("clientId");
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
final Map<String, Object> adminConfigs =
streamsConfig.getAdminConfigs("clientId");
@@ -319,7 +329,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertEquals("latest",
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals("10",
consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@@ -344,7 +354,7 @@ public class StreamsConfigTest {
public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
"true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("a", "b");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("a", "b");
assertEquals("false",
consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@@ -357,9 +367,65 @@ public class StreamsConfigTest {
}
@Test
+ public void
testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"5");
+
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"50");
+ final Map<String, Object> returnedProps =
streamsConfig.getRestoreConsumerConfigs("clientId");
+ assertEquals("50",
returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test
+ public void testGetGlobalConsumerConfigs() {
+ final String clientId = "client";
+ final Map<String, Object> returnedProps =
streamsConfig.getGlobalConsumerConfigs(clientId);
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG),
clientId + "-global-consumer");
+ assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
+ }
+
+ @Test
+ public void shouldSupportPrefixedGlobalConsumerConfigs() {
+ props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG),
1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs =
streamsConfig.getGlobalConsumerConfigs("clientId");
+ assertEquals(1,
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void
shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ props.put(consumerPrefix("interceptor.statsd.host"), "host");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getGlobalConsumerConfigs("clientId");
+ assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
+ }
+
+ @Test
+ public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
+ props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs =
streamsConfig.getGlobalConsumerConfigs("groupId");
+ assertEquals(1,
consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
+ }
+
+ @Test
+ public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
"true");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ final Map<String, Object> consumerConfigs =
streamsConfig.getGlobalConsumerConfigs("client");
+ assertEquals("false",
consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ }
+
+ @Test
+ public void
testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"5");
+
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
"50");
+ final Map<String, Object> returnedProps =
streamsConfig.getGlobalConsumerConfigs("clientId");
+ assertEquals("50",
returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
+ }
+
+ @Test
public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
assertThat(consumerConfigs.get("internal.leave.group.on.close"),
CoreMatchers.<Object>equalTo(false));
}
@@ -388,7 +454,9 @@ public class StreamsConfigTest {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
+ String isoLevel = (String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+ String name = READ_COMMITTED.name();
assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
}
@@ -396,7 +464,7 @@ public class StreamsConfigTest {
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientrId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
}
@@ -423,7 +491,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs("groupId", "clientId");
+ final Map<String, Object> consumerConfigs =
streamsConfig.getMainConsumerConfigs("groupId", "clientId");
final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index 1ec28fa..d3430f2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -83,4 +83,8 @@ public class MockClientSupplier implements
KafkaClientSupplier {
return restoreConsumer;
}
+ @Override
+ public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String,
Object> config) {
+ return restoreConsumer;
+ }
}