http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 81f0ff9..08839cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -176,13 +177,20 @@ public class TopologyBuilder { private final Pattern pattern; private final Deserializer<?> keyDeserializer; private final Deserializer<?> valDeserializer; - - private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) { + private final TimestampExtractor timestampExtractor; + + private SourceNodeFactory(final String name, + final String[] topics, + final Pattern pattern, + final TimestampExtractor timestampExtractor, + final Deserializer<?> keyDeserializer, + final Deserializer<?> valDeserializer) { super(name); this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>(); this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; + this.timestampExtractor = timestampExtractor; } List<String> getTopics(Collection<String> subscribedTopics) { @@ -190,7 +198,7 @@ public class TopologyBuilder { // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. if (subscribedTopics.isEmpty()) - return Collections.singletonList("Pattern[" + pattern + "]"); + return Collections.singletonList("" + pattern + ""); List<String> matchedTopics = new ArrayList<>(); for (String update : subscribedTopics) { @@ -218,9 +226,9 @@ public class TopologyBuilder { // yet and hence the map from source node to topics is stale, in this case we put the pattern as a place holder; // this should only happen for debugging since during runtime this function should always be called after the metadata has updated. if (sourceTopics == null) - return new SourceNode<>(name, Collections.singletonList("Pattern[" + pattern + "]"), keyDeserializer, valDeserializer); + return new SourceNode<>(name, Collections.singletonList("" + pattern + ""), timestampExtractor, keyDeserializer, valDeserializer); else - return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), keyDeserializer, valDeserializer); + return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer); } private boolean isMatch(String topic) { @@ -325,9 +333,10 @@ public class TopologyBuilder { /** * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. @@ -335,14 +344,15 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(final String name, final String... topics) { - return addSource(null, name, null, null, topics); + return addSource(null, name, null, null, null, topics); } /** * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. - * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latest * @param name the unique name of the source used to reference this node when @@ -350,33 +360,72 @@ public class TopologyBuilder { * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) { - return addSource(offsetReset, name, null, null, topics); + + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) { + return addSource(offsetReset, name, null, null, null, topics); + } + + /** + * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param topics the name of one or more Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics) { + return addSource(null, name, timestampExtractor, null, null, topics); } + /** + * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; + * acceptable values earliest or latest + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param topics the name of one or more Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) { + return addSource(offsetReset, name, timestampExtractor, null, null, topics); + } /** * Add a new source that consumes from topics matching the given pattern * and forward the records to child processor and/or sink nodes. - * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ + public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) { - return addSource(null, name, null, null, topicPattern); + return addSource(null, name, null, null, null, topicPattern); } /** * Add a new source that consumes from topics matching the given pattern * and forward the records to child processor and/or sink nodes. - * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; acceptable values earliest or latest. * @param name the unique name of the source used to reference this node when @@ -384,50 +433,97 @@ public class TopologyBuilder { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { - return addSource(offsetReset, name, null, null, topicPattern); + + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { + return addSource(offsetReset, name, null, null, null, topicPattern); + } + + + /** + * Add a new source that consumes from topics matching the given pattern + * and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) { + return addSource(null, name, timestampExtractor, null, null, topicPattern); + } + + + /** + * Add a new source that consumes from topics matching the given pattern + * and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; + * acceptable values earliest or latest. + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) { + return addSource(offsetReset, name, timestampExtractor, null, null, topicPattern); } /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param topics the name of one or more Kafka topics that this source is to consume + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ - public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { - return addSource(null, name, keyDeserializer, valDeserializer, topics); + public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { + return addSource(null, name, null, keyDeserializer, valDeserializer, topics); } /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest. - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param topics the name of one or more Kafka topics that this source is to consume + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; + * acceptable values are earliest or latest. + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ - public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { + + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valDeserializer, + final String... topics) { if (topics.length == 0) { throw new TopologyBuilderException("You must provide at least one topic"); } @@ -442,7 +538,7 @@ public class TopologyBuilder { sourceTopicNames.add(topic); } - nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer)); + nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer)); nodeToSourceTopics.put(name, Arrays.asList(topics)); nodeGrouper.add(name); @@ -460,9 +556,45 @@ public class TopologyBuilder { * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will * receive all records forwarded from the {@link SourceNode}. This * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. + * + * @param storeSupplier user defined state store supplier + * @param sourceName name of the {@link SourceNode} that will be automatically added + * @param keyDeserializer the {@link Deserializer} to deserialize keys with + * @param valueDeserializer the {@link Deserializer} to deserialize values with + * @param topic the topic to source the data from + * @param processorName the name of the {@link ProcessorSupplier} + * @param stateUpdateSupplier the instance of {@link ProcessorSupplier} + * @return this builder instance so methods can be chained together; never null + */ + public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, + final String sourceName, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer, + final String topic, + final String processorName, + final ProcessorSupplier stateUpdateSupplier) { + return addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier); + } + + + + /** + * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data + * from all partitions of the provided input topic. There will be exactly one instance of this + * {@link StateStore} per Kafka Streams instance. + * <p> + * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving + * from the partitions of the input topic. + * <p> + * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will + * receive all records forwarded from the {@link SourceNode}. This + * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. * * @param storeSupplier user defined state store supplier * @param sourceName name of the {@link SourceNode} that will be automatically added + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used * @param keyDeserializer the {@link Deserializer} to deserialize keys with * @param valueDeserializer the {@link Deserializer} to deserialize values with * @param topic the topic to source the data from @@ -472,6 +604,7 @@ public class TopologyBuilder { */ public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier, final String sourceName, + final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, @@ -502,7 +635,7 @@ public class TopologyBuilder { globalTopics.add(topic); final String[] topics = {topic}; - nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, keyDeserializer, valueDeserializer)); + nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer)); nodeToSourceTopics.put(sourceName, Arrays.asList(topics)); nodeGrouper.add(sourceName); @@ -537,23 +670,21 @@ public class TopologyBuilder { * The source will use the specified key and value deserializers. The provided * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for * topics that share the same key-value data format. + * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. * - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { - return addSource(null, name, keyDeserializer, valDeserializer, topicPattern); - + return addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern); } /** @@ -563,21 +694,27 @@ public class TopologyBuilder { * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for * topics that share the same key-value data format. * - * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest - * @param name the unique name of the source used to reference this node when - * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. - * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the - * {@link org.apache.kafka.streams.StreamsConfig stream configuration} - * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; + * acceptable values are earliest or latest + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param timestampExtractor the stateless timestamp extractor used for this source, + * if not specified the default extractor defined in the configs will be used + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ - public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, + final String name, + final TimestampExtractor timestampExtractor, + final Deserializer keyDeserializer, + final Deserializer valDeserializer, + final Pattern topicPattern) { Objects.requireNonNull(topicPattern, "topicPattern can't be null"); Objects.requireNonNull(name, "name can't be null"); @@ -593,17 +730,47 @@ public class TopologyBuilder { maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern); - nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer)); + nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer)); nodeToSourcePatterns.put(name, topicPattern); nodeGrouper.add(name); return this; } + + /** + * Add a new source that consumes from topics matching the given pattern + * and forwards the records to child processor and/or sink nodes. + * The source will use the specified key and value deserializers. The provided + * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for + * topics that share the same key-value data format. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; + * acceptable values are earliest or latest + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children} + * @param keyDeserializer key deserializer used to read this source, if not specified the default + * key deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source, + * if not specified the default value deserializer defined in the configs will be used + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name + */ + + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, + final String name, + final Deserializer keyDeserializer, + final Deserializer valDeserializer, + final Pattern topicPattern) { + return addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern); + } + + /** * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic. - * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * * @param name the unique name of the sink @@ -622,8 +789,8 @@ public class TopologyBuilder { /** * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using * the supplied partitioner. - * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and - * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * <p> * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among @@ -653,10 +820,10 @@ public class TopologyBuilder { * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its records * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic @@ -676,10 +843,10 @@ public class TopologyBuilder { * @param name the unique name of the sink * @param topic the name of the Kafka topic to which this sink should write its records * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param valSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink - * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration} * @param partitioner the function that should be used to determine the partition for each record processed by the sink * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume @@ -1236,7 +1403,7 @@ public class TopologyBuilder { final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier; final InternalTopicConfig config = new InternalTopicConfig(name, Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, - InternalTopicConfig.CleanupPolicy.delete), + InternalTopicConfig.CleanupPolicy.delete), supplier.logConfig()); config.setRetentionMs(windowStoreSupplier.retentionPeriod()); return config;
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 4a1cffb..bae6f22 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.processor.TimestampExtractor; import java.util.Collections; import java.util.Comparator; @@ -36,8 +35,6 @@ public class PartitionGroup { private final PriorityQueue<RecordQueue> queuesByTime; - private final TimestampExtractor timestampExtractor; - public static class RecordInfo { public RecordQueue queue; @@ -57,7 +54,7 @@ public class PartitionGroup { // since task is thread-safe, we do not need to synchronize on local variables private int totalBuffered; - public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) { + public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues) { this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() { @Override @@ -73,8 +70,6 @@ public class PartitionGroup { this.partitionQueues = partitionQueues; - this.timestampExtractor = timestampExtractor; - this.totalBuffered = 0; } @@ -176,4 +171,4 @@ public class PartitionGroup { queue.clear(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 9f46a2c..486a2ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.streams.kstream.internals.ChangedDeserializer; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.TimestampExtractor; import java.util.List; @@ -29,14 +30,20 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { private ProcessorContext context; private Deserializer<K> keyDeserializer; private Deserializer<V> valDeserializer; + private final TimestampExtractor timestampExtractor; - public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { + public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { super(name); this.topics = topics; + this.timestampExtractor = timestampExtractor; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; } + public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { + this(name, topics, null, keyDeserializer, valDeserializer); + } + K deserializeKey(String topic, byte[] data) { return keyDeserializer.deserialize(topic, data); } @@ -93,4 +100,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { return sb.toString(); } -} + public TimestampExtractor getTimestampExtractor() { + return timestampExtractor; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 4a49f9d..568a5b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -115,15 +115,15 @@ public class StreamTask extends AbstractTask implements Punctuator { // to corresponding source nodes in the processor topology final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>(); - final TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); - + final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); for (final TopicPartition partition : partitions) { final SourceNode source = topology.source(partition.topic()); - final RecordQueue queue = new RecordQueue(partition, source, timestampExtractor); + final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor; + final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor); partitionQueues.put(partition, queue); } - partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor); + partitionGroup = new PartitionGroup(partitionQueues); // initialize the consumed offset cache consumedOffsets = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index b229af9..cfd702e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -50,8 +50,8 @@ public class StreamsConfigTest { public void setUp() { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put("DUMMY", "dummy"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF-16"); @@ -95,11 +95,11 @@ public class StreamsConfigTest { serializer.configure(serializerConfigs, true); assertEquals("Should get the original string after serialization and deserialization with the configured encoding", - str, streamsConfig.keySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); + str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); serializer.configure(serializerConfigs, false); assertEquals("Should get the original string after serialization and deserialization with the configured encoding", - str, streamsConfig.valueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); + str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); } @Test @@ -204,16 +204,16 @@ public class StreamsConfigTest { @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception { - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); - streamsConfig.keySerde(); + streamsConfig.defaultKeySerde(); } @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception { - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig streamsConfig = new StreamsConfig(props); - streamsConfig.valueSerde(); + streamsConfig.defaultValueSerde(); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java index 25dea92..8593317 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java @@ -102,7 +102,7 @@ public class FanoutIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 09eceab..e5ed3d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -100,7 +100,7 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore); + globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore); stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream); table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table"); foreachAction = new ForeachAction<String, String>() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index a6bb044..29cdc1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -86,8 +86,8 @@ public class InternalTopicIntegrationTest { streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); @@ -136,8 +136,8 @@ public class InternalTopicIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final KStreamBuilder builder = new KStreamBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 8047611..040c784 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -126,8 +126,8 @@ public class JoinIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java index 1806321..9b5b428 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java @@ -86,8 +86,8 @@ public class KStreamKTableJoinIntegrationTest { streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 5fa7e49..c77cf3b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -78,8 +78,8 @@ public class KTableKTableJoinIntegrationTest { streamsConfig = new Properties(); streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index ab23af0..4b5ae17 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -137,8 +137,8 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // override this to make the rebalances happen quickly streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 775ac8d..626f38d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -265,8 +265,8 @@ public class ResetIntegrationTest { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index e7cb669..82e5b23 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -25,10 +25,12 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.MockTimestampExtractor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -44,6 +46,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; public class KStreamBuilderTest { @@ -396,4 +400,54 @@ public class KStreamBuilderTest { assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches()); assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches()); } + + @SuppressWarnings("unchecked") + @Test + public void kStreamTimestampExtractorShouldBeNull() throws Exception { + builder.stream("topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertNull(processorTopology.source("topic").getTimestampExtractor()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception { + builder.stream(new MockTimestampExtractor(), null, null, "topic"); + final ProcessorTopology processorTopology = builder.build(null); + for (final SourceNode sourceNode: processorTopology.sources()) { + assertThat(sourceNode.getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception { + builder.stream(null, new MockTimestampExtractor(), null, null, "topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorToTablePerSource() throws Exception { + builder.table("topic", "store"); + final ProcessorTopology processorTopology = builder.build(null); + assertNull(processorTopology.source("topic").getTimestampExtractor()); + } + + @SuppressWarnings("unchecked") + @Test + public void kTableTimestampExtractorShouldBeNull() throws Exception { + builder.table("topic", "store"); + final ProcessorTopology processorTopology = builder.build(null); + assertNull(processorTopology.source("topic").getTimestampExtractor()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception { + builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java index 2526154..69b328d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java @@ -52,7 +52,7 @@ public class GlobalKTableJoinsTest { @Before public void setUp() throws Exception { stateDir = TestUtils.tempDirectory(); - global = builder.globalTable(Serdes.String(), Serdes.String(), globalTopic, "global-store"); + global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store"); stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic); keyValueMapper = new KeyValueMapper<String, String, String>() { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 23b0389..d214fb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -279,14 +279,14 @@ public class KStreamImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnJoinWithGlobalTable() throws Exception { - testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"), + testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), null, MockValueJoiner.TOSTRING_JOINER); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() throws Exception { - testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"), + testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), MockKeyValueMapper.<String, String>SelectValueMapper(), null); } @@ -300,14 +300,14 @@ public class KStreamImplTest { @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() throws Exception { - testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"), + testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), null, MockValueJoiner.TOSTRING_JOINER); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() throws Exception { - testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"), + testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"), MockKeyValueMapper.<String, String>SelectValueMapper(), null); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 9947870..6d6ec1d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -214,8 +214,8 @@ public class SimpleBenchmark { // the socket buffer needs to be large, especially when running in AWS with // high latency. if running locally the default is fine. props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS); props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS); return props; http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 41ab803..0b60b00 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; +import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.ProcessorTopologyTestDriver; import org.junit.Test; @@ -53,6 +54,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; +import static org.junit.Assert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; public class TopologyBuilderTest { @@ -118,7 +121,7 @@ public class TopologyBuilderTest { final Serde<String> stringSerde = Serdes.String(); try { - builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), new String[]{}); + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{}); fail("Should throw TopologyBuilderException with no topics"); } catch (TopologyBuilderException tpe) { //no-op @@ -130,9 +133,9 @@ public class TopologyBuilderTest { final TopologyBuilder builder = new TopologyBuilder(); final Serde<String> stringSerde = Serdes.String(); - builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-1"); + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1"); try { - builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-2"); + builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2"); fail("Should throw TopologyBuilderException for duplicate source name"); } catch (TopologyBuilderException tpe) { //no-op @@ -701,6 +704,61 @@ public class TopologyBuilderTest { @SuppressWarnings("unchecked") @Test + public void shouldAddTimestampExtractorPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource(new MockTimestampExtractor(), "source", "topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource(null, new MockTimestampExtractor(), "source", "topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + final Pattern pattern = Pattern.compile("t.*"); + builder.addSource(new MockTimestampExtractor(), "source", pattern); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + final Pattern pattern = Pattern.compile("t.*"); + builder.addSource(null, new MockTimestampExtractor(), "source", pattern); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + final Pattern pattern = Pattern.compile("t.*"); + builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { final TopologyBuilder topologyBuilder = new TopologyBuilder() http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 031c732..bf3259e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -87,7 +87,7 @@ public class InternalTopicManagerTest { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "Internal-Topic-ManagerTest"); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); } }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 447843f..9f54b4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -51,7 +51,7 @@ public class PartitionGroupTest { put(partition1, queue1); put(partition2, queue2); } - }, timestampExtractor); + }); @Test public void testTimeTracking() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index d19c91a..369c47f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -78,9 +78,9 @@ public class ProcessorTopologyTest { props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test"); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath()); - props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); + props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); this.config = new StreamsConfig(props); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 3d4411c..a358be5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -119,7 +119,7 @@ public class StandbyTaskTest { setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 4cc7b92..9937ad4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -111,7 +111,7 @@ public class StreamPartitionAssignorTest { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test"); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); } }; }
