[FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union

Right now, this does everything in memory, so the JVM will blow if data
for one key becomes too large.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8634dbbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8634dbbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8634dbbe

Branch: refs/heads/master
Commit: 8634dbbe998af53471bed2f3a6557c722bb37b87
Parents: 47b5cb7
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 6 16:33:04 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/CoGroupedStreams.java        | 563 +++++++++++++++++++
 .../streaming/api/datastream/DataStream.java    |  36 +-
 .../streaming/api/datastream/JoinedStreams.java | 328 +++++++++++
 .../datastream/temporal/StreamJoinOperator.java | 274 ---------
 .../datastream/temporal/TemporalOperator.java   | 124 ----
 .../api/datastream/temporal/TemporalWindow.java |  45 --
 .../windowing/ReduceWindowFunction.java         |  32 +-
 .../flink/streaming/api/CoGroupJoinITCase.java  | 372 ++++++++++++
 .../streaming/api/WindowCrossJoinTest.java      | 143 -----
 .../api/operators/co/SelfConnectionTest.java    |  61 --
 .../streaming/examples/join/WindowJoin.java     | 128 +++--
 .../examples/join/util/WindowJoinData.java      |  70 +--
 .../scala/examples/join/WindowJoin.scala        |  43 +-
 .../join/WindowJoinITCase.java                  | 101 ++--
 .../join/WindowJoinITCase.java                  | 101 ++--
 .../streaming/api/scala/CoGroupedStreams.scala  | 294 ++++++++++
 .../flink/streaming/api/scala/DataStream.scala  |  24 +-
 .../streaming/api/scala/JoinedStreams.scala     | 303 ++++++++++
 .../api/scala/StreamJoinOperator.scala          | 203 -------
 .../streaming/api/scala/TemporalOperator.scala  |  51 --
 .../streaming/api/scala/CoGroupJoinITCase.scala | 274 +++++++++
 .../StreamingScalaAPICompletenessTest.scala     |  14 +-
 22 files changed, 2409 insertions(+), 1175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
new file mode 100644
index 0000000..e1f1a96
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@code CoGroupedStreams} represents two {@link DataStream DataStreams} that 
have been co-grouped.
+ * A streaming co-group operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize co-group operation you also need to specify a {@link 
KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the groups are being built in memory so you need to ensure 
that they don't
+ * get too big. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.coGroup(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MyFirstKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyCoGroupFunction());
+ * } </pre>
+ */
+public class CoGroupedStreams {
+
+       /**
+        * A co-group operation that does not yet have its {@link KeySelector 
KeySelectors} defined.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        */
+       public static class Unspecified<T1, T2> {
+               DataStream<T1> input1;
+               DataStream<T2> input2;
+
+               protected Unspecified(DataStream<T1> input1,
+                               DataStream<T2> input2) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the first 
input.
+                */
+               public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> 
keySelector)  {
+                       return new WithKey<>(input1, input2, 
input1.clean(keySelector), null);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input.
+                */
+               public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> 
keySelector)  {
+                       return new WithKey<>(input1, input2, null, 
input1.clean(keySelector));
+               }
+       }
+
+       /**
+        * A co-group operation that has {@link KeySelector KeySelectors} 
defined for either both or
+        * one input.
+        *
+        * <p>
+        * You need to specify a {@code KeySelector} for both inputs using 
{@link #where(KeySelector)}
+        * and {@link #equalTo(KeySelector)} before you can proceeed with 
specifying a
+        * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        * @param <KEY> Type of the key. This must be the same for both inputs
+        */
+       public static class WithKey<T1, T2, KEY> {
+               DataStream<T1> input1;
+               DataStream<T2> input2;
+
+               KeySelector<T1, KEY> keySelector1;
+               KeySelector<T2, KEY> keySelector2;
+
+               protected WithKey(DataStream<T1> input1, DataStream<T2> input2, 
KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+
+                       this.keySelector1 = keySelector1;
+                       this.keySelector2 = keySelector2;
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the first 
input.
+                */
+               public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> 
keySelector)  {
+                       return new CoGroupedStreams.WithKey<>(input1, input2, 
input1.clean(keySelector), keySelector2);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input.
+                */
+               public CoGroupedStreams.WithKey<T1, T2, KEY> 
equalTo(KeySelector<T2, KEY> keySelector)  {
+                       return new CoGroupedStreams.WithKey<>(input1, input2, 
keySelector1, input1.clean(keySelector));
+               }
+
+               /**
+                * Specifies the window on which the co-group operation works.
+                */
+               public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, 
KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+                       if (keySelector1 == null || keySelector2 == null) {
+                               throw new UnsupportedOperationException("You 
first need to specify KeySelectors for both inputs using where() and 
equalTo().");
+
+                       }
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, assigner, null, null);
+               }
+       }
+
+       /**
+        * A co-group operation that has {@link KeySelector KeySelectors} 
defined for both inputs as
+        * well as a {@link WindowAssigner}.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        * @param <KEY> Type of the key. This must be the same for both inputs
+        * @param <W> Type of {@link Window} on which the co-group operation 
works.
+        */
+       public static class WithWindow<T1, T2, KEY, W extends Window> {
+               private final DataStream<T1> input1;
+               private final DataStream<T2> input2;
+
+               private final KeySelector<T1, KEY> keySelector1;
+               private final KeySelector<T2, KEY> keySelector2;
+
+               private final WindowAssigner<? super TaggedUnion<T1, T2>, W> 
windowAssigner;
+
+               private final Trigger<? super TaggedUnion<T1, T2>, ? super W> 
trigger;
+
+               private final Evictor<? super TaggedUnion<T1, T2>, ? super W> 
evictor;
+
+               protected WithWindow(DataStream<T1> input1,
+                               DataStream<T2> input2,
+                               KeySelector<T1, KEY> keySelector1,
+                               KeySelector<T2, KEY> keySelector2,
+                               WindowAssigner<? super TaggedUnion<T1, T2>, W> 
windowAssigner,
+                               Trigger<? super TaggedUnion<T1, T2>, ? super W> 
trigger,
+                               Evictor<? super TaggedUnion<T1, T2>, ? super W> 
evictor) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+
+                       this.keySelector1 = keySelector1;
+                       this.keySelector2 = keySelector2;
+
+                       this.windowAssigner = windowAssigner;
+                       this.trigger = trigger;
+                       this.evictor = evictor;
+               }
+
+               /**
+                * Sets the {@code Trigger} that should be used to trigger 
window emission.
+                */
+               public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super 
TaggedUnion<T1, T2>, ? super W> newTrigger) {
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, windowAssigner, newTrigger, evictor);
+               }
+
+               /**
+                * Sets the {@code Evictor} that should be used to evict 
elements from a window before emission.
+                *
+                * <p>
+                * Note: When using an evictor window performance will degrade 
significantly, since
+                * pre-aggregation of window results cannot be used.
+                */
+               public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super 
TaggedUnion<T1, T2>, ? super W> newEvictor) {
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, windowAssigner, trigger, newEvictor);
+               }
+
+               /**
+                * Completes the co-group operation with the user function that 
is executed
+                * for windowed groups.
+                */
+               public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> 
function) {
+
+                       TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
+                                       function,
+                                       CoGroupFunction.class,
+                                       true,
+                                       true,
+                                       input1.getType(),
+                                       input2.getType(),
+                                       "CoGroup",
+                                       false);
+
+                       return apply(function, resultType);
+               }
+
+               /**
+                * Completes the co-group operation with the user function that 
is executed
+                * for windowed groups.
+                */
+               public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> 
function, TypeInformation<T> resultType) {
+                       //clean the closure
+                       function = 
input1.getExecutionEnvironment().clean(function);
+
+                       DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
+                                       .map(new Input1Tagger<T1, T2>())
+                                       .returns(new 
UnionTypeInfo<>(input1.getType(), input2.getType()));
+                       DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
+                                       .map(new Input2Tagger<T1, T2>())
+                                       .returns(new 
UnionTypeInfo<>(input1.getType(), input2.getType()));
+
+                       WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = 
taggedInput1
+                                       .union(taggedInput2)
+                                       .keyBy(new 
UnionKeySelector<>(keySelector1, keySelector2))
+                                       .window(windowAssigner);
+
+                       if (trigger != null) {
+                               windowOp.trigger(trigger);
+                       }
+                       if (evictor != null) {
+                               windowOp.evictor(evictor);
+                       }
+
+                       return windowOp.apply(new CoGroupWindowFunction<T1, T2, 
T, KEY, W>(function), resultType);
+               }
+       }
+
+       /**
+        * Creates a new co-group operation from the two given inputs.
+        */
+       public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> 
input1, DataStream<T2> input2) {
+               return new Unspecified<>(input1, input2);
+       }
+
+       /**
+        * Internal class for implementing tagged union co-group.
+        */
+       public static class TaggedUnion<T1, T2> {
+               private final T1 one;
+               private final T2 two;
+
+               private TaggedUnion(T1 one, T2 two) {
+                       this.one = one;
+                       this.two = two;
+               }
+
+               public boolean isOne() {
+                       return one != null;
+               }
+
+               public boolean isTwo() {
+                       return two != null;
+               }
+
+               public T1 getOne() {
+                       return one;
+               }
+
+               public T2 getTwo() {
+                       return two;
+               }
+
+               public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
+                       return new TaggedUnion<>(one, null);
+               }
+
+               public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
+                       return new TaggedUnion<>(null, two);
+               }
+       }
+
+       private static class UnionTypeInfo<T1, T2> extends 
TypeInformation<TaggedUnion<T1, T2>> {
+               private static final long serialVersionUID = 1L;
+
+               TypeInformation<T1> oneType;
+               TypeInformation<T2> twoType;
+
+               public UnionTypeInfo(TypeInformation<T1> oneType,
+                               TypeInformation<T2> twoType) {
+                       this.oneType = oneType;
+                       this.twoType = twoType;
+               }
+
+               @Override
+               public boolean isBasicType() {
+                       return false;
+               }
+
+               @Override
+               public boolean isTupleType() {
+                       return false;
+               }
+
+               @Override
+               public int getArity() {
+                       return 2;
+               }
+
+               @Override
+               public int getTotalFields() {
+                       return 2;
+               }
+
+               @Override
+               @SuppressWarnings("unchecked, rawtypes")
+               public Class<TaggedUnion<T1, T2>> getTypeClass() {
+                       return (Class) TaggedUnion.class;
+               }
+
+               @Override
+               public boolean isKeyType() {
+                       return true;
+               }
+
+               @Override
+               public TypeSerializer<TaggedUnion<T1, T2>> 
createSerializer(ExecutionConfig config) {
+                       return new 
UnionSerializer<>(oneType.createSerializer(config), 
twoType.createSerializer(config));
+               }
+
+               @Override
+               public String toString() {
+                       return "TaggedUnion<" + oneType + ", " + twoType + ">";
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof UnionTypeInfo) {
+                               @SuppressWarnings("unchecked")
+                               UnionTypeInfo<T1, T2> unionTypeInfo = 
(UnionTypeInfo<T1, T2>) obj;
+
+                               return unionTypeInfo.canEqual(this) && 
oneType.equals(unionTypeInfo.oneType) && twoType.equals(unionTypeInfo.twoType);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return 31 *  oneType.hashCode() + twoType.hashCode();
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof UnionTypeInfo;
+               }
+       }
+
+       private static class UnionSerializer<T1, T2> extends 
TypeSerializer<TaggedUnion<T1, T2>> {
+               private static final long serialVersionUID = 1L;
+
+               private final TypeSerializer<T1> oneSerializer;
+               private final TypeSerializer<T2> twoSerializer;
+
+               public UnionSerializer(TypeSerializer<T1> oneSerializer,
+                               TypeSerializer<T2> twoSerializer) {
+                       this.oneSerializer = oneSerializer;
+                       this.twoSerializer = twoSerializer;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return false;
+               }
+
+               @Override
+               public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
+                       return this;
+               }
+
+               @Override
+               public TaggedUnion<T1, T2> createInstance() {
+                       return null;
+               }
+
+               @Override
+               public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) {
+                       if (from.isOne()) {
+                               return 
TaggedUnion.one(oneSerializer.copy(from.getOne()));
+                       } else {
+                               return 
TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+                       }
+               }
+
+               @Override
+               public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, 
TaggedUnion<T1, T2> reuse) {
+                       if (from.isOne()) {
+                               return 
TaggedUnion.one(oneSerializer.copy(from.getOne()));
+                       } else {
+                               return 
TaggedUnion.two(twoSerializer.copy(from.getTwo()));
+                       }               }
+
+               @Override
+               public int getLength() {
+                       return 0;
+               }
+
+               @Override
+               public void serialize(TaggedUnion<T1, T2> record, 
DataOutputView target) throws IOException {
+                       if (record.isOne()) {
+                               target.writeByte(1);
+                               oneSerializer.serialize(record.getOne(), 
target);
+                       } else {
+                               target.writeByte(2);
+                               twoSerializer.serialize(record.getTwo(), 
target);
+                       }
+               }
+
+               @Override
+               public TaggedUnion<T1, T2> deserialize(DataInputView source) 
throws IOException {
+                       byte tag = source.readByte();
+                       if (tag == 1) {
+                               return 
TaggedUnion.one(oneSerializer.deserialize(source));
+                       } else {
+                               return 
TaggedUnion.two(twoSerializer.deserialize(source));
+                       }
+               }
+
+               @Override
+               public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> 
reuse,
+                               DataInputView source) throws IOException {
+                       byte tag = source.readByte();
+                       if (tag == 1) {
+                               return 
TaggedUnion.one(oneSerializer.deserialize(source));
+                       } else {
+                               return 
TaggedUnion.two(twoSerializer.deserialize(source));
+                       }
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       byte tag = source.readByte();
+                       target.writeByte(tag);
+                       if (tag == 1) {
+                               oneSerializer.copy(source, target);
+                       } else {
+                               twoSerializer.copy(source, target);
+                       }
+               }
+
+               @Override
+               public int hashCode() {
+                       return 31 * oneSerializer.hashCode() + 
twoSerializer.hashCode();
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public boolean equals(Object obj) {
+                       if (obj instanceof UnionSerializer) {
+                               UnionSerializer<T1, T2> other = 
(UnionSerializer<T1, T2>) obj;
+
+                               return other.canEqual(this) && 
oneSerializer.equals(other.oneSerializer) && 
twoSerializer.equals(other.twoSerializer);
+                       } else {
+                               return false;
+                       }
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof UnionSerializer;
+               }
+       }
+
+       private static class Input1Tagger<T1, T2> implements MapFunction<T1, 
TaggedUnion<T1, T2>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public TaggedUnion<T1, T2> map(T1 value) throws Exception {
+                       return TaggedUnion.one(value);
+               }
+       }
+
+       private static class Input2Tagger<T1, T2> implements MapFunction<T2, 
TaggedUnion<T1, T2>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public TaggedUnion<T1, T2> map(T2 value) throws Exception {
+                       return TaggedUnion.two(value);
+               }
+       }
+
+       private static class UnionKeySelector<T1, T2, KEY> implements 
KeySelector<TaggedUnion<T1, T2>, KEY> {
+               private static final long serialVersionUID = 1L;
+
+               private final KeySelector<T1, KEY> keySelector1;
+               private final KeySelector<T2, KEY> keySelector2;
+
+               public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
+                               KeySelector<T2, KEY> keySelector2) {
+                       this.keySelector1 = keySelector1;
+                       this.keySelector2 = keySelector2;
+               }
+
+               @Override
+               public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
+                       if (value.isOne()) {
+                               return keySelector1.getKey(value.getOne());
+                       } else {
+                               return keySelector2.getKey(value.getTwo());
+                       }
+               }
+       }
+
+       private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends 
Window>
+                       extends WrappingFunction<CoGroupFunction<T1, T2, T>>
+                       implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, 
W> {
+               private static final long serialVersionUID = 1L;
+
+               public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> 
userFunction) {
+                       super(userFunction);
+               }
+
+               @Override
+               public void apply(KEY key,
+                               W window,
+                               Iterable<TaggedUnion<T1, T2>> values,
+                               Collector<T> out) throws Exception {
+                       List<T1> oneValues = Lists.newArrayList();
+                       List<T2> twoValues = Lists.newArrayList();
+                       for (TaggedUnion<T1, T2> val: values) {
+                               if (val.isOne()) {
+                                       oneValues.add(val.getOne());
+                               } else {
+                                       twoValues.add(val.getTwo());
+                               }
+                       }
+                       wrappedFunction.coGroup(oneValues, twoValues, out);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8de1a0d..0be1d56 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -44,7 +44,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis;
@@ -618,30 +617,19 @@ public class DataStream<T> {
        }
 
        /**
-        * Initiates a temporal Join transformation. <br/>
-        * A temporal Join transformation joins the elements of two
-        * {@link DataStream}s on key equality over a specified time window.
-        *
-        * <p>
-        * This method returns a {@link StreamJoinOperator} on which the
-        * {@link StreamJoinOperator#onWindow(long, 
java.util.concurrent.TimeUnit)}
-        * should be called to define the window, and then the
-        * {@link StreamJoinOperator.JoinWindow#where(int...)} and
-        * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used 
to define
-        * the join keys.
-        * <p>
-        * The user can also use the
-        * {@link 
org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator.JoinPredicate.JoinedStream#with}
-        * to apply a custom join function.
-        * 
-        * @param dataStreamToJoin
-        *            The other DataStream with which this DataStream is joined.
-        * @return A {@link StreamJoinOperator} to continue the definition of 
the
-        *         Join transformation.
-        * 
+        * Creates a co-group operation. See {@link CoGroupedStreams} for an 
example of how the keys
+        * and window can be specified.
+        */
+       public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> 
otherStream) {
+               return CoGroupedStreams.createCoGroup(this, otherStream);
+       }
+
+       /**
+        * Creates a join operation. See {@link JoinedStreams} for an example 
of how the keys
+        * and window can be specified.
         */
-       public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> 
dataStreamToJoin) {
-               return new StreamJoinOperator<T, IN2>(this, dataStreamToJoin);
+       public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> 
otherStream) {
+               return JoinedStreams.createJoin(this, otherStream);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
new file mode 100644
index 0000000..ee848e3
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * {@code JoinedStreams} represents two {@link DataStream DataStreams} that 
have been joined.
+ * A streaming join operation is evaluated over elements in a window.
+ *
+ * <p>
+ * To finalize the join operation you also need to specify a {@link 
KeySelector} for
+ * both the first and second input and a {@link WindowAssigner}.
+ *
+ * <p>
+ * Note: Right now, the join is being evaluated in memory so you need to 
ensure that the number
+ * of elements per key does not get too high. Otherwise the JVM might crash.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> one = ...;
+ * DataStream<Tuple2<String, Integer>> two = ...;
+ *
+ * DataStream<T> result = one.join(two)
+ *     .where(new MyFirstKeySelector())
+ *     .equalTo(new MySecondKeySelector())
+ *     .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .apply(new MyJoinFunction());
+ * } </pre>
+ */
+public class JoinedStreams extends CoGroupedStreams{
+
+       /**
+        * A join operation that does not yet have its {@link KeySelector 
KeySelectors} defined.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        */
+       public static class Unspecified<T1, T2> {
+               DataStream<T1> input1;
+               DataStream<T2> input2;
+
+               protected Unspecified(DataStream<T1> input1,
+                               DataStream<T2> input2) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the first 
input.
+                */
+               public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> 
keySelector)  {
+                       return new WithKey<>(input1, input2, keySelector, null);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input.
+                */
+               public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> 
keySelector)  {
+                       return new WithKey<>(input1, input2, null, keySelector);
+               }
+       }
+
+       /**
+        * A join operation that has {@link KeySelector KeySelectors} defined 
for either both or
+        * one input.
+        *
+        * <p>
+        * You need to specify a {@code KeySelector} for both inputs using 
{@link #where(KeySelector)}
+        * and {@link #equalTo(KeySelector)} before you can proceed with 
specifying a
+        * {@link WindowAssigner} using {@link #window(WindowAssigner)}.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        * @param <KEY> Type of the key. This must be the same for both inputs
+        */
+       public static class WithKey<T1, T2, KEY> {
+               DataStream<T1> input1;
+               DataStream<T2> input2;
+
+               KeySelector<T1, KEY> keySelector1;
+               KeySelector<T2, KEY> keySelector2;
+
+               protected WithKey(DataStream<T1> input1, DataStream<T2> input2, 
KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+
+                       this.keySelector1 = keySelector1;
+                       this.keySelector2 = keySelector2;
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the first 
input.
+                */
+               public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> 
keySelector)  {
+                       return new JoinedStreams.WithKey<>(input1, input2, 
keySelector, keySelector2);
+               }
+
+               /**
+                * Specifies a {@link KeySelector} for elements from the second 
input.
+                */
+               public JoinedStreams.WithKey<T1, T2, KEY> 
equalTo(KeySelector<T2, KEY> keySelector)  {
+                       return new JoinedStreams.WithKey<>(input1, input2, 
keySelector1, keySelector);
+               }
+
+               /**
+                * Specifies the window on which the join operation works.
+                */
+               public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, 
W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
+                       if (keySelector1 == null || keySelector2 == null) {
+                               throw new UnsupportedOperationException("You 
first need to specify KeySelectors for both inputs using where() and 
equalTo().");
+
+                       }
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, assigner, null, null);
+               }
+       }
+
+       /**
+        * A join operation that has {@link KeySelector KeySelectors} defined 
for both inputs as
+        * well as a {@link WindowAssigner}.
+        *
+        * @param <T1> Type of the elements from the first input
+        * @param <T2> Type of the elements from the second input
+        * @param <KEY> Type of the key. This must be the same for both inputs
+        * @param <W> Type of {@link Window} on which the join operation works.
+        */
+       public static class WithWindow<T1, T2, KEY, W extends Window> {
+               private final DataStream<T1> input1;
+               private final DataStream<T2> input2;
+
+               private final KeySelector<T1, KEY> keySelector1;
+               private final KeySelector<T2, KEY> keySelector2;
+
+               private final WindowAssigner<? super TaggedUnion<T1, T2>, W> 
windowAssigner;
+
+               private final Trigger<? super TaggedUnion<T1, T2>, ? super W> 
trigger;
+
+               private final Evictor<? super TaggedUnion<T1, T2>, ? super W> 
evictor;
+
+               protected WithWindow(DataStream<T1> input1,
+                               DataStream<T2> input2,
+                               KeySelector<T1, KEY> keySelector1,
+                               KeySelector<T2, KEY> keySelector2,
+                               WindowAssigner<? super TaggedUnion<T1, T2>, W> 
windowAssigner,
+                               Trigger<? super TaggedUnion<T1, T2>, ? super W> 
trigger,
+                               Evictor<? super TaggedUnion<T1, T2>, ? super W> 
evictor) {
+                       this.input1 = input1;
+                       this.input2 = input2;
+
+                       this.keySelector1 = keySelector1;
+                       this.keySelector2 = keySelector2;
+
+                       this.windowAssigner = windowAssigner;
+                       this.trigger = trigger;
+                       this.evictor = evictor;
+               }
+
+               /**
+                * Sets the {@code Trigger} that should be used to trigger 
window emission.
+                */
+               public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super 
TaggedUnion<T1, T2>, ? super W> newTrigger) {
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, windowAssigner, newTrigger, evictor);
+               }
+
+               /**
+                * Sets the {@code Evictor} that should be used to evict 
elements from a window before emission.
+                *
+                * <p>
+                * Note: When using an evictor window performance will degrade 
significantly, since
+                * pre-aggregation of window results cannot be used.
+                */
+               public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super 
TaggedUnion<T1, T2>, ? super W> newEvictor) {
+                       return new WithWindow<>(input1, input2, keySelector1, 
keySelector2, windowAssigner, trigger, newEvictor);
+               }
+
+               /**
+                * Completes the join operation with the user function that is 
executed
+                * for each combination of elements with the same key in a 
window.
+                */
+               public <T> DataStream<T> apply(JoinFunction<T1, T2, T> 
function) {
+                       TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
+                                       function,
+                                       JoinFunction.class,
+                                       true,
+                                       true,
+                                       input1.getType(),
+                                       input2.getType(),
+                                       "CoGroup",
+                                       false);
+
+                       return apply(function, resultType);
+               }
+
+               /**
+                * Completes the join operation with the user function that is 
executed
+                * for each combination of elements with the same key in a 
window.
+                */
+               public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> 
function, TypeInformation<T> resultType) {
+                       //clean the closure
+                       function = 
input1.getExecutionEnvironment().clean(function);
+
+                       return input1.coGroup(input2)
+                                       .where(keySelector1)
+                                       .equalTo(keySelector2)
+                                       .window(windowAssigner)
+                                       .trigger(trigger)
+                                       .evictor(evictor)
+                                       .apply(new 
FlatJoinCoGroupFunction<>(function), resultType);
+
+               }
+
+               /**
+                * Completes the join operation with the user function that is 
executed
+                * for each combination of elements with the same key in a 
window.
+                */
+               public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> 
function) {
+                       TypeInformation<T> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
+                                       function,
+                                       JoinFunction.class,
+                                       true,
+                                       true,
+                                       input1.getType(),
+                                       input2.getType(),
+                                       "CoGroup",
+                                       false);
+
+                       return apply(function, resultType);
+               }
+
+               /**
+                * Completes the join operation with the user function that is 
executed
+                * for each combination of elements with the same key in a 
window.
+                */
+               public <T> DataStream<T> apply(JoinFunction<T1, T2, T> 
function, TypeInformation<T> resultType) {
+                       //clean the closure
+                       function = 
input1.getExecutionEnvironment().clean(function);
+
+                       return input1.coGroup(input2)
+                                       .where(keySelector1)
+                                       .equalTo(keySelector2)
+                                       .window(windowAssigner)
+                                       .trigger(trigger)
+                                       .evictor(evictor)
+                                       .apply(new 
JoinCoGroupFunction<>(function), resultType);
+
+               }
+       }
+
+       /**
+        * Creates a new join operation from the two given inputs.
+        */
+       public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> 
input1, DataStream<T2> input2) {
+               return new Unspecified<>(input1, input2);
+       }
+
+       /**
+        * CoGroup function that does a nested-loop join to get the join result.
+        */
+       private static class JoinCoGroupFunction<T1, T2, T>
+                       extends WrappingFunction<JoinFunction<T1, T2, T>>
+                       implements CoGroupFunction<T1, T2, T> {
+               private static final long serialVersionUID = 1L;
+
+               public JoinCoGroupFunction(JoinFunction<T1, T2, T> 
wrappedFunction) {
+                       super(wrappedFunction);
+               }
+
+               @Override
+               public void coGroup(Iterable<T1> first, Iterable<T2> second, 
Collector<T> out) throws Exception {
+                       for (T1 val1: first) {
+                               for (T2 val2: second) {
+                                       out.collect(wrappedFunction.join(val1, 
val2));
+                               }
+                       }
+               }
+       }
+
+       /**
+        * CoGroup function that does a nested-loop join to get the join 
result. (FlatJoin version)
+        */
+       private static class FlatJoinCoGroupFunction<T1, T2, T>
+                       extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
+                       implements CoGroupFunction<T1, T2, T> {
+               private static final long serialVersionUID = 1L;
+
+               public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> 
wrappedFunction) {
+                       super(wrappedFunction);
+               }
+
+               @Override
+               public void coGroup(Iterable<T1> first, Iterable<T2> second, 
Collector<T> out) throws Exception {
+                       for (T1 val1: first) {
+                               for (T2 val2: second) {
+                                       wrappedFunction.join(val1, val2, out);
+                               }
+                       }
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
deleted file mode 100644
index 4a5622d..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-public class StreamJoinOperator<I1, I2> extends
-               TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> 
{
-
-       public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) 
{
-               super(input1, input2);
-       }
-
-       @Override
-       protected JoinWindow<I1, I2> createNextWindowOperator() {
-               return new JoinWindow<I1, I2>(this);
-       }
-
-       public static class JoinWindow<I1, I2> implements 
TemporalWindow<JoinWindow<I1, I2>> {
-
-               private StreamJoinOperator<I1, I2> op;
-               private TypeInformation<I1> type1;
-
-               private JoinWindow(StreamJoinOperator<I1, I2> operator) {
-                       this.op = operator;
-                       this.type1 = op.input1.getType();
-               }
-
-               /**
-                * Continues a temporal Join transformation. <br/>
-                * Defines the {@link Tuple} fields of the first join {@link 
DataStream}
-                * that should be used as join keys.<br/>
-                * <b>Note: Fields can only be selected as join keys on Tuple
-                * DataStreams.</b><br/>
-                * 
-                * @param fields
-                *            The indexes of the other Tuple fields of the 
first join
-                *            DataStreams that should be used as keys.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public JoinPredicate<I1, I2> where(int... fields) {
-                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1, op.input1.getExecutionEnvironment().getConfig()));
-               }
-
-               /**
-                * Continues a temporal join transformation. <br/>
-                * Defines the fields of the first join {@link DataStream} that 
should
-                * be used as grouping keys. Fields are the names of member 
fields of
-                * the underlying type of the data stream.
-                * 
-                * @param fields
-                *            The fields of the first join DataStream that 
should be
-                *            used as keys.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public JoinPredicate<I1, I2> where(String... fields) {
-                       return new JoinPredicate<I1, I2>(op, 
KeySelectorUtil.getSelectorForKeys(
-                                       new Keys.ExpressionKeys<I1>(fields, 
type1), type1, op.input1.getExecutionEnvironment().getConfig()));
-               }
-
-               /**
-                * Continues a temporal Join transformation and defines a
-                * {@link KeySelector} function for the first join {@link 
DataStream}
-                * .</br> The KeySelector function is called for each element 
of the
-                * first DataStream and extracts a single key value on which the
-                * DataStream is joined. </br>
-                * 
-                * @param keySelector
-                *            The KeySelector function which extracts the key 
values
-                *            from the DataStream on which it is joined.
-                * @return An incomplete Join transformation. Call
-                *         {@link JoinPredicate#equalTo} to continue the Join.
-                */
-               public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> 
keySelector) {
-                       return new JoinPredicate<I1, I2>(op, keySelector);
-               }
-
-               @Override
-               public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) 
{
-                       return every(timeUnit.toMillis(length));
-               }
-
-               @Override
-               public JoinWindow<I1, I2> every(long length) {
-                       op.slideInterval = length;
-                       return this;
-               }
-
-               // 
----------------------------------------------------------------------------------------
-
-       }
-
-       /**
-        * Intermediate step of a temporal Join transformation. <br/>
-        * To continue the Join transformation, select the join key of the 
second
-        * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
-        * 
-        */
-       public static class JoinPredicate<I1, I2> {
-
-               private StreamJoinOperator<I1, I2> op;
-               private KeySelector<I1, ?> keys1;
-               private KeySelector<I2, ?> keys2;
-               private TypeInformation<I2> type2;
-
-               private JoinPredicate(StreamJoinOperator<I1, I2> operator, 
KeySelector<I1, ?> keys1) {
-                       this.op = operator;
-                       this.keys1 = keys1;
-                       this.type2 = op.input2.getType();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines the 
{@link Tuple}
-                * fields of the second join {@link DataStream} that should be 
used as
-                * join keys.<br/>
-                * </p> The resulting operator wraps each pair of joining 
elements in a
-                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
-                * use {@link JoinedStream#with(JoinFunction)}
-                * 
-                * @param fields
-                *            The indexes of the Tuple fields of the second join
-                *            DataStream that should be used as keys.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(int... 
fields) {
-                       keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields, type2),
-                                       type2, 
op.input1.getExecutionEnvironment().getConfig());
-                       return createJoinOperator();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines the 
fields of the
-                * second join {@link DataStream} that should be used as join 
keys. </p>
-                * The resulting operator wraps each pair of joining elements 
in a
-                * Tuple2<I1,I2>(first, second). To use a different wrapping 
function
-                * use {@link JoinedStream#with(JoinFunction)}
-                * 
-                * @param fields
-                *            The fields of the second join DataStream that 
should be
-                *            used as keys.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(String... 
fields) {
-                       this.keys2 = KeySelectorUtil.getSelectorForKeys(new 
Keys.ExpressionKeys<I2>(fields,
-                                       type2), type2, 
op.input1.getExecutionEnvironment().getConfig());
-                       return createJoinOperator();
-               }
-
-               /**
-                * Creates a temporal Join transformation and defines a
-                * {@link KeySelector} function for the second join {@link 
DataStream}
-                * .</br> The KeySelector function is called for each element 
of the
-                * second DataStream and extracts a single key value on which 
the
-                * DataStream is joined. </p> The resulting operator wraps each 
pair of
-                * joining elements in a Tuple2<I1,I2>(first, second). To use a
-                * different wrapping function use
-                * {@link JoinedStream#with(JoinFunction)}
-                * 
-                * 
-                * @param keySelector
-                *            The KeySelector function which extracts the key 
values
-                *            from the second DataStream on which it is joined.
-                * @return A streaming join operator. Call {@link 
JoinedStream#with} to
-                *         apply a custom wrapping
-                */
-               public <K> JoinedStream<I1, I2, Tuple2<I1, I2>> 
equalTo(KeySelector<I2, K> keySelector) {
-                       this.keys2 = keySelector;
-                       return createJoinOperator();
-               }
-
-               private JoinedStream<I1, I2, Tuple2<I1, I2>> 
createJoinOperator() {
-
-//                     JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new 
DefaultJoinFunction<I1, I2>();
-//
-//                     JoinWindowFunction<I1, I2, Tuple2<I1, I2>> 
joinWindowFunction = getJoinWindowFunction(
-//                                     joinFunction, this);
-//
-//                     TypeInformation<Tuple2<I1, I2>> outType = new 
TupleTypeInfo<Tuple2<I1, I2>>(
-//                                     op.input1.getType(), 
op.input2.getType());
-
-//                     return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, 
op.input1
-//                                     .keyBy(keys1)
-//                                     .connect(op.input2.keyBy(keys2))
-//                                     
.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
-//                                                     op.slideInterval, 
op.timeStamp1, op.timeStamp2));
-                       return null;
-               }
-
-               public static class JoinedStream<I1, I2, R> extends
-                               SingleOutputStreamOperator<R, JoinedStream<I1, 
I2, R>> {
-                       private final JoinPredicate<I1, I2> predicate;
-
-                       private JoinedStream(JoinPredicate<I1, I2> predicate, 
DataStream<R> ds) {
-                               super(ds.getExecutionEnvironment(), 
ds.getTransformation());
-                               this.predicate = predicate;
-                       }
-
-                       /**
-                        * Completes a stream join. </p> The resulting operator 
wraps each pair
-                        * of joining elements using the user defined {@link 
JoinFunction}
-                        *
-                        * @return The joined data stream.
-                        */
-                       @SuppressWarnings("unchecked")
-                       public <OUT> SingleOutputStreamOperator<OUT, ?> 
with(JoinFunction<I1, I2, OUT> joinFunction) {
-
-                               TypeInformation<OUT> outType = 
TypeExtractor.getJoinReturnTypes(joinFunction,
-                                               predicate.op.input1.getType(), 
predicate.op.input2.getType());
-
-//                             JoinWindowFunction<I1, I2, OUT> 
joinWindowFunction = getJoinWindowFunction(joinFunction, predicate);
-//
-
-//                             return new JoinedStream<I1, I2, OUT>(
-//                                             predicate, predicate.op.input1
-//                                             .keyBy(predicate.keys1)
-//                                             
.connect(predicate.op.input2.keyBy(predicate.keys2))
-//                                             
.addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize,
-//                                                             
predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2));
-                               return null;
-                       }
-               }
-       }
-
-       public static final class DefaultJoinFunction<I1, I2> implements
-                       JoinFunction<I1, I2, Tuple2<I1, I2>> {
-
-               private static final long serialVersionUID = 1L;
-               private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
-
-               @Override
-               public Tuple2<I1, I2> join(I1 first, I2 second) throws 
Exception {
-                       outTuple.f0 = first;
-                       outTuple.f1 = second;
-                       return outTuple;
-               }
-       }
-
-//     private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> 
getJoinWindowFunction(
-//                     JoinFunction<I1, I2, OUT> joinFunction, 
JoinPredicate<I1, I2> predicate) {
-//             return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, 
predicate.keys2, joinFunction);
-//     }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
deleted file mode 100644
index 9da00f2..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
-
-       public final DataStream<I1> input1;
-       public final DataStream<I2> input2;
-
-       public long windowSize;
-       public long slideInterval;
-
-       public TimestampWrapper<I1> timeStamp1;
-       public TimestampWrapper<I2> timeStamp2;
-
-       public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
-               if (input1 == null || input2 == null) {
-                       throw new NullPointerException();
-               }
-               this.input1 = input1;
-               this.input2 = input2;
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be 
transformed.
-        * To define sliding windows call {@link TemporalWindow#every} on the
-        * resulting operator.
-        * 
-        * @param length
-        *            The size of the window in milliseconds.
-        * @param timeUnit
-        *            The unit if time to be used
-        * @return An incomplete temporal transformation.
-        */
-       @SuppressWarnings("unchecked")
-       public OP onWindow(long length, TimeUnit timeUnit) {
-               return onWindow(timeUnit.toMillis(length),
-                               (TimestampWrapper<I1>) 
SystemTimestamp.getWrapper(),
-                               (TimestampWrapper<I2>) 
SystemTimestamp.getWrapper());
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be
-        * transformed.To define sliding windows call {@link 
TemporalWindow#every}
-        * on the resulting operator.
-        * 
-        * @param length
-        *            The size of the window in milliseconds.
-        * @param timeStamp1
-        *            The timestamp used to extract time from the elements of 
the
-        *            first data stream.
-        * @param timeStamp2
-        *            The timestamp used to extract time from the elements of 
the
-        *            second data stream.
-        * @return An incomplete temporal transformation.
-        */
-       public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> 
timeStamp2) {
-               return onWindow(length, timeStamp1, timeStamp2, 0);
-       }
-
-       /**
-        * Continues a temporal transformation.<br/>
-        * Defines the window size on which the two DataStreams will be
-        * transformed.To define sliding windows call {@link 
TemporalWindow#every}
-        * on the resulting operator.
-        * 
-        * @param length
-        *            The size of the window in milliseconds.
-        * @param timeStamp1
-        *            The timestamp used to extract time from the elements of 
the
-        *            first data stream.
-        * @param timeStamp2
-        *            The timestamp used to extract time from the elements of 
the
-        *            second data stream.
-        * @param startTime
-        *            The start time to measure the first window
-        * @return An incomplete temporal transformation.
-        */
-       public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> 
timeStamp2,
-                       long startTime) {
-               return onWindow(length, new TimestampWrapper<I1>(timeStamp1, 
startTime),
-                               new TimestampWrapper<I2>(timeStamp2, 
startTime));
-       }
-
-       private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
-                       TimestampWrapper<I2> timeStamp2) {
-
-               this.windowSize = length;
-               this.slideInterval = length;
-
-               this.timeStamp1 = timeStamp1;
-               this.timeStamp2 = timeStamp2;
-
-               return createNextWindowOperator();
-       }
-
-       protected abstract OP createNextWindowOperator();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
deleted file mode 100644
index 49c75c4..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream.temporal;
-
-import java.util.concurrent.TimeUnit;
-
-public interface TemporalWindow<T> {
-
-       /**
-        * Defines the slide interval for this temporal operator
-        * 
-        * @param length
-        *            Length of the window
-        * @param timeUnit
-        *            Unit of time
-        * @return The temporal operator with slide interval specified
-        */
-       public T every(long length, TimeUnit timeUnit);
-
-       /**
-        * Defines the slide interval for this temporal operator
-        * 
-        * @param length
-        *            Length of the window
-        * @return The temporal operator with slide interval specified
-        */
-       public T every(long length);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
index 042fe18..edd8a34 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java
@@ -18,37 +18,17 @@
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
-public class ReduceWindowFunction<K, W extends Window, T> extends 
RichWindowFunction<T, T, K, W> {
+public class ReduceWindowFunction<K, W extends Window, T>
+               extends WrappingFunction<ReduceFunction<T>>
+               implements WindowFunction<T, T, K, W> {
        private static final long serialVersionUID = 1L;
 
-       private final ReduceFunction<T> reduceFunction;
-
        public ReduceWindowFunction(ReduceFunction<T> reduceFunction) {
-               this.reduceFunction = reduceFunction;
-       }
-
-       @Override
-       public void setRuntimeContext(RuntimeContext ctx) {
-               super.setRuntimeContext(ctx);
-               FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx);
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               FunctionUtils.openFunction(reduceFunction, parameters);
-       }
-
-       @Override
-       public void close() throws Exception {
-               super.close();
-               FunctionUtils.closeFunction(reduceFunction);
+               super(reduceFunction);
        }
 
        @Override
@@ -59,7 +39,7 @@ public class ReduceWindowFunction<K, W extends Window, T> 
extends RichWindowFunc
                        if (result == null) {
                                result = v;
                        } else {
-                               result = reduceFunction.reduce(result, v);
+                               result = wrappedFunction.reduce(result, v);
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
new file mode 100644
index 0000000..c06a608
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -0,0 +1,372 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.api;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+       private static List<String> testResults;
+
+       @Test
+       public void testCoGroup() throws Exception {
+
+               testResults = Lists.newArrayList();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getConfig().enableTimestamps();
+
+               DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+                               ctx.collect(Tuple2.of("a", 2));
+
+                               ctx.collect(Tuple2.of("b", 3));
+                               ctx.collect(Tuple2.of("b", 4));
+                               ctx.collect(Tuple2.of("b", 5));
+
+                               ctx.collect(Tuple2.of("a", 6));
+                               ctx.collect(Tuple2.of("a", 7));
+                               ctx.collect(Tuple2.of("a", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple2TimestampExtractor());
+
+               DataStream<Tuple2<String, Integer>> source2 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple2<String, Integer>> 
ctx) throws Exception {
+                               ctx.collect(Tuple2.of("a", 0));
+                               ctx.collect(Tuple2.of("a", 1));
+
+                               ctx.collect(Tuple2.of("b", 3));
+
+                               ctx.collect(Tuple2.of("c", 6));
+                               ctx.collect(Tuple2.of("c", 7));
+                               ctx.collect(Tuple2.of("c", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple2TimestampExtractor());
+
+
+               source1.coGroup(source2)
+                               .where(new Tuple2KeyExtractor())
+                               .equalTo(new Tuple2KeyExtractor())
+                               .window(TumblingTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                               .apply(new 
CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+                                       @Override
+                                       public void 
coGroup(Iterable<Tuple2<String, Integer>> first,
+                                                       Iterable<Tuple2<String, 
Integer>> second,
+                                                       Collector<String> out) 
throws Exception {
+                                               StringBuilder result = new 
StringBuilder();
+                                               result.append("F:");
+                                               for (Tuple2<String, Integer> t: 
first) {
+                                                       
result.append(t.toString());
+                                               }
+                                               result.append(" S:");
+                                               for (Tuple2<String, Integer> t: 
second) {
+                                                       
result.append(t.toString());
+                                               }
+                                               out.collect(result.toString());
+                                       }
+                               })
+                               .addSink(new SinkFunction<String>() {
+                                       @Override
+                                       public void invoke(String value) throws 
Exception {
+                                               testResults.add(value);
+                                       }
+                               });
+
+               env.execute("CoGroup Test");
+
+               List<String> expectedResult = Lists.newArrayList(
+                               "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+                               "F:(b,3)(b,4)(b,5) S:(b,3)",
+                               "F:(a,6)(a,7)(a,8) S:",
+                               "F: S:(c,6)(c,7)(c,8)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       @Test
+       public void testJoin() throws Exception {
+
+               testResults = Lists.newArrayList();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getConfig().enableTimestamps();
+
+               DataStream<Tuple3<String, String, Integer>> source1 = 
env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple3<String, String, 
Integer>> ctx) throws Exception {
+                               ctx.collect(Tuple3.of("a", "x", 0));
+                               ctx.collect(Tuple3.of("a", "y", 1));
+                               ctx.collect(Tuple3.of("a", "z", 2));
+
+                               ctx.collect(Tuple3.of("b", "u", 3));
+                               ctx.collect(Tuple3.of("b", "w", 5));
+
+                               ctx.collect(Tuple3.of("a", "i", 6));
+                               ctx.collect(Tuple3.of("a", "j", 7));
+                               ctx.collect(Tuple3.of("a", "k", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple3TimestampExtractor());
+
+               DataStream<Tuple3<String, String, Integer>> source2 = 
env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple3<String, String, 
Integer>> ctx) throws Exception {
+                               ctx.collect(Tuple3.of("a", "u", 0));
+                               ctx.collect(Tuple3.of("a", "w", 1));
+
+                               ctx.collect(Tuple3.of("b", "i", 3));
+                               ctx.collect(Tuple3.of("b", "k", 5));
+
+                               ctx.collect(Tuple3.of("a", "x", 6));
+                               ctx.collect(Tuple3.of("a", "z", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple3TimestampExtractor());
+
+
+               source1.join(source2)
+                               .where(new Tuple3KeyExtractor())
+                               .equalTo(new Tuple3KeyExtractor())
+                               .window(TumblingTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                               .apply(new JoinFunction<Tuple3<String, String, 
Integer>, Tuple3<String, String, Integer>, String>() {
+                                       @Override
+                                       public String join(Tuple3<String, 
String, Integer> first, Tuple3<String, String, Integer> second) throws 
Exception {
+                                               return first + ":" + second;
+                                       }
+                               })
+                               .addSink(new SinkFunction<String>() {
+                                       @Override
+                                       public void invoke(String value) throws 
Exception {
+                                               testResults.add(value);
+                                       }
+                               });
+
+               env.execute("Join Test");
+
+               List<String> expectedResult = Lists.newArrayList(
+                               "(a,x,0):(a,u,0)",
+                               "(a,x,0):(a,w,1)",
+                               "(a,y,1):(a,u,0)",
+                               "(a,y,1):(a,w,1)",
+                               "(a,z,2):(a,u,0)",
+                               "(a,z,2):(a,w,1)",
+                               "(b,u,3):(b,i,3)",
+                               "(b,u,3):(b,k,5)",
+                               "(b,w,5):(b,i,3)",
+                               "(b,w,5):(b,k,5)",
+                               "(a,i,6):(a,x,6)",
+                               "(a,i,6):(a,z,8)",
+                               "(a,j,7):(a,x,6)",
+                               "(a,j,7):(a,z,8)",
+                               "(a,k,8):(a,x,6)",
+                               "(a,k,8):(a,z,8)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       @Test
+       public void testSelfJoin() throws Exception {
+
+               testResults = Lists.newArrayList();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.getConfig().enableTimestamps();
+
+               DataStream<Tuple3<String, String, Integer>> source1 = 
env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Tuple3<String, String, 
Integer>> ctx) throws Exception {
+                               ctx.collect(Tuple3.of("a", "x", 0));
+                               ctx.collect(Tuple3.of("a", "y", 1));
+                               ctx.collect(Tuple3.of("a", "z", 2));
+
+                               ctx.collect(Tuple3.of("b", "u", 3));
+                               ctx.collect(Tuple3.of("b", "w", 5));
+
+                               ctx.collect(Tuple3.of("a", "i", 6));
+                               ctx.collect(Tuple3.of("a", "j", 7));
+                               ctx.collect(Tuple3.of("a", "k", 8));
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               }).extractTimestamp(new Tuple3TimestampExtractor());
+
+               source1.join(source1)
+                               .where(new Tuple3KeyExtractor())
+                               .equalTo(new Tuple3KeyExtractor())
+                               .window(TumblingTimeWindows.of(Time.of(3, 
TimeUnit.MILLISECONDS)))
+                               .apply(new JoinFunction<Tuple3<String, String, 
Integer>, Tuple3<String, String, Integer>, String>() {
+                                       @Override
+                                       public String join(Tuple3<String, 
String, Integer> first, Tuple3<String, String, Integer> second) throws 
Exception {
+                                               return first + ":" + second;
+                                       }
+                               })
+                               .addSink(new SinkFunction<String>() {
+                                       @Override
+                                       public void invoke(String value) throws 
Exception {
+                                               testResults.add(value);
+                                       }
+                               });
+
+               env.execute("Self-Join Test");
+
+               List<String> expectedResult = Lists.newArrayList(
+                               "(a,x,0):(a,x,0)",
+                               "(a,x,0):(a,y,1)",
+                               "(a,x,0):(a,z,2)",
+                               "(a,y,1):(a,x,0)",
+                               "(a,y,1):(a,y,1)",
+                               "(a,y,1):(a,z,2)",
+                               "(a,z,2):(a,x,0)",
+                               "(a,z,2):(a,y,1)",
+                               "(a,z,2):(a,z,2)",
+                               "(b,u,3):(b,u,3)",
+                               "(b,u,3):(b,w,5)",
+                               "(b,w,5):(b,u,3)",
+                               "(b,w,5):(b,w,5)",
+                               "(a,i,6):(a,i,6)",
+                               "(a,i,6):(a,j,7)",
+                               "(a,i,6):(a,k,8)",
+                               "(a,j,7):(a,i,6)",
+                               "(a,j,7):(a,j,7)",
+                               "(a,j,7):(a,k,8)",
+                               "(a,k,8):(a,i,6)",
+                               "(a,k,8):(a,j,7)",
+                               "(a,k,8):(a,k,8)");
+
+               Collections.sort(expectedResult);
+               Collections.sort(testResults);
+
+               Assert.assertEquals(expectedResult, testResults);
+       }
+
+       private static class Tuple2TimestampExtractor implements 
TimestampExtractor<Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long extractTimestamp(Tuple2<String, Integer> element, 
long currentTimestamp) {
+                       return element.f1;
+               }
+
+               @Override
+               public long emitWatermark(Tuple2<String, Integer> element, long 
currentTimestamp) {
+                       return element.f1 - 1;
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       return Long.MIN_VALUE;
+               }
+       }
+
+       private static class Tuple3TimestampExtractor implements 
TimestampExtractor<Tuple3<String, String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long extractTimestamp(Tuple3<String, String, Integer> 
element, long currentTimestamp) {
+                       return element.f2;
+               }
+
+               @Override
+               public long emitWatermark(Tuple3<String, String, Integer> 
element, long currentTimestamp) {
+                       return element.f2 - 1;
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       return Long.MIN_VALUE;
+               }
+       }
+
+       private static class Tuple2KeyExtractor implements 
KeySelector<Tuple2<String,Integer>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Tuple2<String, Integer> value) throws 
Exception {
+                       return value.f0;
+               }
+       }
+
+       private static class Tuple3KeyExtractor implements 
KeySelector<Tuple3<String, String, Integer>, String> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public String getKey(Tuple3<String, String, Integer> value) 
throws Exception {
+                       return value.f0;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
deleted file mode 100644
index 7d2a131..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import org.apache.flink.api.common.functions.CrossFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase {
-
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>();
-       private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> 
crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, 
Integer>>();
-
-       private static class MyTimestamp<T> implements Timestamp<T> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public long getTimestamp(T value) {
-                       return 101L;
-               }
-       }
-
-       /**
-        * TODO: enable once new join operator is ready
-        * @throws Exception
-        */
-       @Ignore
-       @Test
-       public void test() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(1);
-               env.setBufferTimeout(1);
-
-               TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> 
joinResultSink =
-                               new TestListResultSink<Tuple2<Tuple2<Integer, 
String>, Integer>>();
-               TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> 
crossResultSink =
-                               new TestListResultSink<Tuple2<Tuple2<Integer, 
String>, Integer>>();
-
-               ArrayList<Tuple2<Integer, String>> in1 = new 
ArrayList<Tuple2<Integer, String>>();
-               ArrayList<Tuple1<Integer>> in2 = new 
ArrayList<Tuple1<Integer>>();
-
-               in1.add(new Tuple2<Integer, String>(10, "a"));
-               in1.add(new Tuple2<Integer, String>(20, "b"));
-               in1.add(new Tuple2<Integer, String>(20, "x"));
-               in1.add(new Tuple2<Integer, String>(0, "y"));
-
-               in2.add(new Tuple1<Integer>(0));
-               in2.add(new Tuple1<Integer>(5));
-               in2.add(new Tuple1<Integer>(20));
-
-               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "b"), 20));
-               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "x"), 20));
-               joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(0, "y"), 0));
-
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(10, "a"), 0));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(10, "a"), 5));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(10, "a"), 20));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "b"), 0));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "b"), 5));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "b"), 20));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "x"), 0));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "x"), 5));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(20, "x"), 20));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(0, "y"), 0));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(0, "y"), 5));
-               crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, 
Integer>(
-                               new Tuple2<Integer, String>(0, "y"), 20));
-
-               DataStream<Tuple2<Integer, String>> inStream1 = 
env.fromCollection(in1);
-               DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
-
-               inStream1
-                               .join(inStream2)
-                               .onWindow(1000, new MyTimestamp<Tuple2<Integer, 
String>>(),
-                                               new 
MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0)
-                               .map(new ResultMap())
-                               .addSink(joinResultSink);
-
-               env.execute();
-
-               assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, 
Integer>>(joinExpectedResults),
-                               new HashSet<Tuple2<Tuple2<Integer, String>, 
Integer>>(joinResultSink.getResult()));
-               assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, 
Integer>>(crossExpectedResults),
-                               new HashSet<Tuple2<Tuple2<Integer, String>, 
Integer>>(crossResultSink.getResult()));
-       }
-
-       private static class ResultMap implements
-                       MapFunction<Tuple2<Tuple2<Integer, String>, 
Tuple1<Integer>>,
-                                       Tuple2<Tuple2<Integer, String>, 
Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple2<Tuple2<Integer, String>, Integer> 
map(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) throws Exception {
-                       return new Tuple2<Tuple2<Integer, String>, 
Integer>(value.f0, value.f1.f0);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index fc9de1d..c116c01 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -47,67 +47,6 @@ public class SelfConnectionTest extends 
StreamingMultipleProgramsTestBase {
        private static List<String> expected;
 
        /**
-        * TODO: enable once new join operator is implemented
-        */
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Ignore
-       @Test
-       public void sameDataStreamTest() {
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(3);
-
-               TestListResultSink<String> resultSink = new 
TestListResultSink<String>();
-
-               Timestamp<Integer> timeStamp = new IntegerTimestamp();
-
-               KeySelector keySelector = new KeySelector<Integer, Integer>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value;
-                       }
-               };
-
-               DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-               @SuppressWarnings("unused")
-               DataStreamSink<Tuple2<Integer, Integer>> dataStream =
-                               src.join(src).onWindow(50L, timeStamp, 
timeStamp).where(keySelector).equalTo(keySelector)
-                                               .map(new 
MapFunction<Tuple2<Integer, Integer>, String>() {
-
-                                                       private static final 
long serialVersionUID = 1L;
-
-                                                       @Override
-                                                       public String 
map(Tuple2<Integer, Integer> value) throws Exception {
-                                                               return 
value.toString();
-                                                       }
-                                               })
-                                               .addSink(resultSink);
-
-
-               try {
-                       env.execute();
-
-                       expected = new ArrayList<String>();
-
-                       expected.addAll(Arrays.asList("(1,1)", "(3,3)", 
"(5,5)"));
-
-                       List<String> result = resultSink.getResult();
-
-                       Collections.sort(expected);
-                       Collections.sort(result);
-
-                       assertEquals(expected, result);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail();
-               }
-       }
-
-       /**
         * We connect two different data streams in a chain to a CoMap.
         */
        @Test

Reply via email to