This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fdc14da MINOR: Code refactoring in KTable-KTable Join (#4486)
fdc14da is described below
commit fdc14dacedc3d3497dcc222b44f539e6355b45a9
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Jan 29 17:48:53 2018 -0800
MINOR: Code refactoring in KTable-KTable Join (#4486)
1. Rename KTableKTableJoin to KTableKTableInnerJoin. Also remove "Abstract"
from the nested value-getter-supplier class names in the other joins.
2. Merge KTableKTableJoinValueGetter.java into KTableKTableInnerJoin.
3. Use a set instead of arrays in the storeNames() functions, to avoid
connecting duplicate stores to processors.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/kstream/internals/KTableImpl.java | 4 +-
...TableKTableAbstractJoinValueGetterSupplier.java | 5 +-
...eKTableJoin.java => KTableKTableInnerJoin.java} | 51 ++++++++++++++----
.../kstream/internals/KTableKTableJoinMerger.java | 21 +++-----
.../internals/KTableKTableJoinValueGetter.java | 62 ----------------------
.../kstream/internals/KTableKTableLeftJoin.java | 10 ++--
.../kstream/internals/KTableKTableOuterJoin.java | 10 ++--
.../kstream/internals/KTableKTableRightJoin.java | 10 ++--
...oinTest.java => KTableKTableInnerJoinTest.java} | 2 +-
9 files changed, 70 insertions(+), 105 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 0cf56ef..a746d31 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -756,8 +756,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K>
implements KTable<K,
final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
if (!leftOuter) { // inner
- joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>)
other, joiner);
- joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other,
this, reverseJoiner(joiner));
+ joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?,
V1>) other, joiner);
+ joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>)
other, this, reverseJoiner(joiner));
} else if (!rightOuter) { // left
joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>)
other, joiner);
joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>)
other, this, reverseJoiner(joiner));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
index d36920a..f2de67f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
@@ -16,8 +16,9 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1,
V2> implements KTableValueGetterSupplier<K, R> {
final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
@@ -33,7 +34,7 @@ public abstract class
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2>
public String[] storeNames() {
final String[] storeNames1 = valueGetterSupplier1.storeNames();
final String[] storeNames2 = valueGetterSupplier2.storeNames();
- final ArrayList<String> stores = new ArrayList<>(storeNames1.length +
storeNames2.length);
+ final Set<String> stores = new HashSet<>(storeNames1.length +
storeNames2.length);
Collections.addAll(stores, storeNames1);
Collections.addAll(stores, storeNames2);
return stores.toArray(new String[stores.size()]);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
similarity index 59%
rename from
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
rename to
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index c424f4f..e170175 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
-class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
V1, V2> {
+class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K,
R, V1, V2> {
private final KeyValueMapper<K, V1, K> keyValueMapper = new
KeyValueMapper<K, V1, K>() {
@Override
@@ -31,7 +31,7 @@ class KTableKTableJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R, V1,
}
};
- KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2,
ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
+ KTableKTableInnerJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2>
table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
super(table1, table2, joiner);
}
@@ -42,20 +42,17 @@ class KTableKTableJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R, V1,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new
KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+ return new
KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
}
- private class KTableKTableAbstractJoinValueGetterSupplier extends
org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K,
R, V1, V2> {
+ private class KTableKTableInnerJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
- public
KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1>
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+ KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
- return new
KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(),
-
valueGetterSupplier2.get(),
- joiner,
- keyValueMapper);
+ return new
KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(),
valueGetterSupplier2.get());
}
}
@@ -63,7 +60,7 @@ class KTableKTableJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R, V1,
private final KTableValueGetter<K, V2> valueGetter;
- public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter)
{
+ KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}
@@ -100,4 +97,38 @@ class KTableKTableJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R, V1,
}
}
+ private class KTableKTableInnerJoinValueGetter implements
KTableValueGetter<K, R> {
+
+ private final KTableValueGetter<K, V1> valueGetter1;
+ private final KTableValueGetter<K, V2> valueGetter2;
+
+ KTableKTableInnerJoinValueGetter(final KTableValueGetter<K, V1>
valueGetter1,
+ final KTableValueGetter<K, V2>
valueGetter2) {
+ this.valueGetter1 = valueGetter1;
+ this.valueGetter2 = valueGetter2;
+ }
+
+ @Override
+ public void init(final ProcessorContext context) {
+ valueGetter1.init(context);
+ valueGetter2.init(context);
+ }
+
+ @Override
+ public R get(final K key) {
+ final V1 value1 = valueGetter1.get(key);
+
+ if (value1 != null) {
+ V2 value2 = valueGetter2.get(keyValueMapper.apply(key,
value1));
+
+ if (value2 != null) {
+ return joiner.apply(value1, value2);
+ } else {
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index d27b8bd..6400750 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V>
{
private final KTableImpl<K, ?, V> parent1;
@@ -56,21 +60,12 @@ class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K, V, V> {
@Override
public String[] storeNames() {
- // we need to allow the downstream processor to be able to
access both ends of the joining table's value getters
final String[] storeNames1 =
parent1.valueGetterSupplier().storeNames();
final String[] storeNames2 =
parent2.valueGetterSupplier().storeNames();
-
- final String[] stores = new String[storeNames1.length +
storeNames2.length];
- int i = 0;
- for (final String storeName : storeNames1) {
- stores[i] = storeName;
- i++;
- }
- for (final String storeName : storeNames2) {
- stores[i] = storeName;
- i++;
- }
- return stores;
+ final Set<String> stores = new
HashSet<>(storeNames1.length + storeNames2.length);
+ Collections.addAll(stores, storeNames1);
+ Collections.addAll(stores, storeNames2);
+ return stores.toArray(new String[stores.size()]);
}
};
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
deleted file mode 100644
index c8c3eb7..0000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-class KTableKTableJoinValueGetter<K1, V1, K2, V2, R> implements
KTableValueGetter<K1, R> {
-
- private final KTableValueGetter<K1, V1> valueGetter1;
- private final KTableValueGetter<K2, V2> valueGetter2;
- private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
- private final KeyValueMapper<K1, V1, K2> keyValueMapper;
-
- public KTableKTableJoinValueGetter(final KTableValueGetter<K1, V1>
valueGetter1,
- final KTableValueGetter<K2, V2>
valueGetter2,
- final ValueJoiner<? super V1, ? super
V2, ? extends R> joiner,
- final KeyValueMapper<K1, V1, K2>
keyValueMapper) {
- this.valueGetter1 = valueGetter1;
- this.valueGetter2 = valueGetter2;
- this.joiner = joiner;
- this.keyValueMapper = keyValueMapper;
- }
-
- @Override
- public void init(ProcessorContext context) {
- valueGetter1.init(context);
- valueGetter2.init(context);
- }
-
- @Override
- public R get(K1 key) {
- R newValue = null;
- V1 value1 = valueGetter1.get(key);
-
- if (value1 != null) {
- V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
-
- if (value2 != null) {
- newValue = joiner.apply(value1, value2);
- }
- }
-
- return newValue;
- }
-
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 33aef02..bb3e652 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new
KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+ return new
KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
}
- private class KTableKTableLeftAbstractJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+ private class KTableKTableLeftJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
- public
KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
+ KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
@@ -53,7 +53,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
- public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2>
valueGetter) {
+ KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}
@@ -94,7 +94,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
- public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1>
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+ KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1,
KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index d2e1d79..e7c170e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new
KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+ return new
KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
}
- private class KTableKTableOuterAbstractJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+ private class KTableKTableOuterJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
- public
KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
+ KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
@@ -52,7 +52,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
- public KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2>
valueGetter) {
+ KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}
@@ -94,7 +94,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
- public KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1>
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+ KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1>
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f4c840b..c540cf9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
- return new
KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
+ return new
KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1,
valueGetterSupplier2);
}
- private class KTableKTableRightAbstractJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+ private class KTableKTableRightJoinValueGetterSupplier extends
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
- public
KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
+ KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K,
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2>
valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
@@ -53,7 +53,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V2> valueGetter;
- public KTableKTableRightJoinProcessor(KTableValueGetter<K, V2>
valueGetter) {
+ KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends
KTableKTableAbstractJoin<K, R,
private final KTableValueGetter<K, V1> valueGetter1;
private final KTableValueGetter<K, V2> valueGetter2;
- public KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1>
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+ KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1>
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
this.valueGetter1 = valueGetter1;
this.valueGetter2 = valueGetter2;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
similarity index 99%
rename from
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
rename to
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 09d4aa0..b890e2f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class KTableKTableJoinTest {
+public class KTableKTableInnerJoinTest {
final private String topic1 = "topic1";
final private String topic2 = "topic2";
--
To stop receiving notification emails like this one, please contact
[email protected].