This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e56ebbf [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461) e56ebbf is described below commit e56ebbffca57741d398283e46073ed4170f8f927 Author: Ted Yu <yuzhih...@gmail.com> AuthorDate: Sat Apr 20 11:30:20 2019 -0700 [KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461) Reviewers: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- docs/streams/developer-guide/datatypes.html | 2 +- docs/streams/developer-guide/dsl-api.html | 3 ++- .../org/apache/kafka/streams/KafkaStreams.java | 26 ++++++++++++++++++++++ .../streams/processor/internals/SinkNode.java | 8 +++++++ .../streams/processor/internals/SourceNode.java | 8 +++++++ .../state/internals/MeteredKeyValueStore.java | 19 +++++++++++++--- .../state/internals/MeteredSessionStore.java | 19 +++++++++++++--- .../internals/MeteredTimestampedKeyValueStore.java | 21 ++++++++++++++--- .../internals/MeteredTimestampedWindowStore.java | 21 ++++++++++++++--- .../state/internals/MeteredWindowStore.java | 19 +++++++++++++--- .../state/internals/MeteredKeyValueStoreTest.java | 2 ++ .../state/internals/MeteredSessionStoreTest.java | 2 ++ .../MeteredTimestampedKeyValueStoreTest.java | 2 ++ 13 files changed, 135 insertions(+), 17 deletions(-) diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index 83159e8..d78202a 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -68,7 +68,7 @@ </div> <div class="section" id="overriding-default-serdes"> <h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2> - <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p> + <p>You 
can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. In this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p> <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span> <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span> diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index f5c3df9..784dcca 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3547,7 +3547,8 @@ val clicksPerRegion: KTable[String, Long] = // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream.to(outputTopic) </pre> - <p>A complete example of user-defined SerDes can be found in a test class within the library.</p> + <p>A complete example of user-defined SerDes can be found in a test class within the library.
+ Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p> </div> </div> </div> diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 315a6bb..e201bcd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.SinkNode; +import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -634,6 +636,26 @@ public class KafkaStreams implements AutoCloseable { this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM); } + @SuppressWarnings("unchecked") + private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) { + for (final SinkNode sn : sinks) { + if (sn.getKeySerializer() != null) { + sn.getKeySerializer().configure(config.originals(), true); + } + if (sn.getValueSerializer() != null) { + sn.getValueSerializer().configure(config.originals(), false); + } + } + for (final SourceNode sn : sources) { + if (sn.getKeyDeSerializer() != null) { + sn.getKeyDeSerializer().configure(config.originals(), true); + } + if (sn.getValueDeSerializer() != null) { + sn.getValueDeSerializer().configure(config.originals(), false); + } + } + } + private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final 
StreamsConfig config, final KafkaClientSupplier clientSupplier, @@ -670,6 +692,7 @@ public class KafkaStreams implements AutoCloseable { // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception final ProcessorTopology taskTopology = internalTopologyBuilder.build(); + configureSerDes(taskTopology.sinks(), taskTopology.sources()); streamsMetadataState = new StreamsMetadataState( internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); @@ -683,6 +706,7 @@ public class KafkaStreams implements AutoCloseable { log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); } final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)); final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() || (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore()); @@ -696,6 +720,8 @@ public class KafkaStreams implements AutoCloseable { final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener(); GlobalStreamThread.State globalThreadState = null; if (globalTaskTopology != null) { + configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources()); + final String globalThreadId = clientId + "-GlobalStreamThread"; globalStreamThread = new GlobalStreamThread(globalTaskTopology, config, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 73bffc8..03e16c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -44,6 +44,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { this.partitioner = partitioner; } + 
public Serializer getKeySerializer() { + return keySerializer; + } + + public Serializer getValueSerializer() { + return valSerializer; + } + /** * @throws UnsupportedOperationException if this method adds a child to a sink node */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 87505ca..bcd6475 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -52,6 +52,14 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> { this(name, topics, null, keyDeserializer, valDeserializer); } + public Deserializer getKeyDeSerializer() { + return keyDeserializer; + } + + public Deserializer getValueDeSerializer() { + return valDeserializer; + } + K deserializeKey(final String topic, final Headers headers, final byte[] data) { return keyDeserializer.deserialize(topic, headers, data); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 51da3ed..277efd2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -115,10 +115,23 @@ public class MeteredKeyValueStore<K, V> @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final Serde<K> usedKeySerde; + final Serde<V> usedValueSerde; + final Map<String, Object> conf = context.appConfigs(); + if (keySerde == null) { + usedKeySerde = (Serde<K>) context.keySerde(); + } else { + usedKeySerde = keySerde; + usedKeySerde.configure(conf, true); + } + if (valueSerde == null) { + usedValueSerde = (Serde<V>) context.valueSerde(); + } else { + 
usedValueSerde = valueSerde; + usedValueSerde.configure(conf, false); + } serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 94b004e..1a55490 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -69,10 +69,23 @@ public class MeteredSessionStore<K, V> public void init(final ProcessorContext context, final StateStore root) { //noinspection unchecked + final Serde<K> usedKeySerde; + final Serde<V> usedValueSerde; + final Map<String, Object> conf = context.appConfigs(); + if (keySerde == null) { + usedKeySerde = (Serde<K>) context.keySerde(); + } else { + usedKeySerde = keySerde; + usedKeySerde.configure(conf, true); + } + if (valueSerde == null) { + usedValueSerde = (Serde<V>) context.valueSerde(); + } else { + usedValueSerde = valueSerde; + usedValueSerde.configure(conf, false); + } serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? 
(Serde<V>) context.valueSerde() : valueSerde); + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde); metrics = (StreamsMetricsImpl) context.metrics(); taskName = context.taskId().toString(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 2fa7c96..9a239e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; +import java.util.Map; + /** * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality. @@ -48,9 +50,22 @@ public class MeteredTimestampedKeyValueStore<K, V> @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final Serde<K> usedKeySerde; + final Serde<ValueAndTimestamp<V>> usedValueSerde; + final Map<String, Object> conf = context.appConfigs(); + if (keySerde == null) { + usedKeySerde = (Serde<K>) context.keySerde(); + } else { + usedKeySerde = keySerde; + usedKeySerde.configure(conf, true); + } + if (valueSerde == null) { + usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde(); + } else { + usedValueSerde = valueSerde; + usedValueSerde.configure(conf, false); + } serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? 
new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde); + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index 1c10491..7bc5486 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowStore; +import java.util.Map; + /** * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its * inner WindowStore implementation do not need to provide its own metrics collecting functionality. @@ -50,9 +52,22 @@ class MeteredTimestampedWindowStore<K, V> @SuppressWarnings("unchecked") @Override void initStoreSerde(final ProcessorContext context) { + final Serde<K> usedKeySerde; + final Serde<ValueAndTimestamp<V>> usedValueSerde; + final Map<String, Object> conf = context.appConfigs(); + if (keySerde == null) { + usedKeySerde = (Serde<K>) context.keySerde(); + } else { + usedKeySerde = keySerde; + usedKeySerde.configure(conf, true); + } + if (valueSerde == null) { + usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde(); + } else { + usedValueSerde = valueSerde; + usedValueSerde.configure(conf, false); + } serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? 
new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde); + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 6d2eaab..74de63e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -99,10 +99,23 @@ public class MeteredWindowStore<K, V> @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { + final Serde<K> usedKeySerde; + final Serde<V> usedValueSerde; + final Map<String, Object> conf = context.appConfigs(); + if (keySerde == null) { + usedKeySerde = (Serde<K>) context.keySerde(); + } else { + usedKeySerde = keySerde; + usedKeySerde.configure(conf, true); + } + if (valueSerde == null) { + usedValueSerde = (Serde<V>) context.valueSerde(); + } else { + usedValueSerde = valueSerde; + usedValueSerde.configure(conf, false); + } serdes = new StateSerdes<>( - ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), - keySerde == null ? (Serde<K>) context.keySerde() : keySerde, - valueSerde == null ? 
(Serde<V>) context.valueSerde() : valueSerde); + ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 5cbe95c..51c14ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -40,6 +40,7 @@ import org.junit.runner.RunWith; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -92,6 +93,7 @@ public class MeteredKeyValueStoreTest { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)); expect(context.taskId()).andReturn(taskId); + expect(context.appConfigs()).andReturn(new HashMap<>()); expect(inner.name()).andReturn("metered").anyTimes(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 30c382b..e5eb9e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -42,6 +42,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -91,6 +92,7 @@ public class MeteredSessionStoreTest { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)); 
expect(context.taskId()).andReturn(taskId); + expect(context.appConfigs()).andReturn(new HashMap<>()); expect(inner.name()).andReturn("metered").anyTimes(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 0f60d24..587d369 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -41,6 +41,7 @@ import org.junit.runner.RunWith; import java.util.Collections; import java.util.List; +import java.util.HashMap; import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -94,6 +95,7 @@ public class MeteredTimestampedKeyValueStoreTest { metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG); expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)); expect(context.taskId()).andReturn(taskId); + expect(context.appConfigs()).andReturn(new HashMap<>()); expect(inner.name()).andReturn("metered").anyTimes(); }