Repository: kafka Updated Branches: refs/heads/trunk b058fcf5e -> b8ea094b4
KAFKA-3786: Let ConfigDef filter property key value pairs Author: Guozhang Wang <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1465 from guozhangwang/K3786-config-parsing Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8ea094b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8ea094b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8ea094b Branch: refs/heads/trunk Commit: b8ea094b427768bb360d87fc0a07f670cb667e1e Parents: b058fcf Author: Guozhang Wang <[email protected]> Authored: Mon Jun 13 14:15:28 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Jun 13 14:15:28 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 5 + .../kafka/clients/producer/ProducerConfig.java | 5 + .../org/apache/kafka/streams/StreamsConfig.java | 120 ++++++++++++------- .../apache/kafka/streams/StreamsConfigTest.java | 10 +- 4 files changed, 93 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index e58f2fd..b7fc1d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; @@ -376,6 +377,10 @@ public class ConsumerConfig extends AbstractConfig { super(CONFIG, props); } + public static Set<String> configNames() { + return CONFIG.names(); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index e505f71..47eb309 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.Serializer; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; @@ -336,6 +337,10 @@ public class ProducerConfig extends AbstractConfig { super(CONFIG, props); } + public static Set<String> configNames() { + return CONFIG.names(); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 95e55c9..7f32434 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; @@ -31,7 +32,10 @@ import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Set; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; @@ -212,6 +216,28 @@ public class StreamsConfig extends AbstractConfig { CommonClientConfigs.METRICS_NUM_SAMPLES_DOC); } + // this is the list of configs for underlying clients + // that streams prefer different default values + private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES; + static + { + Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(); + tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + + PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides); + } + + private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES; + static + { + Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(); + tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); + tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); + } + public static class InternalConfig { public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__"; } @@ -220,8 +246,18 @@ public class StreamsConfig extends AbstractConfig { super(CONFIG, props); } - public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) { - Map<String, Object> props = getBaseConsumerConfigs(); + public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException { + Map<String, Object> originals = this.originals(); + + // disable auto commit and throw exception if there is user overridden values, + // this is necessary for streams commit semantics + if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + ", as the streams client will always turn off auto committing."); + } + + // generate consumer configs from original properties and overridden maps + Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES); // add client id with stream client id prefix, and group id props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -232,15 +268,24 @@ public class StreamsConfig extends AbstractConfig { props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG)); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG)); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); - if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG)); return props; } - public Map<String, Object> getRestoreConsumerConfigs(String clientId) { - Map<String, Object> props = getBaseConsumerConfigs(); + public Map<String, Object> getRestoreConsumerConfigs(String clientId) throws ConfigException { + Map<String, Object> originals = this.originals(); + + // disable auto commit and throw exception if there is user overridden values, + // this is necessary for streams commit semantics + if (originals.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { + throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + + ", as the streams client will always turn off auto committing."); + } + + // generate consumer configs from original properties and overridden maps + Map<String, Object> props = clientProps(ConsumerConfig.configNames(), originals, CONSUMER_DEFAULT_OVERRIDES); // no need to set group id for a restore consumer props.remove(ConsumerConfig.GROUP_ID_CONFIG); @@ -251,29 +296,9 @@ public class StreamsConfig extends AbstractConfig { return props; } - private Map<String, Object> getBaseConsumerConfigs() { - Map<String, Object> props = this.originals(); - - // remove streams properties - removeStreamsSpecificConfigs(props); - - // set consumer default property values - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - return props; - } - public Map<String, Object> getProducerConfigs(String clientId) { - Map<String, Object> props = this.originals(); - - // remove consumer properties that are not required for producers - props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); - - // remove streams properties - removeStreamsSpecificConfigs(props); - - // set producer default property values - props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + // generate producer configs from original properties and overridden maps + Map<String, Object> props = clientProps(ProducerConfig.configNames(), this.originals(), PRODUCER_DEFAULT_OVERRIDES); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); @@ -281,24 +306,6 @@ public class StreamsConfig extends AbstractConfig { return props; } - private void removeStreamsSpecificConfigs(Map<String, Object> props) { - props.remove(StreamsConfig.POLL_MS_CONFIG); - props.remove(StreamsConfig.STATE_DIR_CONFIG); - props.remove(StreamsConfig.APPLICATION_ID_CONFIG); - props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG); - props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG); - props.remove(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG); - props.remove(StreamsConfig.REPLICATION_FACTOR_CONFIG); - props.remove(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); - props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG); - props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); - props.remove(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); - props.remove(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG); - props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); - props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); - props.remove(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); - } - public Serde keySerde() { Serde<?> serde = getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), true); @@ -313,6 +320,29 @@ public class StreamsConfig extends AbstractConfig { return serde; } + /** + * Filter configs that are not defined in the given set of configuration names. + * + * @param configNames The given set of configuration names. + * @param originals The original configs to be filtered. + * @param overrides The default overridden values. + * @return Filtered configs. + */ + private Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals, Map<String, Object> overrides) { + // iterate all client config names, filter out non-client configs from the original + // property map and use the overridden values when they are not specified by users + Map<String, Object> parsed = new HashMap<>(); + for (String configName: configNames) { + if (originals.containsKey(configName)) { + parsed.put(configName, originals.get(configName)); + } else if (overrides.containsKey(configName)) { + parsed.put(configName, overrides.get(configName)); + } + } + + return parsed; + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b8ea094b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 17d6b4b..3d4a9cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; @@ -44,6 +45,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put("DUMMY", "dummy"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF-16"); streamsConfig = new StreamsConfig(props); @@ -52,7 +54,9 @@ public class StreamsConfigTest { @Test public void testGetProducerConfigs() throws Exception { Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer"); + assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer"); + assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100"); + assertNull(returnedProps.get("DUMMY")); } @Test @@ -60,7 +64,8 @@ public class StreamsConfigTest { Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client"); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application"); - + assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000"); + assertNull(returnedProps.get("DUMMY")); } @Test @@ -68,6 +73,7 @@ public class StreamsConfigTest { Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client"); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); + assertNull(returnedProps.get("DUMMY")); } @Test
