Repository: kafka
Updated Branches:
  refs/heads/trunk eb6f04a8b -> da8517182
KAFKA-3817: handle null keys in KTableRepartitionMap

Author: Guozhang Wang <[email protected]>

Reviewers: Jeff Klukas <[email protected]>

Closes #1488 from guozhangwang/K3817-handle-null-groupedkey

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/da851718
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/da851718
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/da851718

Branch: refs/heads/trunk
Commit: da8517182d2f30c4e03b33b38d41d2fa33621e24
Parents: eb6f04a
Author: Guozhang Wang <[email protected]>
Authored: Fri Jun 10 13:14:05 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Jun 10 13:14:05 2016 -0700

----------------------------------------------------------------------
 .../kstream/internals/KTableRepartitionMap.java | 31 +++++------
 .../kstream/internals/KTableAggregateTest.java  | 50 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 2a7cf1b..bba1857 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -53,7 +53,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
             public KTableValueGetter<K, KeyValue<K1, V1>> get() {
                 return new KTableMapValueGetter(parentValueGetterSupplier.get());
             }
-
         };
     }
 
@@ -66,15 +65,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
         throw new IllegalStateException("KTableRepartitionMap should always require sending old values.");
     }
 
-    private KeyValue<K1, V1> computeValue(K key, V value) {
-        KeyValue<K1, V1> newValue = null;
-
-        if (key != null || value != null)
-            newValue = mapper.apply(key, value);
-
-        return newValue;
-    }
-
     private class KTableMapProcessor extends AbstractProcessor<K, Change<V>> {
 
         /**
@@ -82,16 +72,18 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
          */
         @Override
         public void process(K key, Change<V> change) {
-            KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
-
-            // the selected repartition key should never be null
-            if (newPair.key == null)
-                throw new StreamsException("Record key for KTable repartition operator should not be null.");
+            // the original key should never be null
+            if (key == null)
+                throw new StreamsException("Record key for the grouping KTable should not be null.");
 
-            context().forward(newPair.key, new Change<>(newPair.value, null));
+            KeyValue<K1, V1> newPair = mapper.apply(key, change.newValue);
+            KeyValue<K1, V1> oldPair = mapper.apply(key, change.oldValue);
 
-            if (change.oldValue != null) {
-                KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);
+            // if the selected repartition key or value is null, skip
+            if (newPair != null && newPair.key != null && newPair.value != null) {
+                context().forward(newPair.key, new Change<>(newPair.value, null));
+            }
+            if (oldPair != null && oldPair.key != null && oldPair.value != null) {
                 context().forward(oldPair.key, new Change<>(null, oldPair.value));
             }
         }
@@ -112,9 +104,8 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
 
         @Override
         public KeyValue<K1, V1> get(K key) {
-            return computeValue(key, parentGetter.get(key));
+            return mapper.apply(key, parentGetter.get(key));
         }
-
     }
 
 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/da851718/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index a614479..75e007d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,8 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
@@ -96,4 +98,52 @@ public class KTableAggregateTest {
                 "B:0+2+4-2+7", "B:0+2+4-2+7-4",
                 "C:0+5+8", "C:0+5+8-5"), proc2.processed);
     }
+
+    @Test
+    public void testAggRepartition() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        String topic1 = "topic1";
+
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+        KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            @Override
+            public KeyValue<String, String> apply(String key, String value) {
+                if (key.equals("null")) {
+                    return KeyValue.pair(null, value + "s");
+                } else if (key.equals("NULL")) {
+                    return null;
+                } else {
+                    return KeyValue.pair(value, value + "s");
+                }
+            }
+        },
+                stringSerde,
+                stringSerde
+        )
+                .aggregate(MockInitializer.STRING_INIT,
+                        MockAggregator.STRING_ADDER,
+                        MockAggregator.STRING_REMOVER,
+                        stringSerde,
+                        "topic1-Canonized");
+
+        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
+        table2.toStream().process(proc2);
+
+        driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.process(topic1, "A", "1");
+        driver.process(topic1, "B", "2");
+        driver.process(topic1, "null", "3");
+        driver.process(topic1, "B", "4");
+        driver.process(topic1, "NULL", "5");
+        driver.process(topic1, "B", "7");
+
+        assertEquals(Utils.mkList(
+                "1:0+1s",
+                "2:0+2s",
+                "4:0+4s",
+                "2:0+2s-2s",
+                "7:0+7s",
+                "4:0+4s-4s"), proc2.processed);
+    }
 }
