This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ba237c5 HOTFIX: use ConsumedInternal in StreamsBuilder ba237c5 is described below commit ba237c5d21abb8b63c5edf53517654a214157582 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu May 17 17:20:12 2018 -0700 HOTFIX: use ConsumedInternal in StreamsBuilder --- .../org/apache/kafka/streams/StreamsBuilder.java | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index a05a9b3..ead8a76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -222,9 +222,10 @@ public class StreamsBuilder { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); + final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); + materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); return internalStreamsBuilder.table(topic, - new ConsumedInternal<>(consumed), + consumedInternal, new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); } @@ -271,10 +272,11 @@ public class StreamsBuilder { final Consumed<K, V> consumed) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); + final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); return internalStreamsBuilder.table(topic, - new ConsumedInternal<>(consumed), + consumedInternal, new MaterializedInternal<>( - Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde), + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-")); } @@ -328,14 +330,15 @@ public class StreamsBuilder { final Consumed<K, V> consumed) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); + final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>( - Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde), + Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-"); - return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized); + return internalStreamsBuilder.globalTable(topic, consumedInternal, materialized); } /** @@ -396,10 +399,11 @@ public class StreamsBuilder { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(consumed, "consumed can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); // always use the serdes from consumed - materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); + materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); return internalStreamsBuilder.globalTable(topic, - new ConsumedInternal<>(consumed), + consumedInternal, new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); } -- To stop receiving notification emails like this one, please contact guozh...@apache.org.