[ https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345552#comment-16345552 ]
ASF GitHub Bot commented on KAFKA-6166: --------------------------------------- guozhangwang closed pull request #4434: KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read URL: https://github.com/apache/kafka/pull/4434 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0feb48ddecd..1393223ec23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -693,6 +693,7 @@ public StreamsConfig(final Map<?, ?> props) { checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); + consumerProps.putAll(getClientCustomProps()); consumerProps.putAll(clientProvidedProps); // bootstrap.servers should be from StreamsConfig @@ -832,6 +833,7 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje // generate producer configs from original properties and overridden maps final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); + props.putAll(getClientCustomProps()); props.putAll(clientProvidedProps); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); @@ -847,7 +849,11 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje * @return Map of the admin client configuration. 
*/ public Map<String, Object> getAdminConfigs(final String clientId) { - final Map<String, Object> props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()); + final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()); + + final Map<String, Object> props = new HashMap<>(); + props.putAll(getClientCustomProps()); + props.putAll(clientProvidedProps); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin"); @@ -862,6 +868,26 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje return props; } + /** + * Get a map of custom configs by removing from the originals all the Streams, Consumer, Producer, and AdminClient configs. + * Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String, Set)}. + * This allows to set a custom property for a specific client alone if specified using a prefix, or for all + * when no prefix is used. + * + * @return a map with the custom properties + */ + private Map<String, Object> getClientCustomProps() { + final Map<String, Object> props = originals(); + props.keySet().removeAll(CONFIG.names()); + props.keySet().removeAll(ConsumerConfig.configNames()); + props.keySet().removeAll(ProducerConfig.configNames()); + props.keySet().removeAll(AdminClientConfig.configNames()); + props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet()); + props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet()); + props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet()); + return props; + } + /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 1d6b5a50117..cc072d5508d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -45,6 +45,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig; @@ -66,7 +67,6 @@ public void setUp() { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put("DUMMY", "dummy"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF-16"); streamsConfig = new StreamsConfig(props); @@ -90,7 +90,6 @@ public void testGetProducerConfigs() { final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId); assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer"); assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100"); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -101,7 +100,6 @@ public void testGetConsumerConfigs() { assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer"); assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId); assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 
"1000"); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -148,7 +146,6 @@ public void testGetRestoreConsumerConfigs() { final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -264,6 +261,37 @@ public void shouldSupportNonPrefixedProducerConfigs() { assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } + @Test + public void shouldForwardCustomConfigsWithNoPrefixToAllClients() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + props.put("custom.property.host", "host"); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId"); + final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId"); + assertEquals("host", consumerConfigs.get("custom.property.host")); + assertEquals("host", restoreConsumerConfigs.get("custom.property.host")); + assertEquals("host", producerConfigs.get("custom.property.host")); + assertEquals("host", adminConfigs.get("custom.property.host")); + } + + @Test + public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + props.put("custom.property.host", "host0"); + props.put(consumerPrefix("custom.property.host"), "host1"); + props.put(producerPrefix("custom.property.host"), "host2"); + props.put(adminClientPrefix("custom.property.host"), "host3"); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId"); + final Map<String, Object> restoreConsumerConfigs = 
streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId"); + assertEquals("host1", consumerConfigs.get("custom.property.host")); + assertEquals("host1", restoreConsumerConfigs.get("custom.property.host")); + assertEquals("host2", producerConfigs.get("custom.property.host")); + assertEquals("host3", adminConfigs.get("custom.property.host")); + } + @Test public void shouldSupportNonPrefixedAdminConfigs() { props.put(AdminClientConfig.RETRIES_CONFIG, 10); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams configuration requires consumer. and producer. in order to be read > -------------------------------------------------------------------------- > > Key: KAFKA-6166 > URL: https://issues.apache.org/jira/browse/KAFKA-6166 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.11.0.0 > Environment: Kafka 0.11.0.0 > JDK 1.8 > CoreOS > Reporter: Justin Manchester > Assignee: Filipe Agapito > Priority: Minor > Labels: newbie++, user-experience > > Problem: > In previous release you could specify a custom metrics reporter like so: > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); > config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, > "com.mycompany.MetricReporter"); > config.put("custom-key-for-metric-reporter", "value"); > From 0.11.0.0 onwards this is no longer possible, as you have to specify > consumer.custom-key-for-metric-reporter or > producer.custom-key-for-metric-reporter otherwise it's stripped out of the > configuration. 
> So, if you wish to use a metrics reporter and to collect producer and > consumer metrics, as well as kafka-streams metrics, you would need to > specify 3 distinct configs: > 1) consumer.custom-key-for-metric-reporter > 2) producer.custom-key-for-metric-reporter > 3) custom-key-for-metric-reporter > This appears to be a regression. -- This message was sent by Atlassian JIRA (v7.6.3#76005)