KAFKA-3561: Auto create through topic for KStream aggregation and join

guozhangwang enothereska mjsax miguno

If you get a chance, could you please take a look at this? I've done the 
repartitioning in the join, but it results in 2 internal topics for each join. 
This seems like overkill, as sometimes we wouldn't need to repartition at all, 
sometimes just 1 topic would suffice, and sometimes both are needed — but I'm 
not sure how we can know that in advance.

I'd also need to implement something similar for leftJoin, but again, I'd like 
to see if I'm heading down the right path or if anyone has any other bright 
ideas.

For reference - https://github.com/apache/kafka/pull/1453 - the previous PR

Thanks for taking the time and looking forward to getting some welcome advice 
:-)

Author: Damian Guy <[email protected]>
Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1472 from dguy/KAFKA-3561


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d9d1cb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d9d1cb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d9d1cb2

Branch: refs/heads/trunk
Commit: 7d9d1cb2355e33270703280ed6bb712033b03d26
Parents: 54ba228
Author: Damian Guy <[email protected]>
Authored: Thu Jun 16 11:56:32 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Jun 16 11:56:32 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   2 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  15 +
 .../examples/pageview/PageViewTypedDemo.java    |  12 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   3 +-
 .../examples/wordcount/WordCountDemo.java       |   3 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   2 +
 .../kafka/streams/kstream/KGroupedStream.java   | 119 ++++
 .../apache/kafka/streams/kstream/KStream.java   | 215 ++-----
 .../kafka/streams/kstream/KStreamBuilder.java   |   4 +-
 .../kstream/internals/KGroupedStreamImpl.java   | 180 ++++++
 .../kstream/internals/KGroupedTableImpl.java    |   4 +-
 .../kstream/internals/KStreamAggregate.java     |   8 +-
 .../streams/kstream/internals/KStreamImpl.java  | 477 ++++++++--------
 .../kstream/internals/KStreamKStreamJoin.java   |   8 +-
 .../kstream/internals/KStreamReduce.java        |   9 +-
 .../kstream/internals/KStreamWindowReduce.java  |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   2 +-
 .../streams/processor/PartitionGrouper.java     |   2 +-
 .../streams/processor/TopologyBuilder.java      |  56 +-
 .../processor/internals/RecordCollector.java    |   4 +-
 .../internals/StreamPartitionAssignor.java      |  59 +-
 .../processor/internals/StreamThread.java       |   2 +-
 .../InternalTopicIntegrationTest.java           |   9 +-
 .../integration/JoinIntegrationTest.java        |   5 +-
 .../KGroupedStreamIntegrationTest.java          | 472 ++++++++++++++++
 .../integration/KStreamRepartitionJoinTest.java | 565 +++++++++++++++++++
 .../integration/WordCountIntegrationTest.java   |  24 +-
 .../integration/utils/IntegrationTestUtils.java |  14 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |   6 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  16 -
 .../internals/KStreamWindowAggregateTest.java   |  28 +-
 .../streams/processor/TopologyBuilderTest.java  |  10 +-
 .../internals/StreamPartitionAssignorTest.java  |  61 +-
 .../streams/smoketest/SmokeTestClient.java      |  26 +-
 streams/src/test/resources/log4j.properties     |  21 +
 35 files changed, 1944 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f52cce..9a099d0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -148,7 +148,7 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
-      <allow pkg="org.hamcrest.CoreMatchers" />
+      <allow pkg="org.hamcrest" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 742d14f..a818d53 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -27,8 +27,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -163,4 +165,17 @@ public class TestUtils {
         return memoryRecords.buffer();
     }
 
+    public static Properties producerConfig(final String bootstrapServers,
+                                            final Class keySerializer,
+                                            final Class valueSerializer,
+                                            final Properties additional) {
+        final Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        properties.put(ProducerConfig.ACKS_CONFIG, "all");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializer);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializer);
+        properties.putAll(additional);
+        return properties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index e53b037..19391d8 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -131,9 +131,16 @@ public class PageViewTypedDemo {
         final Deserializer<RegionCount> regionCountDeserializer = new 
JsonPOJODeserializer<>();
         serdeProps.put("JsonPOJOClass", RegionCount.class);
         regionCountDeserializer.configure(serdeProps, false);
-
         final Serde<RegionCount> regionCountSerde = 
Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer);
 
+        final Serializer<PageViewByRegion> pageViewByRegionSerializer = new 
JsonPOJOSerializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionSerializer.configure(serdeProps, false);
+        final Deserializer<PageViewByRegion> pageViewByRegionDeserializer = 
new JsonPOJODeserializer<>();
+        serdeProps.put("JsonPOJOClass", PageViewByRegion.class);
+        pageViewByRegionDeserializer.configure(serdeProps, false);
+        final Serde<PageViewByRegion> pageViewByRegionSerde = 
Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
+
         KStream<String, PageView> views = builder.stream(Serdes.String(), 
pageViewSerde, "streams-pageview-input");
 
         KTable<String, UserProfile> users = builder.table(Serdes.String(), 
userProfileSerde, "streams-userprofile-input");
@@ -160,7 +167,8 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 
60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), pageViewByRegionSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 
1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single 
toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, 
KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 8a0af6c..e9aa467 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -99,7 +99,8 @@ public class PageViewUntypedDemo {
                         return new 
KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
                     }
                 })
-                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 
60 * 1000L).advanceBy(1000), Serdes.String())
+                .groupByKey(Serdes.String(), jsonSerde)
+                .count(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 
1000L).advanceBy(1000))
                 // TODO: we can merge ths toStream().map(...) with a single 
toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, 
KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 12395f9..bf1d8cb 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -72,7 +72,8 @@ public class WordCountDemo {
                         return new KeyValue<>(value, value);
                     }
                 })
-                .countByKey("Counts");
+                .groupByKey()
+                .count("Counts");
 
         // need to override value serde to Long type
         counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index f05b02c..6605335 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -134,6 +134,8 @@ public class KafkaStreams {
         // The application ID is a required config and hence should always 
have value
         String applicationId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
 
+        builder.setApplicationId(applicationId);
+
         String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + 
STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
new file mode 100644
index 0000000..25fdb3a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * {@link KGroupedStream} is an abstraction of a <i>grouped record stream</i> 
of key-value pairs
+ * usually grouped on a different key than the original stream key
+ *
+ * <p>
+ * It is an intermediate representation of a {@link KStream} before an
+ * aggregation is applied to the new partitions resulting in a new {@link 
KTable}.
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ *
+ * @see KStream
+ */
[email protected]
+public interface KGroupedStream<K, V> {
+
+
+    /**
+     * Combine values of this stream by the grouped key into a new instance of 
ever-updating
+     * {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param name              the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(Reducer<V> reducer,
+                        String name);
+
+
+    /**
+     * Combine values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}.
+     *
+     * @param reducer           the instance of {@link Reducer}
+     * @param windows           the specification of the aggregation {@link 
Windows}
+     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
+     *         where each table contains records with unmodified keys and 
values
+     *         that represent the latest (rolling) aggregate for each key 
within that window
+     */
+    <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                     Windows<W> windows);
+
+    /**
+     * Aggregate values of this stream by key into a new instance of a {@link 
KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param aggValueSerde aggregate value serdes for materializing the 
aggregated table,
+     *                      if not specified the default serdes defined in the 
configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that represents the latest (rolling) aggregate 
for each key
+     */
+    <T> KTable<K, T> aggregate(Initializer<T> initializer,
+                               Aggregator<K, V, T> aggregator,
+                               Serde<T> aggValueSerde,
+                               String name);
+
+    /**
+     * Aggregate values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}.
+     *
+     * @param initializer   the instance of {@link Initializer}
+     * @param aggregator    the instance of {@link Aggregator}
+     * @param windows       the specification of the aggregation {@link 
Windows}
+     * @param aggValueSerde aggregate value serdes for materializing the 
aggregated table,
+     *                      if not specified the default serdes defined in the 
configs will be used
+     * @param <T>           the value type of the resulted {@link KTable}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
+     *         where each table contains records with unmodified keys and 
values with type {@code T}
+     *         that represent the latest (rolling) aggregate for each key 
within that window
+     */
+    <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> 
initializer,
+                                                           Aggregator<K, V, T> 
aggregator,
+                                                           Windows<W> windows,
+                                                           Serde<T> 
aggValueSerde);
+
+
+    /**
+     * Count number of records of this stream by key into a new instance of a 
{@link KTable}
+     *
+     * @param name  the name of the resulted {@link KTable}
+     *
+     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) count (i.e., number of records) for 
each key
+     */
+    KTable<K, Long> count(String name);
+
+
+    /**
+     * Count number of records of this stream by key on a window basis into a 
new instance of windowed {@link KTable}.
+     *
+     * @param windows   the specification of the aggregation {@link Windows}
+     *
+     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
+     *         where each table contains records with unmodified keys and 
values
+     *         that represent the latest (rolling) count (i.e., number of 
records) for each key within that window
+     */
+    <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a1ecfa4..ae743b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -317,6 +317,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s 
elements of the same key using windowed Inner Join.
+     * If a record key is null it will not included in the resulting {@link 
KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with 
this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -343,7 +344,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine element values of this stream with another {@link KStream}'s 
elements of the same key using windowed Inner Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it 
will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this 
stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -361,6 +362,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements 
of the same key using windowed Outer Join.
+     * If a record key is null it will not included in the resulting {@link 
KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with 
this stream
      * @param joiner            the instance of {@link ValueJoiner}
@@ -387,7 +389,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements 
of the same key using windowed Outer Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it 
will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this 
stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -405,12 +407,15 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements 
of the same key using windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link 
KStream}
      *
      * @param otherStream       the instance of {@link KStream} joined with 
this stream
      * @param joiner            the instance of {@link ValueJoiner}
      * @param windows           the specification of the {@link JoinWindows}
      * @param keySerde          key serdes for materializing the other stream,
      *                          if not specified the default serdes defined in 
the configs will be used
+     * @param thisValSerde    value serdes for materializing this stream,
+     *                          if not specified the default serdes defined in 
the configs will be used
      * @param otherValueSerde   value serdes for materializing the other 
stream,
      *                          if not specified the default serdes defined in 
the configs will be used
      * @param <V1>              the value type of the other stream
@@ -424,11 +429,12 @@ public interface KStream<K, V> {
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde);
 
     /**
      * Combine values of this stream with another {@link KStream}'s elements 
of the same key using windowed Left Join
-     * with default serializers and deserializers.
+     * with default serializers and deserializers. If a record key is null it 
will not included in the resulting {@link KStream}
      *
      * @param otherStream   the instance of {@link KStream} joined with this 
stream
      * @param joiner        the instance of {@link ValueJoiner}
@@ -446,6 +452,7 @@ public interface KStream<K, V> {
 
     /**
      * Combine values of this stream with {@link KTable}'s elements of the 
same key using non-windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link 
KStream}
      *
      * @param table     the instance of {@link KTable} joined with this stream
      * @param joiner    the instance of {@link ValueJoiner}
@@ -458,182 +465,76 @@ public interface KStream<K, V> {
     <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table, ValueJoiner<V, V1, 
V2> joiner);
 
     /**
-     * Combine values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param windows           the specification of the aggregation {@link 
Windows}
-     * @param keySerde          key serdes for materializing the aggregated 
table,
-     *                          if not specified the default serdes defined in 
the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated 
table,
-     *                          if not specified the default serdes defined in 
the configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values
-     *         that represent the latest (rolling) aggregate for each key 
within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
-                                                          Windows<W> windows,
-                                                          Serde<K> keySerde,
-                                                          Serde<V> valueSerde);
-
-    /**
-     * Combine values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param windows the specification of the aggregation {@link Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values
-     *         that represent the latest (rolling) aggregate for each key 
within that window
-     */
-    <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, 
Windows<W> windows);
-
-    /**
-     * Combine values of this stream by key into a new instance of 
ever-updating {@link KTable}.
-     *
-     * @param reducer           the instance of {@link Reducer}
-     * @param keySerde          key serdes for materializing the aggregated 
table,
-     *                          if not specified the default serdes defined in 
the configs will be used
-     * @param valueSerde        value serdes for materializing the aggregated 
table,
-     *                          if not specified the default serdes defined in 
the configs will be used
-     * @param name              the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer,
-                             Serde<K> keySerde,
-                             Serde<V> valueSerde,
-                             String name);
-
-    /**
-     * Combine values of this stream by key into a new instance of 
ever-updating {@link KTable} with default serializers and deserializers.
-     *
-     * @param reducer the instance of {@link Reducer}
-     * @param name    the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) aggregate for each key
-     */
-    KTable<K, V> reduceByKey(Reducer<V> reducer, String name);
-
-    /**
-     * Aggregate values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}.
+     * Combine values of this stream with {@link KTable}'s elements of the 
same key using non-windowed Left Join.
+     * If a record key is null it will not included in the resulting {@link 
KStream}
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link 
Windows}
-     * @param keySerde      key serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the 
configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the 
aggregated table,
+     * @param table         the instance of {@link KTable} joined with this 
stream
+     * @param valueJoiner   the instance of {@link ValueJoiner}
+     * @param keySerde      key serdes for materializing this stream.
+     *                      If not specified the default serdes defined in the 
configs will be used
+     * @param valSerde      value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the 
configs will be used
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <V1>          the value type of the table
+     * @param <V2>          the value type of the new stream
      *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key 
within that window
+     * @return a {@link KStream} that contains join-records for each key and 
values computed by the given {@link ValueJoiner},
+     *         one for each matched record-pair with the same key and within 
the joining window intervals
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> 
initializer,
-                                                                Aggregator<K, 
V, T> aggregator,
-                                                                Windows<W> 
windows,
-                                                                Serde<K> 
keySerde,
-                                                                Serde<T> 
aggValueSerde);
-
+    <V1, V2> KStream<K, V2> leftJoin(KTable<K, V1> table,
+                                     ValueJoiner<V, V1, V2> valueJoiner,
+                                     Serde<K> keySerde,
+                                     Serde<V> valSerde);
     /**
-     * Aggregate values of this stream by key on a window basis into a new 
instance of windowed {@link KTable}
-     * with default serializers and deserializers.
+     * Group the records of this {@link KStream} using the provided {@link 
KeyValueMapper} and
+     * default serializers and deserializers. If a record key is null it will 
not included in
+     * the resulting {@link KGroupedStream}
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param aggregator    the instance of {@link Aggregator}
-     * @param windows       the specification of the aggregation {@link 
Windows}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param selector      select the grouping key and value to be aggregated
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values with type {@code T}
-     *         that represent the latest (rolling) aggregate for each key 
within that window
+     * @return a {@link KGroupedStream} that contains the the grouped records 
of the original {@link KStream}
      */
-    <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> 
initializer,
-                                                                Aggregator<K, 
V, T> aggregator,
-                                                                Windows<W> 
windows);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector);
 
     /**
-     * Aggregate values of this stream by key into a new instance of 
ever-updating {@link KTable}.
+     * Group the records of this {@link KStream} using the provided {@link 
KeyValueMapper}.
+     * If a record key is null it will not included in the resulting {@link 
KGroupedStream}
      *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param keySerde      key serdes for materializing the aggregated table,
+     * @param selector      select the grouping key and value to be aggregated
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the 
configs will be used
-     * @param aggValueSerde aggregate value serdes for materializing the 
aggregated table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the 
configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
+     * @param <K1>          the key type of the {@link KGroupedStream}
+     * @param <V1>          the value type of the {@link KGroupedStream}
      *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values (of different type) that represent the latest (rolling) aggregate for 
each key
+     * @return a {@link KGroupedStream} that contains the grouped records 
of the original {@link KStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    Serde<K> keySerde,
-                                    Serde<T> aggValueSerde,
-                                    String name);
+    <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> selector,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> valSerde);
 
     /**
-     * Aggregate values of this stream by key into a new instance of 
ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param initializer   the class of {@link Initializer}
-     * @param aggregator    the class of {@link Aggregator}
-     * @param name          the name of the resulted {@link KTable}
-     * @param <T>           the value type of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values (of different type) that represent the latest (rolling) aggregate for 
each key
+     * Group the records with the same key into a {@link KGroupedStream} while 
preserving the
+     * original values. If a record key is null it will not be included in the 
resulting
+     * {@link KGroupedStream}
+     * Default Serdes will be used
+     * @return a {@link KGroupedStream}
      */
-    <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                    Aggregator<K, V, T> aggregator,
-                                    String name);
+    KGroupedStream<K, V> groupByKey();
 
     /**
-     * Count number of records of this stream by key on a window basis into a 
new instance of windowed {@link KTable}.
-     *
-     * @param windows       the specification of the aggregation {@link 
Windows}
-     * @param keySerde      key serdes for materializing the counting table,
+     * Group the records with the same key into a {@link KGroupedStream} while 
preserving the
+     * original values. If a record key is null it will not be included in the 
resulting
+     * {@link KGroupedStream}
+     * @param keySerde      key serdes for materializing this stream,
      *                      if not specified the default serdes defined in the 
configs will be used
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values
-     *         that represent the latest (rolling) count (i.e., number of 
records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> 
windows, Serde<K> keySerde);
-
-    /**
-     * Count number of records of this stream by key on a window basis into a 
new instance of windowed {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param windows       the specification of the aggregation {@link 
Windows}
-     *
-     * @return a windowed {@link KTable} which can be treated as a list of 
{@code KTable}s
-     *         where each table contains records with unmodified keys and 
values
-     *         that represent the latest (rolling) count (i.e., number of 
records) for each key within that window
-     */
-    <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> 
windows);
-
-    /**
-     * Count number of records of this stream by key into a new instance of 
ever-updating {@link KTable}.
-     *
-     * @param keySerde      key serdes for materializing the counting table,
+     * @param valSerde    value serdes for materializing this stream,
      *                      if not specified the default serdes defined in the 
configs will be used
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) count (i.e., number of records) for 
each key
-     */
-    KTable<K, Long> countByKey(Serde<K> keySerde, String name);
-
-    /**
-     * Count number of records of this stream by key into a new instance of 
ever-updating {@link KTable}
-     * with default serializers and deserializers.
-     *
-     * @param name          the name of the resulted {@link KTable}
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and 
values that represent the latest (rolling) count (i.e., number of records) for 
each key
+     * @return a {@link KGroupedStream}
      */
-    KTable<K, Long> countByKey(String name);
+    KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
+                                    Serde<V> valSerde);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 53b2f4e..37d8921 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -89,7 +89,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), 
valSerde == null ? null : valSerde.deserializer(), topics);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
     }
 
 
@@ -111,7 +111,7 @@ public class KStreamBuilder extends TopologyBuilder {
 
         addSource(name, keySerde == null ? null : keySerde.deserializer(), 
valSerde == null ? null : valSerde.deserializer(), topicPattern);
 
-        return new KStreamImpl<>(this, name, Collections.singleton(name));
+        return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
new file mode 100644
index 0000000..1830484
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements 
KGroupedStream<K, V> {
+
+    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+
+    private final Serde<K> keySerde;
+    private final Serde<V> valSerde;
+    private final boolean repartitionRequired;
+
+    public KGroupedStreamImpl(final KStreamBuilder topology,
+                              final String name,
+                              final Set<String> sourceNodes,
+                              final Serde<K> keySerde,
+                              final Serde<V> valSerde,
+                              final boolean repartitionRequired) {
+        super(topology, name, sourceNodes);
+        this.keySerde = keySerde;
+        this.valSerde = valSerde;
+        this.repartitionRequired = repartitionRequired;
+    }
+
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer,
+                               final String name) {
+        return doAggregate(
+            new KStreamReduce<K, V>(name, reducer),
+            REDUCE_NAME,
+            keyValueStore(valSerde, name));
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer,
+                                                            Windows<W> 
windows) {
+        return (KTable<Windowed<K>, V>) doAggregate(
+            new KStreamWindowReduce<K, V, W>(windows, windows.name(), reducer),
+            REDUCE_NAME,
+            windowedStore(valSerde, windows)
+        );
+    }
+
+    @Override
+    public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
+                                      final Aggregator<K, V, T> aggregator,
+                                      final Serde<T> aggValueSerde,
+                                      final String name) {
+        return doAggregate(
+            new KStreamAggregate<>(name, initializer, aggregator),
+            AGGREGATE_NAME,
+            keyValueStore(aggValueSerde, name));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final 
Initializer<T> initializer,
+                                                                  final 
Aggregator<K, V, T> aggregator,
+                                                                  final 
Windows<W> windows,
+                                                                  final 
Serde<T> aggValueSerde) {
+        return (KTable<Windowed<K>, T>) doAggregate(
+            new KStreamWindowAggregate<>(windows, windows.name(), initializer, 
aggregator),
+            AGGREGATE_NAME,
+            windowedStore(aggValueSerde, windows)
+        );
+    }
+
+    @Override
+    public KTable<K, Long> count(final String name) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, Serdes.Long(), name);
+    }
+
+    @Override
+    public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> 
windows) {
+        return aggregate(new Initializer<Long>() {
+            @Override
+            public Long apply() {
+                return 0L;
+            }
+        }, new Aggregator<K, V, Long>() {
+            @Override
+            public Long apply(K aggKey, V value, Long aggregate) {
+                return aggregate + 1;
+            }
+        }, windows, Serdes.Long());
+    }
+
+    private <T> StateStoreSupplier keyValueStore(final Serde<T> aggValueSerde, 
final String name) {
+        return storeFactory(aggValueSerde, name).build();
+    }
+
+
+    private <W extends Window, T> StateStoreSupplier windowedStore(final 
Serde<T> aggValSerde,
+                                                                   final 
Windows<W> windows) {
+        return storeFactory(aggValSerde, windows.name())
+            .windowed(windows.maintainMs(), windows.segments, false)
+            .build();
+
+    }
+
+    private <T> Stores.PersistentKeyValueFactory<K, T> storeFactory(final 
Serde<T> aggValueSerde,
+                                                                    final 
String name) {
+        return Stores.create(name)
+            .withKeys(keySerde)
+            .withValues(aggValueSerde)
+            .persistent();
+
+    }
+
+    private <T> KTable<K, T> doAggregate(
+        final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+        final String functionName,
+        final StateStoreSupplier storeSupplier) {
+
+        final String aggFunctionName = topology.newName(functionName);
+
+        final String sourceName = repartitionIfRequired();
+
+        topology.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+        topology.addStateStore(storeSupplier, aggFunctionName);
+
+        return new KTableImpl<>(topology,
+                                aggFunctionName,
+                                aggregateSupplier,
+                                sourceName.equals(this.name) ? sourceNodes
+                                                             : 
Collections.singleton(sourceName));
+    }
+
+    /**
+     * @return the new sourceName if repartitioned. Otherwise the name of this 
stream
+     */
+    private String repartitionIfRequired() {
+        if (!repartitionRequired) {
+            return this.name;
+        }
+        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index f7fe4e5..7118bb9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -45,8 +45,6 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K> implements KGroup
 
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
-    private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
-
     protected final Serde<K> keySerde;
     protected final Serde<V> valSerde;
 
@@ -88,7 +86,7 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K> implements KGroup
         String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
         String funcName = topology.newName(functionName);
 
-        String topic = name + REPARTITION_TOPIC_SUFFIX;
+        String topic = name + KStreamImpl.REPARTITION_TOPIC_SUFFIX;
 
         Serializer<K> keySerializer = keySerde == null ? null : 
keySerde.serializer();
         Deserializer<K> keyDeserializer = keySerde == null ? null : 
keySerde.deserializer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index b6d1492..dc6410d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -61,14 +60,11 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream aggregate 
operator with state " + storeName + " should not be null.");
+                return;
 
             T oldAgg = store.get(key);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 91bcef9..79ff842 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -17,18 +17,16 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -36,9 +34,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -47,18 +42,17 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.lang.reflect.Array;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, 
V> {
 
-    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
-
     private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
 
     private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
 
-    private static final String FILTER_NAME = "KSTREAM-FILTER-";
+    public static final String FILTER_NAME = "KSTREAM-FILTER-";
 
     private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
 
@@ -84,8 +78,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
     private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
-    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
-
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
     public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -100,8 +92,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
 
-    public KStreamImpl(KStreamBuilder topology, String name, Set<String> 
sourceNodes) {
+    public static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
+
+    private final boolean repartitionRequired;
+
+    public KStreamImpl(KStreamBuilder topology, String name, Set<String> 
sourceNodes,
+                       boolean repartitionRequired) {
         super(topology, name, sourceNodes);
+        this.repartitionRequired = repartitionRequired;
     }
 
     @Override
@@ -110,7 +108,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, false), 
this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, 
this.repartitionRequired);
     }
 
     @Override
@@ -119,20 +117,24 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), 
this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, 
this.repartitionRequired);
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public <K1> KStream<K1, V> selectKey(final KeyValueMapper<K, V, K1> 
mapper) {
+        return new KStreamImpl<>(topology, internalSelectKey(mapper), 
sourceNodes, true);
+    }
+
+    private <K1> String internalSelectKey(final KeyValueMapper<K, V, K1> 
mapper) {
         String name = topology.newName(KEY_SELECT_NAME);
         topology.addProcessor(name, new KStreamMap<>(new KeyValueMapper<K, V, 
KeyValue<K1, V>>() {
             @Override
             public KeyValue<K1, V> apply(K key, V value) {
-                return new KeyValue(mapper.apply(key, value), value);
+                return new KeyValue<>(mapper.apply(key, value), value);
             }
         }), this.name);
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return name;
     }
 
     @Override
@@ -141,16 +143,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
+
     @Override
     public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
         String name = topology.newName(MAPVALUES_NAME);
 
         topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, 
this.repartitionRequired);
     }
 
     @Override
@@ -193,7 +196,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, sourceNodes, true);
     }
 
     @Override
@@ -202,7 +205,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), 
this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, 
this.repartitionRequired);
     }
 
     @Override
@@ -218,7 +221,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
             topology.addProcessor(childName, new KStreamPassThrough<K, V>(), 
branchName);
 
-            branchChildren[i] = new KStreamImpl<>(topology, childName, 
sourceNodes);
+            branchChildren[i] = new KStreamImpl<>(topology, childName, 
sourceNodes, this.repartitionRequired);
         }
 
         return branchChildren;
@@ -245,7 +248,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
         topology.addProcessor(name, new KStreamPassThrough<>(), parentNames);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
@@ -315,7 +318,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         topology.addProcessor(name, new 
KStreamTransform<>(transformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, null);
+        return new KStreamImpl<>(topology, name, null, true);
     }
 
     @Override
@@ -325,7 +328,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         topology.addProcessor(name, new 
KStreamTransformValues<>(valueTransformerSupplier), this.name);
         topology.connectProcessorAndStateStores(name, stateStoreNames);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, 
this.repartitionRequired);
     }
 
     @Override
@@ -388,45 +391,87 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             Serde<V1> otherValueSerde,
             boolean outer) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) 
other);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValueSerde,
+                      otherValueSerde,
+                      new DefaultJoin(outer));
+    }
+
+    private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other,
+                                         ValueJoiner<V, V1, R> joiner,
+                                         JoinWindows windows,
+                                         Serde<K> keySerde,
+                                         Serde<V> thisValueSerde,
+                                         Serde<V1> otherValueSerde,
+                                         KStreamImplJoin join) {
+        KStreamImpl<K, V> joinThis = this;
+        KStreamImpl<K, V1> joinOther = (KStreamImpl) other;
+
+        if (joinThis.repartitionRequired) {
+            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
+        }
 
-        StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
-                .withKeys(keySerde)
-                .withValues(thisValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        if (joinOther.repartitionRequired) {
+            joinOther = joinOther.repartitionForJoin(keySerde, 
otherValueSerde);
+        }
+
+        joinThis.ensureJoinableWith(joinOther);
 
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + 
"-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
+        return join.join(joinThis,
+                         joinOther,
+                         joiner,
+                         windows,
+                         keySerde,
+                         thisValueSerde,
+                         otherValueSerde);
+    }
 
-        KStreamJoinWindow<K, V> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, 
windows.maintainMs());
-        KStreamJoinWindow<K, V1> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, 
windows.maintainMs());
 
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new 
KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, 
outer);
-        KStreamKStreamJoin<K, R, V1, V> joinOther = new 
KStreamKStreamJoin<>(thisWindow.name(), windows.before, windows.after, 
reverseJoiner(joiner), outer);
+    /**
+     * Repartition a stream. This is required on join operations occurring 
after
+     * an operation that changes the key, e.g., selectKey, map(..), flatMap(..).
+     * @param keySerde      Serdes for serializing the keys
+     * @param valSerde      Serdes for serializing the values
+     * @return a new {@link KStreamImpl}
+     */
+    private KStreamImpl<K, V> repartitionForJoin(Serde<K> keySerde,
+                                                 Serde<V> valSerde) {
 
-        KStreamPassThrough<K, R> joinMerge = new KStreamPassThrough<>();
+        String repartitionedSourceName = createReparitionedSource(this, 
keySerde, valSerde);
+        return new KStreamImpl<>(topology, repartitionedSourceName, Collections
+            .singleton(repartitionedSourceName), false);
+    }
 
-        String thisWindowStreamName = topology.newName(WINDOWED_NAME);
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : 
topology.newName(JOINTHIS_NAME);
-        String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : 
topology.newName(JOINOTHER_NAME);
-        String joinMergeName = topology.newName(MERGE_NAME);
+    static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
+                                                    Serde<K1> keySerde,
+                                                    Serde<V1> valSerde) {
+        Serializer<K1> keySerializer = keySerde != null ? 
keySerde.serializer() : null;
+        Serializer<V1> valSerializer = valSerde != null ? 
valSerde.serializer() : null;
+        Deserializer<K1> keyDeserializer = keySerde != null ? 
keySerde.deserializer() : null;
+        Deserializer<V1> valDeserializer = valSerde != null ? 
valSerde.deserializer() : null;
 
-        topology.addProcessor(thisWindowStreamName, thisWindowedStream, 
this.name);
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, 
((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
-        topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
-        topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
-        topology.addStateStore(thisWindow, thisWindowStreamName, 
otherWindowStreamName);
-        topology.addStateStore(otherWindow, thisWindowStreamName, 
otherWindowStreamName);
+        String repartitionTopic = stream.name + REPARTITION_TOPIC_SUFFIX;
+        String sinkName = stream.topology.newName(SINK_NAME);
+        String filterName = stream.topology.newName(FILTER_NAME);
+        String sourceName = stream.topology.newName(SOURCE_NAME);
 
-        return new KStreamImpl<>(topology, joinMergeName, allSourceNodes);
+        stream.topology.addInternalTopic(repartitionTopic);
+        stream.topology.addProcessor(filterName, new KStreamFilter<>(new 
Predicate<K1, V1>() {
+            @Override
+            public boolean test(final K1 key, final V1 value) {
+                return key != null;
+            }
+        }, false), stream.name);
+
+        stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
+                         valSerializer, filterName);
+        stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
+                           repartitionTopic);
+
+        return sourceName;
     }
 
     @SuppressWarnings("unchecked")
@@ -436,28 +481,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows,
             Serde<K> keySerde,
+            Serde<V> thisValSerde,
             Serde<V1> otherValueSerde) {
 
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) 
other);
-
-        StateStoreSupplier otherWindow = Stores.create(windows.name() + 
"-other")
-                .withKeys(keySerde)
-                .withValues(otherValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, true)
-                .build();
-
-        KStreamJoinWindow<K, V1> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, 
windows.maintainMs());
-        KStreamKStreamJoin<K, R, V, V1> joinThis = new 
KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, 
true);
-
-        String otherWindowStreamName = topology.newName(WINDOWED_NAME);
-        String joinThisName = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(otherWindowStreamName, otherWindowedStream, 
((KStreamImpl) other).name);
-        topology.addProcessor(joinThisName, joinThis, this.name);
-        topology.addStateStore(otherWindow, joinThisName, 
otherWindowStreamName);
-
-        return new KStreamImpl<>(topology, joinThisName, allSourceNodes);
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      keySerde,
+                      thisValSerde,
+                      otherValueSerde,
+                      new LeftJoin());
     }
 
     @Override
@@ -466,193 +499,197 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K> implements KStream<K, V
             ValueJoiner<V, V1, R> joiner,
             JoinWindows windows) {
 
-        return leftJoin(other, joiner, windows, null, null);
+        return leftJoin(other, joiner, windows, null, null, null);
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, 
V1, R> joiner) {
-        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) 
other);
-
-        String name = topology.newName(LEFTJOIN_NAME);
-
-        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, 
?, V1>) other, joiner), this.name);
-        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) 
other).name);
+        return leftJoin(other, joiner, null, null);
 
-        return new KStreamImpl<>(topology, name, allSourceNodes);
     }
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> 
reducer,
-                                                                 Windows<W> 
windows,
-                                                                 Serde<K> 
keySerde,
-                                                                 Serde<V> 
aggValueSerde) {
+    public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other,
+                                          ValueJoiner<V, V1, R> joiner,
+                                          Serde<K> keySerde,
+                                          Serde<V> valueSerde) {
+
+        if (repartitionRequired) {
+            KStreamImpl<K, V> thisStreamRepartitioned = 
this.repartitionForJoin(keySerde,
+                                                                               
 valueSerde
+            );
+            return thisStreamRepartitioned.doStreamTableLeftJoin(other, 
joiner);
+        } else {
+            return doStreamTableLeftJoin(other, joiner);
+        }
 
-        String reduceName = topology.newName(REDUCE_NAME);
+    }
 
-        KStreamWindowReduce<K, V, W> reduceSupplier = new 
KStreamWindowReduce<>(windows, windows.name(), reducer);
+    private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> 
other,
+                                                        final ValueJoiner<V, 
V1, R> joiner) {
+        Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) 
other);
 
-        StateStoreSupplier reduceStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+        String name = topology.newName(LEFTJOIN_NAME);
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
+        topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, 
?, V1>) other, joiner), this.name);
+        topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) 
other).name);
 
-        // return the KTable representation with the intermediate topic as the 
sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, 
sourceNodes);
+        return new KStreamImpl<>(topology, name, allSourceNodes, false);
     }
 
     @Override
-    public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> 
reducer,
-                                                                 Windows<W> 
windows) {
-
-        return reduceByKey(reducer, windows, null, null);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> 
selector) {
+        return groupBy(selector, null, null);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer,
-                                    Serde<K> keySerde,
-                                    Serde<V> aggValueSerde,
-                                    String name) {
-
-        String reduceName = topology.newName(REDUCE_NAME);
-
-        KStreamReduce<K, V> reduceSupplier = new KStreamReduce<>(name, 
reducer);
+    public <K1, V1> KGroupedStream<K1, V1> groupBy(KeyValueMapper<K, V, K1> 
selector,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> valSerde) {
 
-        StateStoreSupplier reduceStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
-
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(reduceName, reduceSupplier, this.name);
-        topology.addStateStore(reduceStore, reduceName);
-
-        // return the KTable representation with the intermediate topic as the 
sources
-        return new KTableImpl<>(topology, reduceName, reduceSupplier, 
sourceNodes);
+        String selectName = internalSelectKey(selector);
+        return new KGroupedStreamImpl<>(topology,
+                                        selectName,
+                                        sourceNodes,
+                                        keySerde,
+                                        valSerde, true);
     }
 
     @Override
-    public KTable<K, V> reduceByKey(Reducer<V> reducer, String name) {
-
-        return reduceByKey(reducer, null, null, name);
+    public KGroupedStream<K, V> groupByKey() {
+        return groupByKey(null, null);
     }
 
     @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> 
aggregateByKey(Initializer<T> initializer,
-                                                                       
Aggregator<K, V, T> aggregator,
-                                                                       
Windows<W> windows,
-                                                                       
Serde<K> keySerde,
-                                                                       
Serde<T> aggValueSerde) {
-
-        String aggregateName = topology.newName(AGGREGATE_NAME);
-
-        KStreamAggProcessorSupplier<K, Windowed<K>, V, T> aggregateSupplier = 
new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator);
-
-        StateStoreSupplier aggregateStore = Stores.create(windows.name())
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .windowed(windows.maintainMs(), windows.segments, false)
-                .build();
+    public KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
+                                           Serde<V> valSerde) {
+        return new KGroupedStreamImpl<>(topology,
+                                        this.name,
+                                        sourceNodes,
+                                        keySerde,
+                                        valSerde,
+                                        this.repartitionRequired);
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the 
sources
-        return new KTableImpl<Windowed<K>, T, T>(topology, aggregateName, 
aggregateSupplier, sourceNodes);
+    private static <K, V> StateStoreSupplier createWindowedStateStore(final 
JoinWindows windows,
+                                                                     final 
Serde<K> keySerde,
+                                                                     final 
Serde<V> valueSerde,
+                                                                     final 
String nameSuffix) {
+        return Stores.create(windows.name() + nameSuffix)
+            .withKeys(keySerde)
+            .withValues(valueSerde)
+            .persistent()
+            .windowed(windows.maintainMs(), windows.segments, true)
+            .build();
     }
 
-    @Override
-    public <T, W extends Window> KTable<Windowed<K>, T> 
aggregateByKey(Initializer<T> initializer,
-                                                                       
Aggregator<K, V, T> aggregator,
-                                                                       
Windows<W> windows) {
+    private interface KStreamImplJoin {
 
-        return aggregateByKey(initializer, aggregator, windows, null, null);
+        <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                            KStream<K1, V2> other,
+                                            ValueJoiner<V1, V2, R> joiner,
+                                            JoinWindows windows,
+                                            Serde<K1> keySerde,
+                                            Serde<V1> lhsValueSerde,
+                                            Serde<V2> otherValueSerde);
     }
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           Serde<K> keySerde,
-                                           Serde<T> aggValueSerde,
-                                           String name) {
+    private class DefaultJoin implements KStreamImplJoin {
 
-        String aggregateName = topology.newName(AGGREGATE_NAME);
+        private final boolean outer;
 
-        KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier = new 
KStreamAggregate<>(name, initializer, aggregator);
+        DefaultJoin(final boolean outer) {
+            this.outer = outer;
+        }
 
-        StateStoreSupplier aggregateStore = Stores.create(name)
-                .withKeys(keySerde)
-                .withValues(aggValueSerde)
-                .persistent()
-                .build();
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> 
joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+
+            StateStoreSupplier thisWindow =
+                createWindowedStateStore(windows, keySerde, lhsValueSerde, 
"-this");
+
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, 
"-other");
+
+
+            KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindow.name(),
+                                                                               
    windows.before + windows.after + 1,
+                                                                               
    windows.maintainMs());
+            KStreamJoinWindow<K1, V2> otherWindowedStream = new 
KStreamJoinWindow<>(otherWindow .name(),
+                                                                               
     windows.before + windows.after + 1,
+                                                                               
     windows.maintainMs());
+
+            KStreamKStreamJoin<K1, R, V1, V2> joinThis = new 
KStreamKStreamJoin<>(otherWindow.name(),
+                                                                               
   windows.before,
+                                                                               
   windows.after,
+                                                                               
   joiner,
+                                                                               
   outer);
+            KStreamKStreamJoin<K1, R, V2, V1> joinOther = new 
KStreamKStreamJoin<>(thisWindow.name(),
+                                                                               
    windows.before,
+                                                                               
    windows.after,
+                                                                               
    reverseJoiner(joiner),
+                                                                               
    outer);
+
+            KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>();
+
+            String thisWindowStreamName = topology.newName(WINDOWED_NAME);
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : 
topology.newName(JOINTHIS_NAME);
+            String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : 
topology.newName(JOINOTHER_NAME);
+            String joinMergeName = topology.newName(MERGE_NAME);
+
+            topology.addProcessor(thisWindowStreamName, thisWindowedStream, 
((AbstractStream) lhs).name);
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, 
((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, 
thisWindowStreamName);
+            topology.addProcessor(joinOtherName, joinOther, 
otherWindowStreamName);
+            topology.addProcessor(joinMergeName, joinMerge, joinThisName, 
joinOtherName);
+            topology.addStateStore(thisWindow, thisWindowStreamName, 
otherWindowStreamName);
+            topology.addStateStore(otherWindow, thisWindowStreamName, 
otherWindowStreamName);
+
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) 
lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinMergeName, allSourceNodes, 
false);
+        }
+    }
 
-        // aggregate the values with the aggregator and local store
-        topology.addProcessor(aggregateName, aggregateSupplier, this.name);
-        topology.addStateStore(aggregateStore, aggregateName);
 
-        // return the KTable representation with the intermediate topic as the 
sources
-        return new KTableImpl<>(topology, aggregateName, aggregateSupplier, 
sourceNodes);
-    }
+    private class LeftJoin implements KStreamImplJoin {
 
-    @Override
-    public <T> KTable<K, T> aggregateByKey(Initializer<T> initializer,
-                                           Aggregator<K, V, T> aggregator,
-                                           String name) {
+        @Override
+        public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs,
+                                                   KStream<K1, V2> other,
+                                                   ValueJoiner<V1, V2, R> 
joiner,
+                                                   JoinWindows windows,
+                                                   Serde<K1> keySerde,
+                                                   Serde<V1> lhsValueSerde,
+                                                   Serde<V2> otherValueSerde) {
+            StateStoreSupplier otherWindow =
+                createWindowedStateStore(windows, keySerde, otherValueSerde, 
"-other");
 
-        return aggregateByKey(initializer, aggregator, null, null, name);
-    }
+            KStreamJoinWindow<K1, V1>
+                otherWindowedStream = new 
KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, 
windows.maintainMs());
+            KStreamKStreamJoin<K1, R, V1, V2>
+                joinThis = new KStreamKStreamJoin<>(otherWindow.name(), 
windows.before, windows.after, joiner, true);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> 
windows,
-                                                                   Serde<K> 
keySerde) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, windows, keySerde, Serdes.Long());
-    }
+            String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+            String joinThisName = topology.newName(LEFTJOIN_NAME);
 
-    @Override
-    public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> 
windows) {
-        return countByKey(windows, null);
-    }
+            topology.addProcessor(otherWindowStreamName, otherWindowedStream, 
((AbstractStream) other).name);
+            topology.addProcessor(joinThisName, joinThis, ((AbstractStream) 
lhs).name);
+            topology.addStateStore(otherWindow, joinThisName, 
otherWindowStreamName);
 
-    @Override
-    public KTable<K, Long> countByKey(Serde<K> keySerde, String name) {
-        return this.aggregateByKey(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1L;
-                    }
-                }, keySerde, Serdes.Long(), name);
+            Set<String> allSourceNodes = new HashSet<>(((AbstractStream) 
lhs).sourceNodes);
+            allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);
+            return new KStreamImpl<>(topology, joinThisName, allSourceNodes, 
false);
+        }
     }
 
-    @Override
-    public KTable<K, Long> countByKey(String name) {
-        return countByKey(null, name);
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 72029a8..edde009 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,14 +60,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements 
ProcessorSupplier<K, V1> {
             otherWindow = (WindowStore<K, V2>) 
context.getStateStore(otherWindowName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V1 value) {
-            // the keys should never be null
             if (key == null)
-                throw new StreamsException("Record key for KStream-KStream 
join operator with other window state store " + otherWindowName + " should not 
be null.");
+                return;
 
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index ed6e216..dd5ba45 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -58,14 +57,12 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
         }
 
-        /**
-         * @throws StreamsException if key is null
-         */
+
         @Override
         public void process(K key, V value) {
-            // the keys should never be null
+            // If the key is null, we don't need to proceed
             if (key == null)
-                throw new StreamsException("Record key for KStream reduce 
operator with state " + storeName + " should not be null.");
+                return;
 
             V oldAgg = store.get(key);
             V newAgg = oldAgg;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index a526506..46d99a8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -68,7 +68,7 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
 
         @Override
         public void process(K key, V value) {
-            // if the key is null, we do not need proceed aggregating the 
record
+            // if the key is null, we do not need to proceed aggregating
             // the record with the table
             if (key == null)
                 return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 51d4cb4..c5543ad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -229,7 +229,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
             }
         }), this.name);
 
-        return new KStreamImpl<>(topology, name, sourceNodes);
+        return new KStreamImpl<>(topology, name, sourceNodes, false);
     }
 
     @Override

Reply via email to