This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba237c5  HOTFIX: use ConsumedInternal in StreamsBuilder
ba237c5 is described below

commit ba237c5d21abb8b63c5edf53517654a214157582
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu May 17 17:20:12 2018 -0700

    HOTFIX: use ConsumedInternal in StreamsBuilder
---
 .../org/apache/kafka/streams/StreamsBuilder.java     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index a05a9b3..ead8a76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -222,9 +222,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
+        materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(consumed),
+                                            consumedInternal,
                                             new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
     }
 
@@ -271,10 +272,11 @@ public class StreamsBuilder {
                                                   final Consumed<K, V> consumed) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(consumed),
+                                            consumedInternal,
                                             new MaterializedInternal<>(
-                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+                                                    Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
                                                     internalStreamsBuilder,
                                                     topic + "-"));
     }
@@ -328,14 +330,15 @@ public class StreamsBuilder {
                                                               final Consumed<K, V> consumed) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized =
                 new MaterializedInternal<>(
-                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumed.keySerde, consumed.valueSerde),
+                        Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
                         internalStreamsBuilder,
                         topic + "-");
 
 
-        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), materialized);
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, materialized);
     }
 
     /**
@@ -396,10 +399,11 @@ public class StreamsBuilder {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
         // always use the serdes from consumed
-        materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
+        materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
         return internalStreamsBuilder.globalTable(topic,
-                                                  new ConsumedInternal<>(consumed),
+                                                  consumedInternal,
                                                   new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to