This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e21e17  MINOR: Improve Join integration test coverage, PART I
3e21e17 is described below

commit 3e21e17a7d3613bfab8c34555c0f598a07cf0675
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Fri Jan 12 15:40:59 2018 -0800

    MINOR: Improve Join integration test coverage, PART I
    
    0. Rename `JoinIntegrationTest` to `StreamStreamJoinIntegrationTest`, which is only for KStream-KStream joins.
    1. Extract `AbstractJoinIntegrationTest`, which is going to be used by all the join integration test classes and is parameterized with and without caching (see the usage sketch after this list).
    2. Merge `KStreamRepartitionJoinTest.java` into `StreamStreamJoinIntegrationTest.java`, augmenting its stream-stream join coverage.
    3. Add `TableTableJoinIntegrationTest` with detailed per-step expected results, and remove `KTableKTableJoinIntegrationTest`.
    
    Findings from the integration tests:
    
    1. Confirmed KAFKA-4309 with caching turned on.
    2. Found bug KAFKA-6398.
    3. Found bug KAFKA-6443.
    4. Found a bug in `CachingKeyValueStore`: we would notify the flush listener before putting the record into the underlying store, so when the store is used by downstream processors during flushing they could read stale data and produce incorrect results. Fixed the issue along with this PR; a sketch of the corrected ordering follows this list.
    5. Consider a new optimization, described in KAFKA-6286.
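
    Regarding finding 4, a minimal sketch of the corrected flush logic
    (simplified; the authoritative change is the `CachingKeyValueStore` diff
    below):

        // inside the cache-flush (eviction) callback, simplified
        if (flushListener != null) {
            // capture the old value before the underlying store is overwritten
            final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
            // write to the underlying store BEFORE notifying the flush listener,
            // so downstream processors triggered by it read the new value
            underlying.put(entry.key(), entry.newValue());
            flushListener.apply(serdes.keyFrom(entry.key().get()),
                                serdes.valueFrom(entry.newValue()),
                                oldValue);
        } else {
            underlying.put(entry.key(), entry.newValue());
        }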
    
    Future work, including stream-table joins, will come in follow-up PRs.
    
    Author: Guozhang Wang <wangg...@gmail.com>
    
    Reviewers: Damian Guy <damian....@gmail.com>, Bill Bejeck <b...@confluent.io>
    
    Closes #4331 from guozhangwang/KMinor-join-integration-tests
---
 .../state/internals/CachingKeyValueStore.java      |   6 +-
 .../integration/AbstractJoinIntegrationTest.java   | 292 +++++++++++
 .../streams/integration/JoinIntegrationTest.java   | 400 ---------------
 .../integration/KStreamRepartitionJoinTest.java    | 386 ---------------
 .../KTableKTableJoinIntegrationTest.java           | 399 ---------------
 .../StreamStreamJoinIntegrationTest.java           | 262 ++++++++++
 .../integration/TableTableJoinIntegrationTest.java | 535 +++++++++++++++++++++
 .../java/org/apache/kafka/test/MockMapper.java     |  12 +
 8 files changed, 1104 insertions(+), 1188 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9fff8cc..f9ab3f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -91,14 +91,14 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
-
                 final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
+                underlying.put(entry.key(), entry.newValue());
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()),
                                     oldValue);
-
+            } else {
+                underlying.put(entry.key(), entry.newValue());
             }
-            underlying.put(entry.key(), entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
new file mode 100644
index 0000000..16d2611
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public abstract class AbstractJoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    @Parameterized.Parameters(name = "caching enabled = {0}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (boolean cacheEnabled : Arrays.asList(true, false))
+            values.add(new Object[] {cacheEnabled});
+        return values;
+    }
+
+    static String appID;
+
+    private static final Long COMMIT_INTERVAL = 100L;
+    static final Properties STREAMS_CONFIG = new Properties();
+    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
+    static final String OUTPUT_TOPIC = "outputTopic";
+    private final long anyUniqueKey = 0L;
+
+    private final static Properties PRODUCER_CONFIG = new Properties();
+    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+
+    private KafkaProducer<Long, String> producer;
+    private KafkaStreams streams;
+
+    StreamsBuilder builder;
+    int numRecordsExpected = 0;
+    AtomicBoolean finalResultReached = new AtomicBoolean(false);
+
+    private final List<Input<String>> input = Arrays.asList(
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, "A"),
+            new Input<>(INPUT_TOPIC_RIGHT, "a"),
+            new Input<>(INPUT_TOPIC_LEFT, "B"),
+            new Input<>(INPUT_TOPIC_RIGHT, "b"),
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, "C"),
+            new Input<>(INPUT_TOPIC_RIGHT, "c"),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, "d"),
+            new Input<>(INPUT_TOPIC_LEFT, "D")
+    );
+
+    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(final String value1, final String value2) {
+            return value1 + "-" + value2;
+        }
+    };
+
+    final boolean cacheEnabled;
+
+    AbstractJoinIntegrationTest(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    @BeforeClass
+    public static void setupConfigsAndUtils() {
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
+    }
+
+    void prepareEnvironment() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+
+        if (!cacheEnabled)
+            STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+
+        producer = new KafkaProducer<>(PRODUCER_CONFIG);
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+    }
+
+    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
+        assertThat(result, is(expectedResult));
+    }
+
+    private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
+        assertThat(result.get(result.size() - 1), is(expectedFinalResult));
+    }
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry.
+     */
+    void runTest(final List<List<String>> expectedResult) throws Exception {
+        runTest(expectedResult, null);
+    }
+
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry.
+     */
+    void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
+        assert expectedResult.size() == input.size();
+
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        String expectedFinalResult = null;
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            final Iterator<List<String>> resultIterator = expectedResult.iterator();
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+
+                List<String> expected = resultIterator.next();
+
+                if (expected != null) {
+                    checkResult(OUTPUT_TOPIC, expected);
+                    expectedFinalResult = expected.get(expected.size() - 1);
+                }
+            }
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after the expected number of records has been consumed.
+     */
+    void runTest(final String expectedFinalResult) throws Exception {
+        runTest(expectedFinalResult, null);
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after the expected number of records has been consumed.
+     */
+    void runTest(final String expectedFinalResult, final String storeName) throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+            }
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return finalResultReached.get();
+                }
+            }, "Never received expected final result.");
+
+            checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Checks the embedded queryable state store snapshot
+     */
+    private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
+        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.<Long, String>keyValueStore());
+
+        final KeyValueIterator<Long, String> all = store.all();
+        final KeyValue<Long, String> onlyEntry = all.next();
+
+        try {
+            assertThat(onlyEntry.key, is(anyUniqueKey));
+            assertThat(onlyEntry.value, is(expectedFinalResult));
+            assertThat(all.hasNext(), is(false));
+        } finally {
+            all.close();
+        }
+    }
+
+    private final class Input<V> {
+        String topic;
+        KeyValue<Long, V> record;
+
+        Input(final String topic, final V value) {
+            this.topic = topic;
+            record = KeyValue.pair(anyUniqueKey, value);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index faa581b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-/**
- * Tests all available joins of Kafka Streams DSL.
- */
-@Category({IntegrationTest.class})
-public class JoinIntegrationTest {
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-
-    private static final String APP_ID = "join-integration-test";
-    private static final String INPUT_TOPIC_1 = "inputTopicLeft";
-    private static final String INPUT_TOPIC_2 = "inputTopicRight";
-    private static final String OUTPUT_TOPIC = "outputTopic";
-
-    private final static Properties PRODUCER_CONFIG = new Properties();
-    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
-    private final static Properties STREAMS_CONFIG = new Properties();
-
-    private StreamsBuilder builder;
-    private KStream<Long, String> leftStream;
-    private KStream<Long, String> rightStream;
-    private KTable<Long, String> leftTable;
-    private KTable<Long, String> rightTable;
-
-    private final List<Input<String>> input = Arrays.asList(
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, "A"),
-        new Input<>(INPUT_TOPIC_2, "a"),
-        new Input<>(INPUT_TOPIC_1, "B"),
-        new Input<>(INPUT_TOPIC_2, "b"),
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, "C"),
-        new Input<>(INPUT_TOPIC_2, "c"),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_2, "d"),
-        new Input<>(INPUT_TOPIC_1, "D")
-    );
-
-    private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(final String value1, final String value2) {
-            return value1 + "-" + value2;
-        }
-    };
-
-    @BeforeClass
-    public static void setupConfigsAndUtils() {
-        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
-        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-    }
-
-    @Before
-    public void prepareTopology() throws InterruptedException {
-        CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
-
-        builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_1);
-        rightTable = builder.table(INPUT_TOPIC_2);
-        leftStream = leftTable.toStream();
-        rightStream = rightTable.toStream();
-    }
-
-    @After
-    public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
-    }
-
-    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
-        if (expectedResult != null) {
-            final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
-            assertThat(result, is(expectedResult));
-        }
-    }
-
-    /*
-     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
-     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
-     */
-    private void runTest(final List<List<String>> expectedResult) throws Exception {
-        assert expectedResult.size() == input.size();
-
-        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
-        try {
-            streams.start();
-
-            long ts = System.currentTimeMillis();
-
-            final Iterator<List<String>> resultIterator = expectedResult.iterator();
-            for (final Input<String> singleInput : input) {
-                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
-                checkResult(OUTPUT_TOPIC, resultIterator.next());
-            }
-        } finally {
-            streams.close();
-        }
-    }
-
-    @Test
-    public void testInnerKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testOuterKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testInnerKStreamKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("B-a"),
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKStreamKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            null,
-            Collections.singletonList("B-a"),
-            null,
-            null,
-            null,
-            Collections.singletonList("C-null"),
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testInnerKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            Collections.singletonList("C-c"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList((String) null),
-            null,
-            Collections.singletonList("C-null"),
-            Collections.singletonList("C-c"),
-            Collections.singletonList("C-null"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testOuterKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList("null-b"),
-            Collections.singletonList((String) null),
-            Collections.singletonList("C-null"),
-            Collections.singletonList("C-c"),
-            Collections.singletonList("C-null"),
-            Collections.singletonList((String) null),
-            null,
-            Collections.singletonList("null-d"),
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    private final class Input<V> {
-        String topic;
-        KeyValue<Long, V> record;
-
-        private final long anyUniqueKey = 0L;
-
-        Input(final String topic, final V value) {
-            this.topic = topic;
-            record = KeyValue.pair(anyUniqueKey, value);
-        }
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
deleted file mode 100644
index 32546de..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.Joined;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-@Category({IntegrationTest.class})
-public class KStreamRepartitionJoinTest {
-
-    private static final int NUM_BROKERS = 1;
-    private static final long COMMIT_INTERVAL_MS = 300L;
-
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = MockValueJoiner.instance(":");
-    private final MockTime mockTime = CLUSTER.time;
-    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
-
-    private StreamsBuilder builder;
-    private Properties streamsConfiguration;
-    private KStream<Long, Integer> streamOne;
-    private KStream<Integer, String> streamTwo;
-    private KStream<Integer, String> streamFour;
-    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
-
-    private final List<String>
-        expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
-    private KafkaStreams kafkaStreams;
-    private String streamOneInput;
-    private String streamTwoInput;
-    private String streamFourInput;
-    private static volatile int testNo = 0;
-
-    @Before
-    public void before() throws InterruptedException {
-        testNo++;
-        String applicationId = "kstream-repartition-join-test-" + testNo;
-        builder = new StreamsBuilder();
-        createTopics();
-        streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-        streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer()));
-        streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-        streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-
-        keyMapper = MockMapper.selectValueKeyValueMapper();
-    }
-
-    @After
-    public void whenShuttingDown() throws IOException {
-        if (kafkaStreams != null) {
-            kafkaStreams.close();
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-    }
-
-    @Test
-    public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
-        verifyRepartitionOnJoinOperations(0);
-    }
-
-    @Test
-    public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
-        verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
-    }
-
-    private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception {
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
-        produceMessages();
-        final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
-        final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
-        final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
-        final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
-        final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
-        final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
-        final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
-        final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();
-
-        startStreams();
-
-        verifyCorrectOutput(mapOne);
-        verifyCorrectOutput(mapBoth);
-        verifyCorrectOutput(mapMapJoin);
-        verifyCorrectOutput(selectKeyJoin);
-        verifyCorrectOutput(flatMapJoin);
-        verifyCorrectOutput(mapRhs);
-        verifyCorrectOutput(mapJoinJoin);
-        verifyCorrectOutput(leftJoin);
-    }
-
-    private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
-        String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
-        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
-    }
-
-    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
-
-        doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
-    }
-
-    private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException {
-        final KStream<Integer, Integer> mapMapStream = streamOne.map(
-            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
-                @Override
-                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
-                    if (value == null) {
-                        return new KeyValue<>(null, null);
-                    }
-                    return new KeyValue<>(key + value, value);
-                }
-            }).map(keyMapper);
-
-        final String outputTopic = "map-map-join-" + testNo;
-        doJoin(mapMapStream, streamTwo, outputTopic);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
-
-        final KStream<Integer, Integer> keySelected =
-            streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper());
-
-        final String outputTopic = "select-key-join-" + testNo;
-        doJoin(keySelected, streamTwo, outputTopic);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException {
-        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
-            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
-                @Override
-                public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
-                    return Collections.singletonList(new KeyValue<>(value, value));
-                }
-            });
-
-        final String outputTopic = "flat-map-join-" + testNo;
-        doJoin(flatMapped, streamTwo, outputTopic);
-
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException {
-
-        final String output = "join-rhs-stream-mapped-" + testNo;
-        CLUSTER.createTopic(output);
-        streamTwo
-            .join(streamOne.map(keyMapper),
-                TOSTRING_JOINER,
-                getJoinWindow(),
-                Joined.with(Serdes.Integer(), Serdes.String(), Serdes.Integer()))
-            .to(Serdes.Integer(), Serdes.String(), output);
-
-        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", 
"D:4", "E:5"), output);
-    }
-
-    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-
-        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
-
-
-        final String outputTopic = "left-join-" + testNo;
-        CLUSTER.createTopic(outputTopic);
-        map1.leftJoin(map2,
-            TOSTRING_JOINER,
-            getJoinWindow(),
-            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
-            .filterNot(new Predicate<Integer, String>() {
-                @Override
-                public boolean test(Integer key, String value) {
-                    // filter not left-only join results
-                    return value.substring(2).equals("null");
-                }
-            })
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-
-        final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockMapper.noOpKeyValueMapper();
-
-        final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
-
-        final KStream<Integer, String> join = map1.join(map2,
-            TOSTRING_JOINER,
-            getJoinWindow(),
-            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()));
-
-        final String topic = "map-join-join-" + testNo;
-        CLUSTER.createTopic(topic);
-        join.map(kvMapper)
-            .join(streamFour.map(kvMapper),
-                TOSTRING_JOINER,
-                getJoinWindow(),
-                Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()))
-            .to(Serdes.Integer(), Serdes.String(), topic);
-
-
-        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", 
"3:C:C", "4:D:D", "5:E:E"), topic);
-    }
-
-    private JoinWindows getJoinWindow() {
-        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
-    }
-
-
-    private class ExpectedOutputOnTopic {
-        private final List<String> expectedOutput;
-        private final String outputTopic;
-
-        ExpectedOutputOnTopic(final List<String> expectedOutput, final String outputTopic) {
-            this.expectedOutput = expectedOutput;
-            this.outputTopic = outputTopic;
-        }
-    }
-
-    private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
-        throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(),
-            expectedOutputOnTopic.expectedOutput.size(),
-            expectedOutputOnTopic.outputTopic),
-            is(expectedOutputOnTopic.expectedOutput));
-    }
-
-    private void produceMessages() throws Exception {
-        produceToStreamOne();
-        produceStreamTwoInputTo(streamTwoInput);
-        produceStreamTwoInputTo(streamFourInput);
-
-    }
-
-    private void produceStreamTwoInputTo(final String streamTwoInput) throws Exception {
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamTwoInput,
-            Arrays.asList(
-                new KeyValue<>(1, "A"),
-                new KeyValue<>(2, "B"),
-                new KeyValue<>(3, "C"),
-                new KeyValue<>(4, "D"),
-                new KeyValue<>(5, "E")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            mockTime);
-    }
-
-    private void produceToStreamOne() throws Exception {
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamOneInput,
-            Arrays.asList(
-                new KeyValue<>(10L, 1),
-                new KeyValue<>(5L, 2),
-                new KeyValue<>(12L, 3),
-                new KeyValue<>(15L, 4),
-                new KeyValue<>(20L, 5),
-                new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                LongSerializer.class,
-                IntegerSerializer.class,
-                new Properties()),
-            mockTime);
-    }
-
-    private void createTopics() throws InterruptedException {
-        streamOneInput = "stream-one-" + testNo;
-        streamTwoInput = "stream-two-" + testNo;
-        streamFourInput = "stream-four-" + testNo;
-        CLUSTER.createTopic(streamOneInput, 2, 1);
-        CLUSTER.createTopic(streamTwoInput, 2, 1);
-        CLUSTER.createTopic(streamFourInput, 2, 1);
-    }
-
-
-    private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
-    }
-
-
-    private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
-                                         final int numMessages, final String topic) throws InterruptedException {
-
-        final Properties config = new Properties();
-
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
-        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            IntegerDeserializer.class.getName());
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            valueDeserializer.getClass().getName());
-        final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
-            config,
-            topic,
-            numMessages,
-            60 * 1000);
-        Collections.sort(received);
-
-        return received;
-    }
-
-    private void doJoin(final KStream<Integer, Integer> lhs,
-                        final KStream<Integer, String> rhs,
-                        final String outputTopic) throws InterruptedException {
-        CLUSTER.createTopic(outputTopic);
-        lhs.join(rhs,
-                 TOSTRING_JOINER,
-                 getJoinWindow(),
-                 Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
deleted file mode 100644
index a12ffac..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-@Category({IntegrationTest.class})
-public class KTableKTableJoinIntegrationTest {
-    private final static int NUM_BROKERS = 1;
-
-    @ClassRule
-    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final static MockTime MOCK_TIME = CLUSTER.time;
-    private final static String TABLE_1 = "table1";
-    private final static String TABLE_2 = "table2";
-    private final static String TABLE_3 = "table3";
-    private final static String OUTPUT = "output-";
-    private static Properties streamsConfig;
-    private KafkaStreams streams;
-    private final static Properties CONSUMER_CONFIG = new Properties();
-
-    @BeforeClass
-    public static void beforeTest() throws Exception {
-        CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
-
-        streamsConfig = new Properties();
-        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        final List<KeyValue<String, String>> table1 = Arrays.asList(
-            new KeyValue<>("a", "A1"),
-            new KeyValue<>("b", "B1")
-        );
-
-        final List<KeyValue<String, String>> table2 = Arrays.asList(
-            new KeyValue<>("b", "B2"),
-            new KeyValue<>("c", "C2")
-        );
-
-        final List<KeyValue<String, String>> table3 = Arrays.asList(
-            new KeyValue<>("a", "A3"),
-            new KeyValue<>("b", "B3"),
-            new KeyValue<>("c", "C3")
-        );
-
-        // put table 3 first, to make sure data is there when joining T1 with T2
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
-
-        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
-        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    }
-
-    @Before
-    public void before() throws IOException {
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
-    }
-
-    @After
-    public void after() throws IOException {
-        if (streams != null) {
-            streams.close();
-            streams = null;
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
-    }
-
-    private enum JoinType {
-        INNER, LEFT, OUTER
-    }
-
-
-    @Test
-    public void shouldInnerInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldInnerLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldInnerOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldOuterInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-    @Test
-    public void shouldOuterLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-    @Test
-    public void shouldOuterOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-
-    private void verifyKTableKTableJoin(final JoinType joinType1,
-                                        final JoinType joinType2,
-                                        final List<KeyValue<String, String>> expectedResult,
-                                        boolean verifyQueryableState) throws InterruptedException {
-        final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
-
-        streams = prepareTopology(joinType1, joinType2, queryableName);
-        streams.start();
-
-        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedResult.size());
-
-        assertThat(result, equalTo(expectedResult));
-
-        if (verifyQueryableState) {
-            verifyKTableKTableJoinQueryableState(joinType1, joinType2, expectedResult);
-        }
-    }
-
-    private void verifyKTableKTableJoinQueryableState(final JoinType joinType1,
-                                                      final JoinType joinType2,
-                                                      final List<KeyValue<String, String>> expectedResult) {
-        final String queryableName = joinType1 + "-" + joinType2 + "-ktable-ktable-join-query";
-        final ReadOnlyKeyValueStore<String, String> myJoinStore = streams.store(queryableName,
-            QueryableStoreTypes.<String, String>keyValueStore());
-
-        // store only keeps last set of values, not entire stream of value changes
-        final Map<String, String> expectedInStore = new HashMap<>();
-        for (KeyValue<String, String> expected : expectedResult) {
-            expectedInStore.put(expected.key, expected.value);
-        }
-
-        for (Map.Entry<String, String> expected : expectedInStore.entrySet()) {
-            assertEquals(expected.getValue(), myJoinStore.get(expected.getKey()));
-        }
-        final KeyValueIterator<String, String> all = myJoinStore.all();
-        while (all.hasNext()) {
-            KeyValue<String, String> storeEntry = all.next();
-            assertTrue(expectedResult.contains(storeEntry));
-        }
-        all.close();
-
-    }
-
-    private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KTable<String, String> table1 = builder.table(TABLE_1);
-        final KTable<String, String> table2 = builder.table(TABLE_2);
-        final KTable<String, String> table3 = builder.table(TABLE_3);
-
-        Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null;
-        if (queryableName != null) {
-            materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
-                    .withKeySerde(Serdes.String())
-                    .withValueSerde(Serdes.String())
-                    .withCachingDisabled();
-        }
-        join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
-            joinType2, materialized).to(OUTPUT);
-
-        return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig));
-    }
-
-    private KTable<String, String> join(final KTable<String, String> first,
-                                        final KTable<String, String> second,
-                                        final JoinType joinType,
-                                        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
-        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + "-" + value2;
-            }
-        };
-
-
-        switch (joinType) {
-            case INNER:
-                if (materialized != null) {
-                    return first.join(second, joiner, materialized);
-                } else {
-                    return first.join(second, joiner);
-                }
-            case LEFT:
-                if (materialized != null) {
-                    return first.leftJoin(second, joiner, materialized);
-                } else {
-                    return first.leftJoin(second, joiner);
-                }
-            case OUTER:
-                if (materialized != null) {
-                    return first.outerJoin(second, joiner, materialized);
-                } else {
-                    return first.outerJoin(second, joiner);
-                }
-        }
-
-        throw new RuntimeException("Unknown join type.");
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
new file mode 100644
index 0000000..571dc05
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockMapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    private KStream<Long, String> leftStream;
+    private KStream<Long, String> rightStream;
+
+    public StreamStreamJoinIntegrationTest(boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = "stream-stream-join-integration-test";
+
+        builder = new StreamsBuilder();
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightStream = builder.stream(INPUT_TOPIC_RIGHT);
+    }
+
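+    // Note on the expected results below: each inner list holds the output records emitted for one
+    // input record, in the order the inputs are produced, and a null entry means that input emits
+    // no output. The input sequence itself (presumably records A, B, C, D on the left and a, b, c, d
+    // on the right, interleaved) is driven by runTest() in AbstractJoinIntegrationTest, which is not
+    // part of this hunk.
+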
+    @Test
+    public void testInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testInnerRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Arrays.asList("A-b", "B-b"),
+                null,
+                null,
+                Arrays.asList("C-a", "C-b"),
+                Arrays.asList("A-c", "B-c", "C-c"),
+                null,
+                null,
+                null,
+                Arrays.asList("A-d", "B-d", "C-d"),
+                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
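+        // here and in the other *Repartitioned tests, map()/flatMap()/selectKey() do not change the
+        // data; they only mark the streams as re-keyed so that Streams inserts repartition topics
+        // before the join, exercising the repartitioning code path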
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .join(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
+                                 .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+                       valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList("A-null"),
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testLeftRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList("A-null"),
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Arrays.asList("A-b", "B-b"),
+                null,
+                null,
+                Arrays.asList("C-a", "C-b"),
+                Arrays.asList("A-c", "B-c", "C-c"),
+                null,
+                null,
+                null,
+                Arrays.asList("A-d", "B-d", "C-d"),
+                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .leftJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
+                                     .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+                        valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList("A-null"),
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testOuterRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-repartitioned");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList("A-null"),
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Arrays.asList("A-b", "B-b"),
+                null,
+                null,
+                Arrays.asList("C-a", "C-b"),
+                Arrays.asList("A-c", "B-c", "C-c"),
+                null,
+                null,
+                null,
+                Arrays.asList("A-d", "B-d", "C-d"),
+                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .outerJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
+                                .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+                        valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testMultiInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Collections.singletonList("A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", 
"B-b-b"),
+                null,
+                null,
+                Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"),
+                Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", 
"C-c-b", "A-a-c", "B-a-c",
+                        "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", 
"C-c-c"),
+                null,
+                null,
+                null,
+                Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", 
"B-d-c", "C-d-a", "C-d-b", "C-d-c",
+                        "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", 
"A-c-d", "B-c-d", "C-c-d",
+                        "A-d-d", "B-d-d", "C-d-d"),
+                Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", 
"D-b-b", "D-b-c", "D-b-d", "D-c-a",
+                        "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", 
"D-d-d")
+        );
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000))
+                .join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
new file mode 100644
index 0000000..f3eceb0
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    private KTable<Long, String> leftTable;
+    private KTable<Long, String> rightTable;
+
+    public TableTableJoinIntegrationTest(boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = "table-table-join-integration-test";
+
+        builder = new StreamsBuilder();
+        leftTable = builder.table(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+    }
+
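+    // As in the stream-stream tests, each inner list of an expected result holds the updates emitted
+    // for one input record; Collections.singletonList((String) null) denotes a tombstone, i.e. the
+    // join result for that key was deleted.
+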
+    private final String expectedFinalJoinResult = "D-d";
+    private final String expectedFinalMultiJoinResult = "D-d-d";
+    private final String storeName = appID + "-store";
+
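+    // caching and logging are disabled on the result store, presumably so that every update is
+    // forwarded downstream and written to the store deterministically, which the per-record
+    // expected results rely on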
+    private final Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+            .withKeySerde(Serdes.Long())
+            .withValueSerde(Serdes.String())
+            .withCachingDisabled()
+            .withLoggingDisabled();
+
+    private final class CountingPeek implements ForeachAction<Long, String> {
+        private final String expected;
+
+        CountingPeek(final boolean multiJoin) {
+            this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult;
+        }
+        }
+
+        @Override
+        public void apply(final Long key, final String value) {
+            numRecordsExpected++;
+            if (value.equals(expected)) {
+                // the CAS may legitimately fail, since we can see multiple duplicates of the final
+                // result due to KAFKA-4309
+                // TODO: should be tightened once KAFKA-4309 is fixed
+                finalResultReached.compareAndSet(false, true);
+            }
+        }
+    }
+
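+    // With caching enabled, intermediate updates may be folded away in the cache before they reach
+    // the output topic, so the cacheEnabled branches below only wait (via CountingPeek) for the
+    // final join result instead of asserting the full per-record update sequence.
+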
+    @Test
+    public void testInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+
+        if (cacheEnabled) {
+            leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
+            runTest(expectedFinalJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Collections.singletonList("A-a"),
+                    Collections.singletonList("B-a"),
+                    Collections.singletonList("B-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Collections.singletonList("C-c"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    null,
+                    Collections.singletonList("D-d")
+            );
+
+            leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+
+        if (cacheEnabled) {
+            leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
+            runTest(expectedFinalJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    Collections.singletonList("A-null"),
+                    Collections.singletonList("A-a"),
+                    Collections.singletonList("B-a"),
+                    Collections.singletonList("B-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Collections.singletonList("C-null"),
+                    Collections.singletonList("C-c"),
+                    Collections.singletonList("C-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Collections.singletonList("D-d")
+            );
+
+            leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
+
+        if (cacheEnabled) {
+            leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
+            runTest(expectedFinalJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    Collections.singletonList("A-null"),
+                    Collections.singletonList("A-a"),
+                    Collections.singletonList("B-a"),
+                    Collections.singletonList("B-b"),
+                    Collections.singletonList("null-b"),
+                    Collections.singletonList((String) null),
+                    Collections.singletonList("C-null"),
+                    Collections.singletonList("C-c"),
+                    Collections.singletonList("C-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Collections.singletonList("null-d"),
+                    Collections.singletonList("D-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testInnerInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-inner");
+
+        if (cacheEnabled) {
+            leftTable.join(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            // FIXME: the duplicates below in all the multi-join expected results
+            //        are due to KAFKA-6443 and should be updated once it is fixed.
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("C-c-c", "C-c-c"),
+                    null,
+                    null,
+                    null,
+                    null,
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.join(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream().to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testInnerLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-left");
+
+        if (cacheEnabled) {
+            leftTable.join(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("C-c-c", "C-c-c"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    null,
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.join(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testInnerOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-outer");
+
+        if (cacheEnabled) {
+            leftTable.join(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList("null-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Arrays.asList("C-c-c", "C-c-c"),
+                    Arrays.asList((String) null, null),
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("null-d", "D-d-d")
+            );
+
+            leftTable.join(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream().to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testLeftInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-inner");
+
+        if (cacheEnabled) {
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("C-c-c", "C-c-c"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    null,
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testLeftLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-left");
+
+        if (cacheEnabled) {
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                    Arrays.asList("C-null-null", "C-null-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testLeftOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-outer");
+
+        if (cacheEnabled) {
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList("null-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                    Arrays.asList("C-null-null", "C-null-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("null-d", "D-d-d")
+            );
+
+            leftTable.leftJoin(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream().to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testOuterInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-inner");
+
+        if (cacheEnabled) {
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList("null-b-b"),
+                    null,
+                    null,
+                    Arrays.asList("C-c-c", "C-c-c"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("null-d-d", "null-d-d"),
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .join(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testOuterLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-left");
+
+        if (cacheEnabled) {
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList("null-b-b"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                    Arrays.asList("C-null-null", "C-null-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    Arrays.asList("null-d-d", "null-d-d"),
+                    Collections.singletonList("D-d-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .leftJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testOuterOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-outer");
+
+        if (cacheEnabled) {
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .peek(new CountingPeek(true))
+                    .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                    null,
+                    null,
+                    null,
+                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                    Collections.singletonList("B-a-a"),
+                    Arrays.asList("B-b-b", "B-b-b"),
+                    Collections.singletonList("null-b-b"),
+                    Arrays.asList((String) null, null),
+                    null,
+                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                    Arrays.asList("C-null-null", "C-null-null"),
+                    Collections.singletonList((String) null),
+                    null,
+                    null,
+                    Arrays.asList("null-d-d", "null-d-d", "D-d-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner)
+                    .outerJoin(rightTable, valueJoiner, materialized)
+                    .toStream()
+                    .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
index fec9522..5184199 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
@@ -20,6 +20,8 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
+import java.util.Collections;
+
 public class MockMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
@@ -29,6 +31,13 @@ public class MockMapper {
         }
     }
 
+    private static class NoOpFlatKeyValueMapper<K, V> implements KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> {
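+        // wraps the unchanged pair in a singleton Iterable; handy when a test needs a flatMap()
+        // step (e.g. to trigger repartitioning) without altering the data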
+        @Override
+        public Iterable<KeyValue<K, V>> apply(K key, V value) {
+            return Collections.singletonList(KeyValue.pair(key, value));
+        }
+    }
+
     private static class SelectValueKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<V, V>> {
         @Override
         public KeyValue<V, V> apply(K key, V value) {
@@ -61,6 +70,9 @@ public class MockMapper {
         return new SelectKeyMapper<>();
     }
 
+    public static <K, V> KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> noOpFlatKeyValueMapper() {
+        return new NoOpFlatKeyValueMapper<>();
+    }
 
     public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();
