[ 
https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345552#comment-16345552
 ] 

ASF GitHub Bot commented on KAFKA-6166:
---------------------------------------

guozhangwang closed pull request #4434: KAFKA-6166: Streams configuration 
requires consumer. and producer. in order to be read
URL: https://github.com/apache/kafka/pull/4434
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 0feb48ddecd..1393223ec23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -693,6 +693,7 @@ public StreamsConfig(final Map<?, ?> props) {
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
 
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
@@ -832,6 +833,7 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
 
         // generate producer configs from original properties and overridden 
maps
         final Map<String, Object> props = new HashMap<>(eosEnabled ? 
PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(getClientCustomProps());
         props.putAll(clientProvidedProps);
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -847,7 +849,11 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @return Map of the admin client configuration.
      */
     public Map<String, Object> getAdminConfigs(final String clientId) {
-        final Map<String, Object> props = 
getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+        final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+
+        final Map<String, Object> props = new HashMap<>();
+        props.putAll(getClientCustomProps());
+        props.putAll(clientProvidedProps);
 
         // add client id with stream client id prefix
         props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin");
@@ -862,6 +868,26 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         return props;
     }
 
+    /**
+     * Get a map of custom configs by removing from the originals all the 
Streams, Consumer, Producer, and AdminClient configs.
+     * Prefixed properties are also removed because they are already added by 
{@link #getClientPropsWithPrefix(String, Set)}.
+     * This allows to set a custom property for a specific client alone if 
specified using a prefix, or for all
+     * when no prefix is used.
+     *
+     * @return a map with the custom properties
+     */
+    private Map<String, Object> getClientCustomProps() {
+        final Map<String, Object> props = originals();
+        props.keySet().removeAll(CONFIG.names());
+        props.keySet().removeAll(ConsumerConfig.configNames());
+        props.keySet().removeAll(ProducerConfig.configNames());
+        props.keySet().removeAll(AdminClientConfig.configNames());
+        props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, 
false).keySet());
+        props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, 
false).keySet());
+        props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, 
false).keySet());
+        return props;
+    }
+
     /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of 
{@link #KEY_SERDE_CLASS_CONFIG key Serde
      * class}. This method is deprecated. Use {@link #defaultKeySerde()} 
method instead.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 1d6b5a50117..cc072d5508d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -45,6 +45,7 @@
 import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
@@ -66,7 +67,6 @@ public void setUp() {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
-        props.put("DUMMY", "dummy");
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
         streamsConfig = new StreamsConfig(props);
@@ -90,7 +90,6 @@ public void testGetProducerConfigs() {
         final Map<String, Object> returnedProps = 
streamsConfig.getProducerConfigs(clientId);
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), 
clientId + "-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), 
"100");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -101,7 +100,6 @@ public void testGetConsumerConfigs() {
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), 
clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), 
groupId);
         
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -148,7 +146,6 @@ public void testGetRestoreConsumerConfigs() {
         final Map<String, Object> returnedProps = 
streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), 
clientId + "-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -264,6 +261,37 @@ public void shouldSupportNonPrefixedProducerConfigs() {
         assertEquals(1, 
configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
+    @Test
+    public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put("custom.property.host", "host");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> adminConfigs = 
streamsConfig.getAdminConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("custom.property.host"));
+        assertEquals("host", 
restoreConsumerConfigs.get("custom.property.host"));
+        assertEquals("host", producerConfigs.get("custom.property.host"));
+        assertEquals("host", adminConfigs.get("custom.property.host"));
+    }
+
+    @Test
+    public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put("custom.property.host", "host0");
+        props.put(consumerPrefix("custom.property.host"), "host1");
+        props.put(producerPrefix("custom.property.host"), "host2");
+        props.put(adminClientPrefix("custom.property.host"), "host3");
+        final Map<String, Object> consumerConfigs = 
streamsConfig.getConsumerConfigs("groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> adminConfigs = 
streamsConfig.getAdminConfigs("clientId");
+        assertEquals("host1", consumerConfigs.get("custom.property.host"));
+        assertEquals("host1", 
restoreConsumerConfigs.get("custom.property.host"));
+        assertEquals("host2", producerConfigs.get("custom.property.host"));
+        assertEquals("host3", adminConfigs.get("custom.property.host"));
+    }
+
     @Test
     public void shouldSupportNonPrefixedAdminConfigs() {
         props.put(AdminClientConfig.RETRIES_CONFIG, 10);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams configuration requires consumer. and producer. in order to be read
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-6166
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6166
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0
>         Environment: Kafka 0.11.0.0
> JDK 1.8
> CoreOS
>            Reporter: Justin Manchester
>            Assignee: Filipe Agapito
>            Priority: Minor
>              Labels: newbie++, user-experience
>
> Problem:
> In previous release you could specify a custom metrics reporter like so:
> Properties config = new Properties(); 
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
> "com.mycompany.MetricReporter"); 
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify 
> consumer.custom-key-for-metric-reporter or 
> producer.custom-key-for-metric-reporter otherwise it's stripped out of the 
> configuration.
> So, if you wish to use a metrics reporter and to collect producer and 
> consumer metrics, as well as kafka-streams metrics, that you would need to 
> specify 3 distinct configs:
> 1) consumer.custom-key-for-metric-reporter 
> 2) producer.custom-key-for-metric-reporter 
> 3) custom-key-for-metric-reporter
> This appears to be a regression.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to