This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 1d560ed  KAFKA-7671: Stream-Global Table join should not reset 
repartition flag (#5959)
1d560ed is described below

commit 1d560edfd3dd80e1a1c61d07f2c21d7efa21e74f
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Wed Nov 28 21:15:26 2018 -0500

    KAFKA-7671: Stream-Global Table join should not reset repartition flag 
(#5959)
    
    This PR fixes an issue reported from a user. When we join a KStream with a 
GlobalKTable we should not reset the repartition flag as the stream may have 
previously changed its key, and the resulting stream could be used in an 
aggregation operation or join with another stream which may require a 
repartition for correct results.
    
    I've added a test which fails without the fix.
    
    Reviewers: John Roesler <j...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../streams/kstream/internals/KStreamImpl.java     |  3 ++-
 .../streams/kstream/internals/KStreamImplTest.java | 24 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index e7dabbf..d31d7cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -564,8 +564,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         final KTableValueGetterSupplier<K1, V1> valueGetterSupplier = 
((GlobalKTableImpl<K1, V1>) globalTable).valueGetterSupplier();
         final String name = builder.newProcessorName(LEFTJOIN_NAME);
+
         builder.internalTopologyBuilder.addProcessor(name, new 
KStreamGlobalKTableJoin<>(valueGetterSupplier, joiner, keyMapper, leftJoin), 
this.name);
-        return new KStreamImpl<>(builder, name, sourceNodes, false);
+        return new KStreamImpl<>(builder, name, sourceNodes, 
repartitionRequired);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 7aed8e1..0ee9d5e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -55,12 +55,15 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
@@ -276,6 +279,27 @@ public class KStreamImplTest {
             }
         }
     }
+
+    @Test
+    public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final GlobalKTable<String, String> globalKTable = 
builder.globalTable("globalTopic");
+        final KeyValueMapper<String, String, String> kvMappper = (k, v) -> k + 
v;
+        final ValueJoiner<String, String, String> valueJoiner = (v1, v2) -> v1 
+ v2;
+        builder.<String, String>stream("topic").selectKey((k, v) -> v)
+            .join(globalKTable, kvMappper, valueJoiner)
+            .groupByKey()
+            .count();
+
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
+        final String topology = builder.build().describe().toString();
+        final Matcher matcher = repartitionTopicPattern.matcher(topology);
+        assertTrue(matcher.find());
+        final String match = matcher.group();
+        assertThat(match, notNullValue());
+        assertTrue(match.endsWith("repartition"));
+
+    }
     
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {

Reply via email to