This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdc14da  MINOR: Code refacotring in KTable-KTable Join (#4486)
fdc14da is described below

commit fdc14dacedc3d3497dcc222b44f539e6355b45a9
Author: Guozhang Wang <[email protected]>
AuthorDate: Mon Jan 29 17:48:53 2018 -0800

    MINOR: Code refacotring in KTable-KTable Join (#4486)
    
    1. Rename KTableKTableJoin to KTableKTableInnerJoin. Also removed abstract 
from other joins.
    2. Merge KTableKTableJoinValueGetter.java into KTableKTableInnerJoin.
    3. Use set instead of arrays in the stores function, to avoid duplicate 
stores to be connected to processors.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../streams/kstream/internals/KTableImpl.java      |  4 +-
 ...TableKTableAbstractJoinValueGetterSupplier.java |  5 +-
 ...eKTableJoin.java => KTableKTableInnerJoin.java} | 51 ++++++++++++++----
 .../kstream/internals/KTableKTableJoinMerger.java  | 21 +++-----
 .../internals/KTableKTableJoinValueGetter.java     | 62 ----------------------
 .../kstream/internals/KTableKTableLeftJoin.java    | 10 ++--
 .../kstream/internals/KTableKTableOuterJoin.java   | 10 ++--
 .../kstream/internals/KTableKTableRightJoin.java   | 10 ++--
 ...oinTest.java => KTableKTableInnerJoinTest.java} |  2 +-
 9 files changed, 70 insertions(+), 105 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 0cf56ef..a746d31 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -756,8 +756,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         final KTableKTableAbstractJoin<K, R, V1, V> joinOther;
 
         if (!leftOuter) { // inner
-            joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) 
other, joiner);
-            joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, 
this, reverseJoiner(joiner));
+            joinThis = new KTableKTableInnerJoin<>(this, (KTableImpl<K, ?, 
V1>) other, joiner);
+            joinOther = new KTableKTableInnerJoin<>((KTableImpl<K, ?, V1>) 
other, this, reverseJoiner(joiner));
         } else if (!rightOuter) { // left
             joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) 
other, joiner);
             joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) 
other, this, reverseJoiner(joiner));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
index d36920a..f2de67f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoinValueGetterSupplier.java
@@ -16,8 +16,9 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 public abstract class KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, 
V2> implements KTableValueGetterSupplier<K, R> {
     final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
@@ -33,7 +34,7 @@ public abstract class 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2>
     public String[] storeNames() {
         final String[] storeNames1 = valueGetterSupplier1.storeNames();
         final String[] storeNames2 = valueGetterSupplier2.storeNames();
-        final ArrayList<String> stores = new ArrayList<>(storeNames1.length + 
storeNames2.length);
+        final Set<String> stores = new HashSet<>(storeNames1.length + 
storeNames2.length);
         Collections.addAll(stores, storeNames1);
         Collections.addAll(stores, storeNames2);
         return stores.toArray(new String[stores.size()]);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
similarity index 59%
rename from 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
rename to 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index c424f4f..e170175 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
-class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, 
V1, V2> {
+class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, 
R, V1, V2> {
 
     private final KeyValueMapper<K, V1, K> keyValueMapper = new 
KeyValueMapper<K, V1, K>() {
         @Override
@@ -31,7 +31,7 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
         }
     };
 
-    KTableKTableJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> table2, 
ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
+    KTableKTableInnerJoin(KTableImpl<K, ?, V1> table1, KTableImpl<K, ?, V2> 
table2, ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
         super(table1, table2, joiner);
     }
 
@@ -42,20 +42,17 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableInnerJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableAbstractJoinValueGetterSupplier extends 
org.apache.kafka.streams.kstream.internals.KTableKTableAbstractJoinValueGetterSupplier<K,
 R, V1, V2> {
+    private class KTableKTableInnerJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> 
valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
+        KTableKTableInnerJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
         public KTableValueGetter<K, R> get() {
-            return new 
KTableKTableJoinValueGetter<>(valueGetterSupplier1.get(),
-                                                     
valueGetterSupplier2.get(),
-                                                     joiner,
-                                                     keyValueMapper);
+            return new 
KTableKTableInnerJoinValueGetter(valueGetterSupplier1.get(), 
valueGetterSupplier2.get());
         }
     }
 
@@ -63,7 +60,7 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) 
{
+        KTableKTableJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -100,4 +97,38 @@ class KTableKTableJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R, V1,
         }
     }
 
+    private class KTableKTableInnerJoinValueGetter implements 
KTableValueGetter<K, R> {
+
+        private final KTableValueGetter<K, V1> valueGetter1;
+        private final KTableValueGetter<K, V2> valueGetter2;
+
+        KTableKTableInnerJoinValueGetter(final KTableValueGetter<K, V1> 
valueGetter1,
+                                         final KTableValueGetter<K, V2> 
valueGetter2) {
+            this.valueGetter1 = valueGetter1;
+            this.valueGetter2 = valueGetter2;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            valueGetter1.init(context);
+            valueGetter2.init(context);
+        }
+
+        @Override
+        public R get(final K key) {
+            final V1 value1 = valueGetter1.get(key);
+
+            if (value1 != null) {
+                V2 value2 = valueGetter2.get(keyValueMapper.apply(key, 
value1));
+
+                if (value2 != null) {
+                    return joiner.apply(value1, value2);
+                } else {
+                    return null;
+                }
+            } else {
+                return null;
+            }
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index d27b8bd..6400750 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -21,6 +21,10 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> 
{
 
     private final KTableImpl<K, ?, V> parent1;
@@ -56,21 +60,12 @@ class KTableKTableJoinMerger<K, V> implements 
KTableProcessorSupplier<K, V, V> {
 
                 @Override
                 public String[] storeNames() {
-                    // we need to allow the downstream processor to be able to 
access both ends of the joining table's value getters
                     final String[] storeNames1 = 
parent1.valueGetterSupplier().storeNames();
                     final String[] storeNames2 = 
parent2.valueGetterSupplier().storeNames();
-
-                    final String[] stores = new String[storeNames1.length + 
storeNames2.length];
-                    int i = 0;
-                    for (final String storeName : storeNames1) {
-                        stores[i] = storeName;
-                        i++;
-                    }
-                    for (final String storeName : storeNames2) {
-                        stores[i] = storeName;
-                        i++;
-                    }
-                    return stores;
+                    final Set<String> stores = new 
HashSet<>(storeNames1.length + storeNames2.length);
+                    Collections.addAll(stores, storeNames1);
+                    Collections.addAll(stores, storeNames2);
+                    return stores.toArray(new String[stores.size()]);
                 }
             };
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
deleted file mode 100644
index c8c3eb7..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinValueGetter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-class KTableKTableJoinValueGetter<K1, V1, K2, V2, R> implements 
KTableValueGetter<K1, R> {
-
-    private final KTableValueGetter<K1, V1> valueGetter1;
-    private final KTableValueGetter<K2, V2> valueGetter2;
-    private final ValueJoiner<? super V1, ? super V2, ? extends R>  joiner;
-    private final KeyValueMapper<K1, V1, K2> keyValueMapper;
-
-    public KTableKTableJoinValueGetter(final KTableValueGetter<K1, V1> 
valueGetter1,
-                                       final KTableValueGetter<K2, V2> 
valueGetter2,
-                                       final ValueJoiner<? super V1, ? super 
V2, ? extends R>  joiner,
-                                       final KeyValueMapper<K1, V1, K2> 
keyValueMapper) {
-        this.valueGetter1 = valueGetter1;
-        this.valueGetter2 = valueGetter2;
-        this.joiner = joiner;
-        this.keyValueMapper = keyValueMapper;
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        valueGetter1.init(context);
-        valueGetter2.init(context);
-    }
-
-    @Override
-    public R get(K1 key) {
-        R newValue = null;
-        V1 value1 = valueGetter1.get(key);
-
-        if (value1 != null) {
-            V2 value2 = valueGetter2.get(keyValueMapper.apply(key, value1));
-
-            if (value2 != null) {
-                newValue = joiner.apply(value1, value2);
-            }
-        }
-
-        return newValue;
-    }
-
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 33aef02..bb3e652 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -34,12 +34,12 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableLeftAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableLeftAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableLeftJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableLeftAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
+        KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -53,7 +53,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> 
valueGetter) {
+        KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> 
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> valueGetter1, 
KTableValueGetter<K, V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index d2e1d79..e7c170e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -34,12 +34,12 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableOuterAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableOuterAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableOuterJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableOuterAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
+        KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -52,7 +52,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> 
valueGetter) {
+        KTableKTableOuterJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> 
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableOuterJoinValueGetter(KTableValueGetter<K, V1> 
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index f4c840b..c540cf9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -35,12 +35,12 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
     @Override
     public KTableValueGetterSupplier<K, R> view() {
-        return new 
KTableKTableRightAbstractJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
+        return new 
KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, 
valueGetterSupplier2);
     }
 
-    private class KTableKTableRightAbstractJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
+    private class KTableKTableRightJoinValueGetterSupplier extends 
KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {
 
-        public 
KTableKTableRightAbstractJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
+        KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, 
V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> 
valueGetterSupplier2) {
             super(valueGetterSupplier1, valueGetterSupplier2);
         }
 
@@ -53,7 +53,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
         private final KTableValueGetter<K, V2> valueGetter;
 
-        public KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> 
valueGetter) {
+        KTableKTableRightJoinProcessor(KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
         }
 
@@ -94,7 +94,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
         private final KTableValueGetter<K, V1> valueGetter1;
         private final KTableValueGetter<K, V2> valueGetter2;
 
-        public KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> 
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
+        KTableKTableRightJoinValueGetter(KTableValueGetter<K, V1> 
valueGetter1, KTableValueGetter<K, V2> valueGetter2) {
             this.valueGetter1 = valueGetter1;
             this.valueGetter2 = valueGetter2;
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
similarity index 99%
rename from 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 09d4aa0..b890e2f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class KTableKTableJoinTest {
+public class KTableKTableInnerJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to