This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 10312ed KAFKA-6166: Streams configuration requires consumer. and
producer. in order to be read (#4434)
10312ed is described below
commit 10312ed805164643852a8fb21f705db81b952d8e
Author: Filipe Agapito <[email protected]>
AuthorDate: Tue Jan 30 18:18:51 2018 +0000
KAFKA-6166: Streams configuration requires consumer. and producer. in order
to be read (#4434)
* Implement method to get custom properties
* Add custom properties to getConsumerConfigs and getProducerConfigs
* Add tests
Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang
<[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 20 ++++++++++++++++
.../apache/kafka/streams/StreamsConfigTest.java | 28 ++++++++++++++++++----
2 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7fc3d5c..42e65df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -644,6 +644,7 @@ public class StreamsConfig extends AbstractConfig {
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ?
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+ consumerProps.putAll(getClientCustomProps());
consumerProps.putAll(clientProvidedProps);
// bootstrap.servers should be from StreamsConfig
@@ -758,6 +759,7 @@ public class StreamsConfig extends AbstractConfig {
// generate producer configs from original properties and overridden
maps
final Map<String, Object> props = new HashMap<>(eosEnabled ?
PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
+ props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -775,6 +777,24 @@ public class StreamsConfig extends AbstractConfig {
}
/**
+ * Get a map of custom configs by removing from the originals all the
Streams, Consumer, and Producer configs.
+ * Consumer- and producer-prefixed properties are also removed because they are already added by
{@link #getClientPropsWithPrefix(String, Set)}.
+ * This allows a custom property to be set for a specific client alone when
it is specified using a prefix, or for all
+ * clients when no prefix is used.
+ *
+ * @return a map with the custom properties
+ */
+ private Map<String, Object> getClientCustomProps() {
+ final Map<String, Object> props = originals(); // NOTE(review): the removals below mutate this map; assumes originals() hands back a private copy — confirm
+ props.keySet().removeAll(CONFIG.names());
+ props.keySet().removeAll(ConsumerConfig.configNames());
+ props.keySet().removeAll(ProducerConfig.configNames());
+ props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX,
false).keySet());
+ props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX,
false).keySet());
+ return props; // whatever is left is treated as a custom property
+ }
+
+ /**
* Return an {@link Serde#configure(Map, boolean) configured} instance of
{@link #KEY_SERDE_CLASS_CONFIG key Serde
* class}. This method is deprecated. Use {@link #defaultKeySerde()}
method instead.
*
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3774a8e..b217bf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -63,7 +63,6 @@ public class StreamsConfigTest {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
- props.put("DUMMY", "dummy");
props.put("key.deserializer.encoding", "UTF8");
props.put("value.deserializer.encoding", "UTF-16");
streamsConfig = new StreamsConfig(props);
@@ -87,7 +86,6 @@ public class StreamsConfigTest {
final Map<String, Object> returnedProps =
streamsConfig.getProducerConfigs(clientId);
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG),
clientId + "-producer");
assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG),
"100");
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -98,7 +96,6 @@ public class StreamsConfigTest {
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG),
clientId + "-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG),
groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -107,7 +104,6 @@ public class StreamsConfigTest {
final Map<String, Object> returnedProps =
streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG),
clientId + "-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
- assertNull(returnedProps.get("DUMMY"));
}
@Test
@@ -227,7 +223,31 @@ public class StreamsConfigTest {
assertEquals(1,
configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
+ @Test // a custom key set without any prefix is expected in the consumer, restore-consumer and producer configs
+ public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ props.put("custom.property.host", "host"); // NOTE(review): added after the config was constructed, so the assertions presumably rely on StreamsConfig seeing later changes to props — confirm
+ final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ final Map<String, Object> restoreConsumerConfigs =
streamsConfig.getRestoreConsumerConfigs("clientId");
+ final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
+ assertEquals("host", consumerConfigs.get("custom.property.host"));
+ assertEquals("host",
restoreConsumerConfigs.get("custom.property.host"));
+ assertEquals("host", producerConfigs.get("custom.property.host"));
+ }
+ @Test // when the same custom key is set with and without a client prefix, the prefixed value is expected to win for that client
+ public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ props.put("custom.property.host", "host0"); // un-prefixed value; NOTE(review): props is modified after the config was constructed — presumably StreamsConfig sees later changes, confirm
+ props.put(consumerPrefix("custom.property.host"), "host1"); // presumably the consumer-prefixed form of the key — confirm
+ props.put(producerPrefix("custom.property.host"), "host2"); // presumably the producer-prefixed form of the key — confirm
+ final Map<String, Object> consumerConfigs =
streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ final Map<String, Object> restoreConsumerConfigs =
streamsConfig.getRestoreConsumerConfigs("clientId");
+ final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
+ assertEquals("host1", consumerConfigs.get("custom.property.host"));
+ assertEquals("host1",
restoreConsumerConfigs.get("custom.property.host")); // the restore consumer is expected to get the consumer-prefixed value too
+ assertEquals("host2", producerConfigs.get("custom.property.host"));
+ }
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
--
To stop receiving notification emails like this one, please contact
[email protected].