[FLINK-8480][DataStream] Add APIs for Interval Joins.

This adds the Java and Scala API for performing an IntervalJoin.
In jave this will look like:

Example:

```java
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper 
bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});
```

This closes #5482.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42ada8ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42ada8ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42ada8ad

Branch: refs/heads/master
Commit: 42ada8ad9ca28f94d0a0355658330198bbc2b577
Parents: f45b7f7
Author: Florian Schmidt <florian.schmidt.1...@icloud.com>
Authored: Mon Jul 9 12:02:24 2018 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Thu Jul 12 21:03:26 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/index.md              |  15 +
 .../streaming/api/datastream/KeyedStream.java   | 184 ++++
 .../UnsupportedTimeCharacteristicException.java |  35 +
 .../api/functions/co/ProcessJoinFunction.java   |  87 ++
 .../functions/co/TimeBoundedJoinFunction.java   |  87 --
 .../api/operators/co/IntervalJoinOperator.java  | 513 ++++++++++
 .../co/TimeBoundedStreamJoinOperator.java       | 513 ----------
 .../operators/co/IntervalJoinOperatorTest.java  | 941 +++++++++++++++++++
 .../co/TimeBoundedStreamJoinOperatorTest.java   | 941 -------------------
 .../flink/streaming/api/scala/KeyedStream.scala | 106 ++-
 .../api/scala/IntervalJoinITCase.scala          | 130 +++
 .../streaming/runtime/IntervalJoinITCase.java   | 451 +++++++++
 12 files changed, 2461 insertions(+), 1542 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/docs/dev/stream/operators/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/index.md 
b/docs/dev/stream/operators/index.md
index 1dbdef4..422dbbf 100644
--- a/docs/dev/stream/operators/index.md
+++ b/docs/dev/stream/operators/index.md
@@ -310,6 +310,21 @@ dataStream.join(otherStream)
           </td>
         </tr>
         <tr>
+          <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two elements e1 and e2 of two keyed streams with a common 
key over a given time interval, so that e1.timestamp + lowerBound <= 
e2.timestamp <= e1.timestamp + upperBound</p>
+    {% highlight java %}
+// this will join the two streams so that
+// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
+keyedStream.intervalJoin(otherKeyedStream)
+    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper 
bound
+    .upperBoundExclusive(true) // optional
+    .lowerBoundExclusive(true) // optional
+    .process(new IntervalJoinFunction() {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
           <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
           <td>
             <p>Cogroups two data streams on a given key and a common 
window.</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index a948ae2..32a5c96 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import 
org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
 import 
org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -51,6 +52,7 @@ import 
org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -76,6 +78,8 @@ import java.util.List;
 import java.util.Stack;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link KeyedStream} represents a {@link DataStream} on which operator 
state is
  * partitioned by key using a provided {@link KeySelector}. Typical operations 
supported by a
@@ -396,6 +400,186 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
        }
 
        // 
------------------------------------------------------------------------
+       //  Joining
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Join elements of this {@link KeyedStream} with elements of another 
{@link KeyedStream} over
+        * a time interval that can be specified with {@link 
IntervalJoin#between(Time, Time)}.
+        *
+        * @param otherStream The other keyed stream to join this keyed stream 
with
+        * @param <T1> Type parameter of elements in the other stream
+        * @return An instance of {@link IntervalJoin} with this keyed stream 
and the other keyed stream
+        */
+       @PublicEvolving
+       public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> 
otherStream) {
+               return new IntervalJoin<>(this, otherStream);
+       }
+
+       /**
+        * Perform a join over a time interval.
+        * @param <T1> The type parameter of the elements in the first streams
+        * @param <T2> The The type parameter of the elements in the second 
stream
+        */
+       @PublicEvolving
+       public static class IntervalJoin<T1, T2, KEY> {
+
+               private final KeyedStream<T1, KEY> streamOne;
+               private final KeyedStream<T2, KEY> streamTwo;
+
+               IntervalJoin(
+                               KeyedStream<T1, KEY> streamOne,
+                               KeyedStream<T2, KEY> streamTwo
+               ) {
+                       this.streamOne = checkNotNull(streamOne);
+                       this.streamTwo = checkNotNull(streamTwo);
+               }
+
+               /**
+                * Specifies the time boundaries over which the join operation 
works, so that
+                * <pre>leftElement.timestamp + lowerBound <= 
rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+                * By default both the lower and the upper bound are inclusive. 
This can be configured
+                * with {@link IntervalJoined#lowerBoundExclusive()} and
+                * {@link IntervalJoined#upperBoundExclusive()}
+                *
+                * @param lowerBound The lower bound. Needs to be smaller than 
or equal to the upperBound
+                * @param upperBound The upper bound. Needs to be bigger than 
or equal to the lowerBound
+                */
+               @PublicEvolving
+               public IntervalJoined<T1, T2, KEY> between(Time lowerBound, 
Time upperBound) {
+
+                       TimeCharacteristic timeCharacteristic =
+                               
streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+                       if (timeCharacteristic != TimeCharacteristic.EventTime) 
{
+                               throw new 
UnsupportedTimeCharacteristicException("Time-bounded stream joins are only 
supported in event time");
+                       }
+
+                       checkNotNull(lowerBound, "A lower bound needs to be 
provided for a time-bounded join");
+                       checkNotNull(upperBound, "An upper bound needs to be 
provided for a time-bounded join");
+
+                       return new IntervalJoined<>(
+                               streamOne,
+                               streamTwo,
+                               lowerBound.toMilliseconds(),
+                               upperBound.toMilliseconds(),
+                               true,
+                               true
+                       );
+               }
+       }
+
+       /**
+        * IntervalJoined is a container for two streams that have keys for 
both sides as well as
+        * the time boundaries over which elements should be joined.
+        *
+        * @param <IN1> Input type of elements from the first stream
+        * @param <IN2> Input type of elements from the second stream
+        * @param <KEY> The type of the key
+        */
+       @PublicEvolving
+       public static class IntervalJoined<IN1, IN2, KEY> {
+
+               private static final String INTERVAL_JOIN_FUNC_NAME = 
"IntervalJoin";
+
+               private final KeyedStream<IN1, KEY> left;
+               private final KeyedStream<IN2, KEY> right;
+
+               private final long lowerBound;
+               private final long upperBound;
+
+               private final KeySelector<IN1, KEY> keySelector1;
+               private final KeySelector<IN2, KEY> keySelector2;
+
+               private boolean lowerBoundInclusive;
+               private boolean upperBoundInclusive;
+
+               public IntervalJoined(
+                               KeyedStream<IN1, KEY> left,
+                               KeyedStream<IN2, KEY> right,
+                               long lowerBound,
+                               long upperBound,
+                               boolean lowerBoundInclusive,
+                               boolean upperBoundInclusive) {
+
+                       this.left = checkNotNull(left);
+                       this.right = checkNotNull(right);
+
+                       this.lowerBound = lowerBound;
+                       this.upperBound = upperBound;
+
+                       this.lowerBoundInclusive = lowerBoundInclusive;
+                       this.upperBoundInclusive = upperBoundInclusive;
+
+                       this.keySelector1 = left.getKeySelector();
+                       this.keySelector2 = right.getKeySelector();
+               }
+
+               /**
+                * Set the upper bound to be exclusive.
+                */
+               @PublicEvolving
+               public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
+                       this.upperBoundInclusive = false;
+                       return this;
+               }
+
+               /**
+                * Set the lower bound to be exclusive.
+                */
+               @PublicEvolving
+               public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
+                       this.lowerBoundInclusive = false;
+                       return this;
+               }
+
+               /**
+                * Completes the join operation with the user function that is 
executed for each joined pair
+                * of elements.
+                * @param udf The user-defined function
+                * @param <OUT> The output type
+                * @return Returns a DataStream
+                */
+               @PublicEvolving
+               public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, 
IN2, OUT> udf) {
+
+                       ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = 
left.getExecutionEnvironment().clean(udf);
+
+                       TypeInformation<OUT> resultType = 
TypeExtractor.getBinaryOperatorReturnType(
+                               cleanedUdf,
+                               ProcessJoinFunction.class,    // 
ProcessJoinFunction<IN1, IN2, OUT>
+                               0,     //                                       
    0    1    2
+                               1,
+                               2,
+                               new int[]{0},                   // lambda input 
1 type arg indices
+                               new int[]{1},                   // lambda input 
1 type arg indices
+                               TypeExtractor.NO_INDEX,         // output arg 
indices
+                               left.getType(),                 // input 1 type 
information
+                               right.getType(),                // input 2 type 
information
+                               INTERVAL_JOIN_FUNC_NAME ,
+                               false
+                       );
+
+                       IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
+                               new IntervalJoinOperator<>(
+                                       lowerBound,
+                                       upperBound,
+                                       lowerBoundInclusive,
+                                       upperBoundInclusive,
+                                       
left.getType().createSerializer(left.getExecutionConfig()),
+                                       
right.getType().createSerializer(right.getExecutionConfig()),
+                                       cleanedUdf
+                               );
+
+                       return left
+                               .connect(right)
+                               .keyBy(keySelector1, keySelector2)
+                               .transform(INTERVAL_JOIN_FUNC_NAME , 
resultType, operator);
+
+               }
+       }
+
+       // 
------------------------------------------------------------------------
        //  Windowing
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
new file mode 100644
index 0000000..cb2570a
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.FlinkRuntimeException;
+
+/**
+ * An exception that indicates that a time characteristic was used that is not 
supported in the
+ * current operation.
+ */
+@PublicEvolving
+public class UnsupportedTimeCharacteristicException extends 
FlinkRuntimeException {
+
+       private static final long serialVersionUID = -8109094930338075819L;
+
+       public UnsupportedTimeCharacteristicException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
new file mode 100644
index 0000000..2c39abc
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * A function that processes two joined elements and produces a single output 
one.
+ *
+ * <p>This function will get called for every joined pair of elements the 
joined two streams.
+ * The timestamp of the joined pair as well as the timestamp of the left 
element and the right
+ * element can be accessed through the {@link Context}.
+ *
+ * @param <IN1> Type of the first input
+ * @param <IN2> Type of the second input
+ * @param <OUT> Type of the output
+ */
+@PublicEvolving
+public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends 
AbstractRichFunction {
+
+       private static final long serialVersionUID = -2444626938039012398L;
+
+       /**
+        * This method is called for each joined pair of elements. It can 
output zero or more elements
+        * through the provided {@link Collector} and has access to the 
timestamps of the joined elements
+        * and the result through the {@link Context}.
+        *
+        * @param left         The left element of the joined pair.
+        * @param right        The right element of the joined pair.
+        * @param ctx          A context that allows querying the timestamps of 
the left, right and
+        *                     joined pair. In addition, this context allows to 
emit elements on a side output.
+        * @param out          The collector to emit resulting elements to.
+        * @throws Exception   This function may throw exceptions which cause 
the streaming program to
+        *                                         fail and go in recovery mode.
+        */
+       public abstract void processElement(IN1 left, IN2 right, Context ctx, 
Collector<OUT> out) throws Exception;
+
+       /**
+        * The context that is available during an invocation of
+        * {@link #processElement(Object, Object, Context, Collector)}. It 
gives access to the timestamps of the
+        * left element in the joined pair, the right one, and that of the 
joined pair. In addition, this context
+        * allows to emit elements on a side output.
+        */
+       public abstract class Context {
+
+               /**
+                * @return The timestamp of the left element of a joined pair
+                */
+               public abstract long getLeftTimestamp();
+
+               /**
+                * @return The timestamp of the right element of a joined pair
+                */
+               public abstract long getRightTimestamp();
+
+               /**
+                * @return The timestamp of the joined pair.
+                */
+               public abstract long getTimestamp();
+
+               /**
+                * Emits a record to the side output identified by the {@link 
OutputTag}.
+                * @param outputTag The output tag that identifies the side 
output to emit to
+                * @param value The record to emit
+                */
+               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
deleted file mode 100644
index cd745ca..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * A function that processes two joined elements and produces a single output 
one.
- *
- * <p>This function will get called for every joined pair of elements the 
joined two streams.
- * The timestamp of the joined pair as well as the timestamp of the left 
element and the right
- * element can be accessed through the {@link Context}.
- *
- * @param <IN1> Type of the first input
- * @param <IN2> Type of the second input
- * @param <OUT> Type of the output
- */
-@PublicEvolving
-public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends 
AbstractRichFunction {
-
-       private static final long serialVersionUID = -2444626938039012398L;
-
-       /**
-        * This method is called for each joined pair of elements. It can 
output zero or more elements
-        * through the provided {@link Collector} and has access to the 
timestamps of the joined elements
-        * and the result through the {@link Context}.
-        *
-        * @param left         The left element of the joined pair.
-        * @param right        The right element of the joined pair.
-        * @param ctx          A context that allows querying the timestamps of 
the left, right and
-        *                     joined pair. In addition, this context allows to 
emit elements on a side output.
-        * @param out          The collector to emit resulting elements to.
-        * @throws Exception   This function may throw exceptions which cause 
the streaming program to
-        *                                         fail and go in recovery mode.
-        */
-       public abstract void processElement(IN1 left, IN2 right, Context ctx, 
Collector<OUT> out) throws Exception;
-
-       /**
-        * The context that is available during an invocation of
-        * {@link #processElement(Object, Object, Context, Collector)}. It 
gives access to the timestamps of the
-        * left element in the joined pair, the right one, and that of the 
joined pair. In addition, this context
-        * allows to emit elements on a side output.
-        */
-       public abstract class Context {
-
-               /**
-                * @return The timestamp of the left element of a joined pair
-                */
-               public abstract long getLeftTimestamp();
-
-               /**
-                * @return The timestamp of the right element of a joined pair
-                */
-               public abstract long getRightTimestamp();
-
-               /**
-                * @return The timestamp of the joined pair.
-                */
-               public abstract long getTimestamp();
-
-               /**
-                * Emits a record to the side output identified by the {@link 
OutputTag}.
-                * @param outputTag The output tag that identifies the side 
output to emit to
-                * @param value The record to emit
-                */
-               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
new file mode 100644
index 0000000..0c449e6
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * An {@link TwoInputStreamOperator operator} to execute time-bounded stream 
inner joins.
+ *
+ * <p>By using a configurable lower and upper bound this operator will emit 
exactly those pairs
+ * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the 
lower and the
+ * upper bound can be configured to be either inclusive or exclusive.
+ *
+ * <p>As soon as elements are joined they are passed to a user-defined {@link 
ProcessJoinFunction}.
+ *
+ * <p>The basic idea of this implementation is as follows: Whenever we receive 
an element at
+ * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to 
the left buffer.
+ * We then check the right buffer to see whether there are any elements that 
can be joined. If
+ * there are, they are joined and passed to the aforementioned function. The 
same happens the
+ * other way around when receiving an element on the right side.
+ *
+ * <p>Whenever a pair of elements is emitted it will be assigned the max 
timestamp of either of
+ * the elements.
+ *
+ * <p>In order to avoid the element buffers to grow indefinitely a cleanup 
timer is registered
+ * per element. This timer indicates when an element is not considered for 
joining anymore and can
+ * be removed from the state.
+ *
+ * @param <K>  The type of the key based on which we join elements.
+ * @param <T1> The type of the elements in the left stream.
+ * @param <T2> The type of the elements in the right stream.
+ * @param <OUT>        The output type created by the user-defined function.
+ */
+@Internal
+public class IntervalJoinOperator<K, T1, T2, OUT>
+               extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, 
T2, OUT>>
+               implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, 
String> {
+
+       private static final long serialVersionUID = -5380774605111543454L;
+
+       private static final Logger logger = 
LoggerFactory.getLogger(IntervalJoinOperator.class);
+
+       private static final String LEFT_BUFFER = "LEFT_BUFFER";
+       private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
+       private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
+       private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
+       private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
+
+       private final long lowerBound;
+       private final long upperBound;
+
+       private final TypeSerializer<T1> leftTypeSerializer;
+       private final TypeSerializer<T2> rightTypeSerializer;
+
+       private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
+       private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
+
+       private transient TimestampedCollector<OUT> collector;
+       private transient ContextImpl context;
+
+       private transient InternalTimerService<String> internalTimerService;
+
+       /**
+        * Creates a new IntervalJoinOperator.
+        *
+        * @param lowerBound          The lower bound for evaluating if 
elements should be joined
+        * @param upperBound          The upper bound for evaluating if 
elements should be joined
+        * @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
+        *                            the lower bound
+        * @param upperBoundInclusive Whether or not to include elements where 
the timestamp matches
+        *                            the upper bound
+        * @param udf                 A user-defined {@link 
ProcessJoinFunction} that gets called
+        *                            whenever two elements of T1 and T2 are 
joined
+        */
+       public IntervalJoinOperator(
+                       long lowerBound,
+                       long upperBound,
+                       boolean lowerBoundInclusive,
+                       boolean upperBoundInclusive,
+                       TypeSerializer<T1> leftTypeSerializer,
+                       TypeSerializer<T2> rightTypeSerializer,
+                       ProcessJoinFunction<T1, T2, OUT> udf) {
+
+               super(Preconditions.checkNotNull(udf));
+
+               Preconditions.checkArgument(lowerBound <= upperBound,
+                       "lowerBound <= upperBound must be fulfilled");
+
+               // Move buffer by +1 / -1 depending on inclusiveness in order 
not needing
+               // to check for inclusiveness later on
+               this.lowerBound = (lowerBoundInclusive) ? lowerBound : 
lowerBound + 1L;
+               this.upperBound = (upperBoundInclusive) ? upperBound : 
upperBound - 1L;
+
+               this.leftTypeSerializer = 
Preconditions.checkNotNull(leftTypeSerializer);
+               this.rightTypeSerializer = 
Preconditions.checkNotNull(rightTypeSerializer);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               collector = new TimestampedCollector<>(output);
+               context = new ContextImpl(userFunction);
+               internalTimerService =
+                       getInternalTimerService(CLEANUP_TIMER_NAME, 
StringSerializer.INSTANCE, this);
+       }
+
+       @Override
+       public void initializeState(StateInitializationContext context) throws 
Exception {
+               super.initializeState(context);
+
+               this.leftBuffer = context.getKeyedStateStore().getMapState(new 
MapStateDescriptor<>(
+                       LEFT_BUFFER,
+                       LongSerializer.INSTANCE,
+                       new ListSerializer<>(new 
BufferEntrySerializer<>(leftTypeSerializer))
+               ));
+
+               this.rightBuffer = context.getKeyedStateStore().getMapState(new 
MapStateDescriptor<>(
+                       RIGHT_BUFFER,
+                       LongSerializer.INSTANCE,
+                       new ListSerializer<>(new 
BufferEntrySerializer<>(rightTypeSerializer))
+               ));
+       }
+
+       /**
+        * Process a {@link StreamRecord} from the left stream. Whenever an 
{@link StreamRecord}
+        * arrives at the left stream, it will get added to the left buffer. 
Possible join candidates
+        * for that element will be looked up from the right buffer and if the 
pair lies within the
+        * user defined boundaries, it gets passed to the {@link 
ProcessJoinFunction}.
+        *
+        * @param record An incoming record to be joined
+        * @throws Exception Can throw an Exception during state access
+        */
+       @Override
+       public void processElement1(StreamRecord<T1> record) throws Exception {
+               processElement(record, leftBuffer, rightBuffer, lowerBound, 
upperBound, true);
+       }
+
+       /**
+        * Process a {@link StreamRecord} from the right stream. Whenever a 
{@link StreamRecord}
+        * arrives at the right stream, it will get added to the right buffer. 
Possible join candidates
+        * for that element will be looked up from the left buffer and if the 
pair lies within the user
+        * defined boundaries, it gets passed to the {@link 
ProcessJoinFunction}.
+        *
+        * @param record An incoming record to be joined
+        * @throws Exception Can throw an exception during state access
+        */
+       @Override
+       public void processElement2(StreamRecord<T2> record) throws Exception {
+               processElement(record, rightBuffer, leftBuffer, -upperBound, 
-lowerBound, false);
+       }
+
+       @SuppressWarnings("unchecked")
+       private <OUR, OTHER> void processElement(
+                       StreamRecord<OUR> record,
+                       MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
+                       MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
+                       long relativeLowerBound,
+                       long relativeUpperBound,
+                       boolean isLeft) throws Exception {
+
+               final OUR ourValue = record.getValue();
+               final long ourTimestamp = record.getTimestamp();
+
+               if (ourTimestamp == Long.MIN_VALUE) {
+                       throw new FlinkException("Long.MIN_VALUE timestamp: 
Elements used in " +
+                                       "interval stream joins need to have 
timestamps meaningful timestamps.");
+               }
+
+               if (isLate(ourTimestamp)) {
+                       return;
+               }
+
+               addToBuffer(ourBuffer, ourValue, ourTimestamp);
+
+               for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: 
otherBuffer.entries()) {
+                       final long timestamp  = bucket.getKey();
+
+                       if (timestamp < ourTimestamp + relativeLowerBound ||
+                                       timestamp > ourTimestamp + 
relativeUpperBound) {
+                               continue;
+                       }
+
+                       for (BufferEntry<OTHER> entry: bucket.getValue()) {
+                               if (isLeft) {
+                                       collect((T1) ourValue, (T2) 
entry.element, ourTimestamp, timestamp);
+                               } else {
+                                       collect((T1) entry.element, (T2) 
ourValue, timestamp, ourTimestamp);
+                               }
+                       }
+               }
+
+               long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + 
relativeUpperBound : ourTimestamp;
+               if (isLeft) {
+                       
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, 
cleanupTime);
+               } else {
+                       
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, 
cleanupTime);
+               }
+       }
+
+       private boolean isLate(long timestamp) {
+               long currentWatermark = internalTimerService.currentWatermark();
+               return currentWatermark != Long.MIN_VALUE && timestamp < 
currentWatermark;
+       }
+
+       private void collect(T1 left, T2 right, long leftTimestamp, long 
rightTimestamp) throws Exception {
+               long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
+               collector.setAbsoluteTimestamp(resultTimestamp);
+               context.leftTimestamp = leftTimestamp;
+               context.rightTimestamp = rightTimestamp;
+               userFunction.processElement(left, right, context, collector);
+       }
+
+       private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> 
buffer, T value, long timestamp) throws Exception {
+               List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
+               if (elemsInBucket == null) {
+                       elemsInBucket = new ArrayList<>();
+               }
+               elemsInBucket.add(new BufferEntry<>(value, false));
+               buffer.put(timestamp, elemsInBucket);
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, String> timer) throws 
Exception {
+
+               long timerTimestamp = timer.getTimestamp();
+               String namespace = timer.getNamespace();
+
+               logger.trace("onEventTime @ {}", timerTimestamp);
+
+               switch (namespace) {
+                       case CLEANUP_NAMESPACE_LEFT: {
+                               long timestamp = (upperBound <= 0L) ? 
timerTimestamp : timerTimestamp - upperBound;
+                               logger.trace("Removing from left buffer @ {}", 
timestamp);
+                               leftBuffer.remove(timestamp);
+                               break;
+                       }
+                       case CLEANUP_NAMESPACE_RIGHT: {
+                               long timestamp = (lowerBound <= 0L) ? 
timerTimestamp + lowerBound : timerTimestamp;
+                               logger.trace("Removing from right buffer @ {}", 
timestamp);
+                               rightBuffer.remove(timestamp);
+                               break;
+                       }
+                       default:
+                               throw new RuntimeException("Invalid namespace " 
+ namespace);
+               }
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, String> timer) throws 
Exception {
+               // do nothing.
+       }
+
+       /**
+        * The context that is available during an invocation of
+        * {@link ProcessJoinFunction#processElement(Object, Object, 
ProcessJoinFunction.Context, Collector)}.
+        *
+        * <p>It gives access to the timestamps of the left element in the 
joined pair, the right one, and that of
+        * the joined pair. In addition, this context allows to emit elements 
on a side output.
+        */
+       private final class ContextImpl extends ProcessJoinFunction<T1, T2, 
OUT>.Context {
+
+               private long leftTimestamp = Long.MIN_VALUE;
+
+               private long rightTimestamp = Long.MIN_VALUE;
+
+               private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) {
+                       func.super();
+               }
+
+               @Override
+               public long getLeftTimestamp() {
+                       return leftTimestamp;
+               }
+
+               @Override
+               public long getRightTimestamp() {
+                       return rightTimestamp;
+               }
+
+               @Override
+               public long getTimestamp() {
+                       return leftTimestamp;
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       Preconditions.checkArgument(outputTag != null, 
"OutputTag must not be null");
+                       output.collect(outputTag, new StreamRecord<>(value, 
getTimestamp()));
+               }
+       }
+
+       /**
+        * A container for elements put in the left/write buffer.
+        * This will contain the element itself along with a flag indicating
+        * if it has been joined or not.
+        */
+       private static class BufferEntry<T> {
+
+               private final T element;
+               private final boolean hasBeenJoined;
+
+               BufferEntry(T element, boolean hasBeenJoined) {
+                       this.element = element;
+                       this.hasBeenJoined = hasBeenJoined;
+               }
+       }
+
+       /**
+        * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
+        */
+       private static class BufferEntrySerializer<T> extends 
TypeSerializer<BufferEntry<T>> {
+
+               private static final long serialVersionUID = 
-20197698803836236L;
+
+               private final TypeSerializer<T> elementSerializer;
+
+               private BufferEntrySerializer(TypeSerializer<T> 
elementSerializer) {
+                       this.elementSerializer = 
Preconditions.checkNotNull(elementSerializer);
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return true;
+               }
+
+               @Override
+               public TypeSerializer<BufferEntry<T>> duplicate() {
+                       return new 
BufferEntrySerializer<>(elementSerializer.duplicate());
+               }
+
+               @Override
+               public BufferEntry<T> createInstance() {
+                       return null;
+               }
+
+               @Override
+               public BufferEntry<T> copy(BufferEntry<T> from) {
+                       return new BufferEntry<>(from.element, 
from.hasBeenJoined);
+               }
+
+               @Override
+               public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> 
reuse) {
+                       return copy(from);
+               }
+
+               @Override
+               public int getLength() {
+                       return -1;
+               }
+
+               @Override
+               public void serialize(BufferEntry<T> record, DataOutputView 
target) throws IOException {
+                       target.writeBoolean(record.hasBeenJoined);
+                       elementSerializer.serialize(record.element, target);
+               }
+
+               @Override
+               public BufferEntry<T> deserialize(DataInputView source) throws 
IOException {
+                       boolean hasBeenJoined = source.readBoolean();
+                       T element = elementSerializer.deserialize(source);
+                       return new BufferEntry<>(element, hasBeenJoined);
+               }
+
+               @Override
+               public BufferEntry<T> deserialize(BufferEntry<T> reuse, 
DataInputView source) throws IOException {
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       target.writeBoolean(source.readBoolean());
+                       elementSerializer.copy(source, target);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       BufferEntrySerializer<?> that = 
(BufferEntrySerializer<?>) o;
+                       return Objects.equals(elementSerializer, 
that.elementSerializer);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(elementSerializer);
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return 
obj.getClass().equals(BufferEntrySerializer.class);
+               }
+
+               @Override
+               public TypeSerializerConfigSnapshot snapshotConfiguration() {
+                       return new 
BufferSerializerConfigSnapshot<>(elementSerializer);
+               }
+
+               @Override
+               public CompatibilityResult<BufferEntry<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+                       if (configSnapshot instanceof 
BufferSerializerConfigSnapshot) {
+                               Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> previousSerializerAndConfig =
+                                               
((BufferSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
+
+                               CompatibilityResult<T> compatResult =
+                                               
CompatibilityUtil.resolveCompatibilityResult(
+                                                               
previousSerializerAndConfig.f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
previousSerializerAndConfig.f1,
+                                                               
elementSerializer);
+
+                               if (!compatResult.isRequiresMigration()) {
+                                       return CompatibilityResult.compatible();
+                               } else if 
(compatResult.getConvertDeserializer() != null) {
+                                       return 
CompatibilityResult.requiresMigration(
+                                                       new 
BufferEntrySerializer<>(
+                                                                       new 
TypeDeserializerAdapter<>(
+                                                                               
        compatResult.getConvertDeserializer())));
+                               }
+                       }
+                       return CompatibilityResult.requiresMigration();
+               }
+       }
+
+       /**
+        * The {@link CompositeTypeSerializerConfigSnapshot configuration} of 
our serializer.
+        */
+       public static class BufferSerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot {
+
+               private static final int VERSION = 1;
+
+               public BufferSerializerConfigSnapshot() {
+               }
+
+               public BufferSerializerConfigSnapshot(final TypeSerializer<T> 
userTypeSerializer) {
+                       super(userTypeSerializer);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+
+       @VisibleForTesting
+       MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
+               return leftBuffer;
+       }
+
+       @VisibleForTesting
+       MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
+               return rightBuffer;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
deleted file mode 100644
index 26ad26b..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * An {@link TwoInputStreamOperator operator} to execute time-bounded stream 
inner joins.
- *
- * <p>By using a configurable lower and upper bound this operator will emit 
exactly those pairs
- * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the 
lower and the
- * upper bound can be configured to be either inclusive or exclusive.
- *
- * <p>As soon as elements are joined they are passed to a user-defined {@link 
TimeBoundedJoinFunction}.
- *
- * <p>The basic idea of this implementation is as follows: Whenever we receive 
an element at
- * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to 
the left buffer.
- * We then check the right buffer to see whether there are any elements that 
can be joined. If
- * there are, they are joined and passed to the aforementioned function. The 
same happens the
- * other way around when receiving an element on the right side.
- *
- * <p>Whenever a pair of elements is emitted it will be assigned the max 
timestamp of either of
- * the elements.
- *
- * <p>In order to avoid the element buffers to grow indefinitely a cleanup 
timer is registered
- * per element. This timer indicates when an element is not considered for 
joining anymore and can
- * be removed from the state.
- *
- * @param <K>  The type of the key based on which we join elements.
- * @param <T1> The type of the elements in the left stream.
- * @param <T2> The type of the elements in the right stream.
- * @param <OUT>        The output type created by the user-defined function.
- */
-@Internal
-public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT>
-               extends AbstractUdfStreamOperator<OUT, 
TimeBoundedJoinFunction<T1, T2, OUT>>
-               implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, 
String> {
-
-       private static final long serialVersionUID = -5380774605111543454L;
-
-       private static final Logger logger = 
LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class);
-
-       private static final String LEFT_BUFFER = "LEFT_BUFFER";
-       private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
-       private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
-       private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
-       private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";
-
-       private final long lowerBound;
-       private final long upperBound;
-
-       private final TypeSerializer<T1> leftTypeSerializer;
-       private final TypeSerializer<T2> rightTypeSerializer;
-
-       private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
-       private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
-
-       private transient TimestampedCollector<OUT> collector;
-       private transient ContextImpl context;
-
-       private transient InternalTimerService<String> internalTimerService;
-
-       /**
-        * Creates a new TimeBoundedStreamJoinOperator.
-        *
-        * @param lowerBound          The lower bound for evaluating if 
elements should be joined
-        * @param upperBound          The upper bound for evaluating if 
elements should be joined
-        * @param lowerBoundInclusive Whether or not to include elements where 
the timestamp matches
-        *                            the lower bound
-        * @param upperBoundInclusive Whether or not to include elements where 
the timestamp matches
-        *                            the upper bound
-        * @param udf                 A user-defined {@link 
TimeBoundedJoinFunction} that gets called
-        *                            whenever two elements of T1 and T2 are 
joined
-        */
-       public TimeBoundedStreamJoinOperator(
-                       long lowerBound,
-                       long upperBound,
-                       boolean lowerBoundInclusive,
-                       boolean upperBoundInclusive,
-                       TypeSerializer<T1> leftTypeSerializer,
-                       TypeSerializer<T2> rightTypeSerializer,
-                       TimeBoundedJoinFunction<T1, T2, OUT> udf) {
-
-               super(Preconditions.checkNotNull(udf));
-
-               Preconditions.checkArgument(lowerBound <= upperBound,
-                       "lowerBound <= upperBound must be fulfilled");
-
-               // Move buffer by +1 / -1 depending on inclusiveness in order 
not needing
-               // to check for inclusiveness later on
-               this.lowerBound = (lowerBoundInclusive) ? lowerBound : 
lowerBound + 1L;
-               this.upperBound = (upperBoundInclusive) ? upperBound : 
upperBound - 1L;
-
-               this.leftTypeSerializer = 
Preconditions.checkNotNull(leftTypeSerializer);
-               this.rightTypeSerializer = 
Preconditions.checkNotNull(rightTypeSerializer);
-       }
-
-       @Override
-       public void open() throws Exception {
-               super.open();
-               collector = new TimestampedCollector<>(output);
-               context = new ContextImpl(userFunction);
-               internalTimerService =
-                       getInternalTimerService(CLEANUP_TIMER_NAME, 
StringSerializer.INSTANCE, this);
-       }
-
-       @Override
-       public void initializeState(StateInitializationContext context) throws 
Exception {
-               super.initializeState(context);
-
-               this.leftBuffer = context.getKeyedStateStore().getMapState(new 
MapStateDescriptor<>(
-                       LEFT_BUFFER,
-                       LongSerializer.INSTANCE,
-                       new ListSerializer<>(new 
BufferEntrySerializer<>(leftTypeSerializer))
-               ));
-
-               this.rightBuffer = context.getKeyedStateStore().getMapState(new 
MapStateDescriptor<>(
-                       RIGHT_BUFFER,
-                       LongSerializer.INSTANCE,
-                       new ListSerializer<>(new 
BufferEntrySerializer<>(rightTypeSerializer))
-               ));
-       }
-
-       /**
-        * Process a {@link StreamRecord} from the left stream. Whenever an 
{@link StreamRecord}
-        * arrives at the left stream, it will get added to the left buffer. 
Possible join candidates
-        * for that element will be looked up from the right buffer and if the 
pair lies within the
-        * user defined boundaries, it gets passed to the {@link 
TimeBoundedJoinFunction}.
-        *
-        * @param record An incoming record to be joined
-        * @throws Exception Can throw an Exception during state access
-        */
-       @Override
-       public void processElement1(StreamRecord<T1> record) throws Exception {
-               processElement(record, leftBuffer, rightBuffer, lowerBound, 
upperBound, true);
-       }
-
-       /**
-        * Process a {@link StreamRecord} from the right stream. Whenever a 
{@link StreamRecord}
-        * arrives at the right stream, it will get added to the right buffer. 
Possible join candidates
-        * for that element will be looked up from the left buffer and if the 
pair lies within the user
-        * defined boundaries, it gets passed to the {@link 
TimeBoundedJoinFunction}.
-        *
-        * @param record An incoming record to be joined
-        * @throws Exception Can throw an exception during state access
-        */
-       @Override
-       public void processElement2(StreamRecord<T2> record) throws Exception {
-               processElement(record, rightBuffer, leftBuffer, -upperBound, 
-lowerBound, false);
-       }
-
-       @SuppressWarnings("unchecked")
-       private <OUR, OTHER> void processElement(
-                       StreamRecord<OUR> record,
-                       MapState<Long, List<BufferEntry<OUR>>> ourBuffer,
-                       MapState<Long, List<BufferEntry<OTHER>>> otherBuffer,
-                       long relativeLowerBound,
-                       long relativeUpperBound,
-                       boolean isLeft) throws Exception {
-
-               final OUR ourValue = record.getValue();
-               final long ourTimestamp = record.getTimestamp();
-
-               if (ourTimestamp == Long.MIN_VALUE) {
-                       throw new FlinkException("Long.MIN_VALUE timestamp: 
Elements used in " +
-                                       "interval stream joins need to have 
timestamps meaningful timestamps.");
-               }
-
-               if (isLate(ourTimestamp)) {
-                       return;
-               }
-
-               addToBuffer(ourBuffer, ourValue, ourTimestamp);
-
-               for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: 
otherBuffer.entries()) {
-                       final long timestamp  = bucket.getKey();
-
-                       if (timestamp < ourTimestamp + relativeLowerBound ||
-                                       timestamp > ourTimestamp + 
relativeUpperBound) {
-                               continue;
-                       }
-
-                       for (BufferEntry<OTHER> entry: bucket.getValue()) {
-                               if (isLeft) {
-                                       collect((T1) ourValue, (T2) 
entry.element, ourTimestamp, timestamp);
-                               } else {
-                                       collect((T1) entry.element, (T2) 
ourValue, timestamp, ourTimestamp);
-                               }
-                       }
-               }
-
-               long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + 
relativeUpperBound : ourTimestamp;
-               if (isLeft) {
-                       
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, 
cleanupTime);
-               } else {
-                       
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, 
cleanupTime);
-               }
-       }
-
-       private boolean isLate(long timestamp) {
-               long currentWatermark = internalTimerService.currentWatermark();
-               return currentWatermark != Long.MIN_VALUE && timestamp < 
currentWatermark;
-       }
-
-       private void collect(T1 left, T2 right, long leftTimestamp, long 
rightTimestamp) throws Exception {
-               long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);
-               collector.setAbsoluteTimestamp(resultTimestamp);
-               context.leftTimestamp = leftTimestamp;
-               context.rightTimestamp = rightTimestamp;
-               userFunction.processElement(left, right, context, collector);
-       }
-
-       private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> 
buffer, T value, long timestamp) throws Exception {
-               List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
-               if (elemsInBucket == null) {
-                       elemsInBucket = new ArrayList<>();
-               }
-               elemsInBucket.add(new BufferEntry<>(value, false));
-               buffer.put(timestamp, elemsInBucket);
-       }
-
-       @Override
-       public void onEventTime(InternalTimer<K, String> timer) throws 
Exception {
-
-               long timerTimestamp = timer.getTimestamp();
-               String namespace = timer.getNamespace();
-
-               logger.trace("onEventTime @ {}", timerTimestamp);
-
-               switch (namespace) {
-                       case CLEANUP_NAMESPACE_LEFT: {
-                               long timestamp = (upperBound <= 0L) ? 
timerTimestamp : timerTimestamp - upperBound;
-                               logger.trace("Removing from left buffer @ {}", 
timestamp);
-                               leftBuffer.remove(timestamp);
-                               break;
-                       }
-                       case CLEANUP_NAMESPACE_RIGHT: {
-                               long timestamp = (lowerBound <= 0L) ? 
timerTimestamp + lowerBound : timerTimestamp;
-                               logger.trace("Removing from right buffer @ {}", 
timestamp);
-                               rightBuffer.remove(timestamp);
-                               break;
-                       }
-                       default:
-                               throw new RuntimeException("Invalid namespace " 
+ namespace);
-               }
-       }
-
-       @Override
-       public void onProcessingTime(InternalTimer<K, String> timer) throws 
Exception {
-               // do nothing.
-       }
-
-       /**
-        * The context that is available during an invocation of
-        * {@link TimeBoundedJoinFunction#processElement(Object, Object, 
TimeBoundedJoinFunction.Context, Collector)}.
-        *
-        * <p>It gives access to the timestamps of the left element in the 
joined pair, the right one, and that of
-        * the joined pair. In addition, this context allows to emit elements 
on a side output.
-        */
-       private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, 
OUT>.Context {
-
-               private long leftTimestamp = Long.MIN_VALUE;
-
-               private long rightTimestamp = Long.MIN_VALUE;
-
-               private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) {
-                       func.super();
-               }
-
-               @Override
-               public long getLeftTimestamp() {
-                       return leftTimestamp;
-               }
-
-               @Override
-               public long getRightTimestamp() {
-                       return rightTimestamp;
-               }
-
-               @Override
-               public long getTimestamp() {
-                       return leftTimestamp;
-               }
-
-               @Override
-               public <X> void output(OutputTag<X> outputTag, X value) {
-                       Preconditions.checkArgument(outputTag != null, 
"OutputTag must not be null");
-                       output.collect(outputTag, new StreamRecord<>(value, 
getTimestamp()));
-               }
-       }
-
-       /**
-        * A container for elements put in the left/write buffer.
-        * This will contain the element itself along with a flag indicating
-        * if it has been joined or not.
-        */
-       private static class BufferEntry<T> {
-
-               private final T element;
-               private final boolean hasBeenJoined;
-
-               BufferEntry(T element, boolean hasBeenJoined) {
-                       this.element = element;
-                       this.hasBeenJoined = hasBeenJoined;
-               }
-       }
-
-       /**
-        * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
-        */
-       private static class BufferEntrySerializer<T> extends 
TypeSerializer<BufferEntry<T>> {
-
-               private static final long serialVersionUID = 
-20197698803836236L;
-
-               private final TypeSerializer<T> elementSerializer;
-
-               private BufferEntrySerializer(TypeSerializer<T> 
elementSerializer) {
-                       this.elementSerializer = 
Preconditions.checkNotNull(elementSerializer);
-               }
-
-               @Override
-               public boolean isImmutableType() {
-                       return true;
-               }
-
-               @Override
-               public TypeSerializer<BufferEntry<T>> duplicate() {
-                       return new 
BufferEntrySerializer<>(elementSerializer.duplicate());
-               }
-
-               @Override
-               public BufferEntry<T> createInstance() {
-                       return null;
-               }
-
-               @Override
-               public BufferEntry<T> copy(BufferEntry<T> from) {
-                       return new BufferEntry<>(from.element, 
from.hasBeenJoined);
-               }
-
-               @Override
-               public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> 
reuse) {
-                       return copy(from);
-               }
-
-               @Override
-               public int getLength() {
-                       return -1;
-               }
-
-               @Override
-               public void serialize(BufferEntry<T> record, DataOutputView 
target) throws IOException {
-                       target.writeBoolean(record.hasBeenJoined);
-                       elementSerializer.serialize(record.element, target);
-               }
-
-               @Override
-               public BufferEntry<T> deserialize(DataInputView source) throws 
IOException {
-                       boolean hasBeenJoined = source.readBoolean();
-                       T element = elementSerializer.deserialize(source);
-                       return new BufferEntry<>(element, hasBeenJoined);
-               }
-
-               @Override
-               public BufferEntry<T> deserialize(BufferEntry<T> reuse, 
DataInputView source) throws IOException {
-                       return deserialize(source);
-               }
-
-               @Override
-               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
-                       target.writeBoolean(source.readBoolean());
-                       elementSerializer.copy(source, target);
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-
-                       BufferEntrySerializer<?> that = 
(BufferEntrySerializer<?>) o;
-                       return Objects.equals(elementSerializer, 
that.elementSerializer);
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(elementSerializer);
-               }
-
-               @Override
-               public boolean canEqual(Object obj) {
-                       return 
obj.getClass().equals(BufferEntrySerializer.class);
-               }
-
-               @Override
-               public TypeSerializerConfigSnapshot snapshotConfiguration() {
-                       return new 
BufferSerializerConfigSnapshot<>(elementSerializer);
-               }
-
-               @Override
-               public CompatibilityResult<BufferEntry<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       if (configSnapshot instanceof 
BufferSerializerConfigSnapshot) {
-                               Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot> previousSerializerAndConfig =
-                                               
((BufferSerializerConfigSnapshot) 
configSnapshot).getSingleNestedSerializerAndConfig();
-
-                               CompatibilityResult<T> compatResult =
-                                               
CompatibilityUtil.resolveCompatibilityResult(
-                                                               
previousSerializerAndConfig.f0,
-                                                               
UnloadableDummyTypeSerializer.class,
-                                                               
previousSerializerAndConfig.f1,
-                                                               
elementSerializer);
-
-                               if (!compatResult.isRequiresMigration()) {
-                                       return CompatibilityResult.compatible();
-                               } else if 
(compatResult.getConvertDeserializer() != null) {
-                                       return 
CompatibilityResult.requiresMigration(
-                                                       new 
BufferEntrySerializer<>(
-                                                                       new 
TypeDeserializerAdapter<>(
-                                                                               
        compatResult.getConvertDeserializer())));
-                               }
-                       }
-                       return CompatibilityResult.requiresMigration();
-               }
-       }
-
-       /**
-        * The {@link CompositeTypeSerializerConfigSnapshot configuration} of 
our serializer.
-        */
-       public static class BufferSerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot {
-
-               private static final int VERSION = 1;
-
-               public BufferSerializerConfigSnapshot() {
-               }
-
-               public BufferSerializerConfigSnapshot(final TypeSerializer<T> 
userTypeSerializer) {
-                       super(userTypeSerializer);
-               }
-
-               @Override
-               public int getVersion() {
-                       return VERSION;
-               }
-       }
-
-       @VisibleForTesting
-       MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() {
-               return leftBuffer;
-       }
-
-       @VisibleForTesting
-       MapState<Long, List<BufferEntry<T2>>> getRightBuffer() {
-               return rightBuffer;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
new file mode 100644
index 0000000..ee3f4d8
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+
+/**
+ * Tests for {@link IntervalJoinOperator}.
+ * Those tests cover correctness and cleaning of state
+ */
+@RunWith(Parameterized.class)
+public class IntervalJoinOperatorTest {
+
+       private final boolean lhsFasterThanRhs;
+
+       @Parameters(name = "lhs faster than rhs: {0}")
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][]{
+                       {true}, {false}
+               });
+       }
+
+       public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
+               this.lhsFasterThanRhs = lhsFasterThanRhs;
+       }
+
+       @Test
+       public void testImplementationMirrorsCorrectly() throws Exception {
+
+               long lowerBound = 1;
+               long upperBound = 3;
+
+               boolean lowerBoundInclusive = true;
+               boolean upperBoundInclusive = false;
+
+               setupHarness(lowerBound, lowerBoundInclusive, upperBound, 
upperBoundInclusive)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(1, 2),
+                               streamRecordOf(1, 3),
+                               streamRecordOf(2, 3),
+                               streamRecordOf(2, 4),
+                               streamRecordOf(3, 4))
+                       .noLateRecords()
+                       .close();
+
+               setupHarness(-1 * upperBound, upperBoundInclusive, -1 * 
lowerBound, lowerBoundInclusive)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(2, 1),
+                               streamRecordOf(3, 1),
+                               streamRecordOf(3, 2),
+                               streamRecordOf(4, 2),
+                               streamRecordOf(4, 3))
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test // lhs - 2 <= rhs <= rhs + 2
+       public void testNegativeInclusiveAndNegativeInclusive() throws 
Exception {
+
+               setupHarness(-2, true, -1, true)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(2, 1),
+                               streamRecordOf(3, 1),
+                               streamRecordOf(3, 2),
+                               streamRecordOf(4, 2),
+                               streamRecordOf(4, 3)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test // lhs - 1 <= rhs <= rhs + 1
+       public void testNegativeInclusiveAndPositiveInclusive() throws 
Exception {
+
+               setupHarness(-1, true, 1, true)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(1, 1),
+                               streamRecordOf(1, 2),
+                               streamRecordOf(2, 1),
+                               streamRecordOf(2, 2),
+                               streamRecordOf(2, 3),
+                               streamRecordOf(3, 2),
+                               streamRecordOf(3, 3),
+                               streamRecordOf(3, 4),
+                               streamRecordOf(4, 3),
+                               streamRecordOf(4, 4)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test // lhs + 1 <= rhs <= lhs + 2
+       public void testPositiveInclusiveAndPositiveInclusive() throws 
Exception {
+
+               setupHarness(1, true, 2, true)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(1, 2),
+                               streamRecordOf(1, 3),
+                               streamRecordOf(2, 3),
+                               streamRecordOf(2, 4),
+                               streamRecordOf(3, 4)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test
+       public void testNegativeExclusiveAndNegativeExlusive() throws Exception 
{
+
+               setupHarness(-3, false, -1, false)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(3, 1),
+                               streamRecordOf(4, 2)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test
+       public void testNegativeExclusiveAndPositiveExlusive() throws Exception 
{
+
+               setupHarness(-1, false, 1, false)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(1, 1),
+                               streamRecordOf(2, 2),
+                               streamRecordOf(3, 3),
+                               streamRecordOf(4, 4)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test
+       public void testPositiveExclusiveAndPositiveExlusive() throws Exception 
{
+
+               setupHarness(1, false, 3, false)
+                       .processElementsAndWatermarks(1, 4)
+                       .andExpect(
+                               streamRecordOf(1, 3),
+                               streamRecordOf(2, 4)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       @Test
+       public void testStateCleanupNegativeInclusiveNegativeInclusive() throws 
Exception {
+
+               setupHarness(-1, true, 0, true)
+                       .processElement1(1)
+                       .processElement1(2)
+                       .processElement1(3)
+                       .processElement1(4)
+                       .processElement1(5)
+
+                       .processElement2(1)
+                       .processElement2(2)
+                       .processElement2(3)
+                       .processElement2(4)
+                       .processElement2(5) // fill both buffers with values
+
+                       .processWatermark1(1)
+                       .processWatermark2(1) // set common watermark to 1 and 
check that data is cleaned
+
+                       .assertLeftBufferContainsOnly(2, 3, 4, 5)
+                       .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+                       .processWatermark1(4) // set common watermark to 4 and 
check that data is cleaned
+                       .processWatermark2(4)
+
+                       .assertLeftBufferContainsOnly(5)
+                       .assertRightBufferContainsOnly(4, 5)
+
+                       .processWatermark1(6) // set common watermark to 6 and 
check that data all buffers are empty
+                       .processWatermark2(6)
+
+                       .assertLeftBufferEmpty()
+                       .assertRightBufferEmpty()
+
+                       .close();
+       }
+
+       @Test
+       public void testStateCleanupNegativePositiveNegativeExlusive() throws 
Exception {
+               setupHarness(-2, false, 1, false)
+                       .processElement1(1)
+                       .processElement1(2)
+                       .processElement1(3)
+                       .processElement1(4)
+                       .processElement1(5)
+
+                       .processElement2(1)
+                       .processElement2(2)
+                       .processElement2(3)
+                       .processElement2(4)
+                       .processElement2(5) // fill both buffers with values
+
+                       .processWatermark1(1)
+                       .processWatermark2(1) // set common watermark to 1 and 
check that data is cleaned
+
+                       .assertLeftBufferContainsOnly(2, 3, 4, 5)
+                       .assertRightBufferContainsOnly(1, 2, 3, 4, 5)
+
+                       .processWatermark1(4) // set common watermark to 4 and 
check that data is cleaned
+                       .processWatermark2(4)
+
+                       .assertLeftBufferContainsOnly(5)
+                       .assertRightBufferContainsOnly(4, 5)
+
+                       .processWatermark1(6) // set common watermark to 6 and 
check that data all buffers are empty
+                       .processWatermark2(6)
+
+                       .assertLeftBufferEmpty()
+                       .assertRightBufferEmpty()
+
+                       .close();
+       }
+
+       @Test
+       public void testStateCleanupPositiveInclusivePositiveInclusive() throws 
Exception {
+               setupHarness(0, true, 1, true)
+                       .processElement1(1)
+                       .processElement1(2)
+                       .processElement1(3)
+                       .processElement1(4)
+                       .processElement1(5)
+
+                       .processElement2(1)
+                       .processElement2(2)
+                       .processElement2(3)
+                       .processElement2(4)
+                       .processElement2(5) // fill both buffers with values
+
+                       .processWatermark1(1)
+                       .processWatermark2(1) // set common watermark to 1 and 
check that data is cleaned
+
+                       .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+                       .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+                       .processWatermark1(4) // set common watermark to 4 and 
check that data is cleaned
+                       .processWatermark2(4)
+
+                       .assertLeftBufferContainsOnly(4, 5)
+                       .assertRightBufferContainsOnly(5)
+
+                       .processWatermark1(6) // set common watermark to 6 and 
check that data all buffers are empty
+                       .processWatermark2(6)
+
+                       .assertLeftBufferEmpty()
+                       .assertRightBufferEmpty()
+
+                       .close();
+       }
+
+       @Test
+       public void testStateCleanupPositiveExlusivePositiveExclusive() throws 
Exception {
+               setupHarness(-1, false, 2, false)
+                       .processElement1(1)
+                       .processElement1(2)
+                       .processElement1(3)
+                       .processElement1(4)
+                       .processElement1(5)
+
+                       .processElement2(1)
+                       .processElement2(2)
+                       .processElement2(3)
+                       .processElement2(4)
+                       .processElement2(5) // fill both buffers with values
+
+                       .processWatermark1(1)
+                       .processWatermark2(1) // set common watermark to 1 and 
check that data is cleaned
+
+                       .assertLeftBufferContainsOnly(1, 2, 3, 4, 5)
+                       .assertRightBufferContainsOnly(2, 3, 4, 5)
+
+                       .processWatermark1(4) // set common watermark to 4 and 
check that data is cleaned
+                       .processWatermark2(4)
+
+                       .assertLeftBufferContainsOnly(4, 5)
+                       .assertRightBufferContainsOnly(5)
+
+                       .processWatermark1(6) // set common watermark to 6 and 
check that data all buffers are empty
+                       .processWatermark2(6)
+
+                       .assertLeftBufferEmpty()
+                       .assertRightBufferEmpty()
+
+                       .close();
+       }
+
+       @Test
+       public void testRestoreFromSnapshot() throws Exception {
+
+               // config
+               int lowerBound = -1;
+               boolean lowerBoundInclusive = true;
+               int upperBound = 1;
+               boolean upperBoundInclusive = true;
+
+               // create first test harness
+               OperatorSubtaskState handles;
+               List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput;
+
+               try (TestHarness testHarness = createTestHarness(
+                       lowerBound,
+                       lowerBoundInclusive,
+                       upperBound,
+                       upperBoundInclusive
+               )) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       // process elements with first test harness
+                       testHarness.processElement1(createStreamRecord(1, 
"lhs"));
+                       testHarness.processWatermark1(new Watermark(1));
+
+                       testHarness.processElement2(createStreamRecord(1, 
"rhs"));
+                       testHarness.processWatermark2(new Watermark(1));
+
+                       testHarness.processElement1(createStreamRecord(2, 
"lhs"));
+                       testHarness.processWatermark1(new Watermark(2));
+
+                       testHarness.processElement2(createStreamRecord(2, 
"rhs"));
+                       testHarness.processWatermark2(new Watermark(2));
+
+                       testHarness.processElement1(createStreamRecord(3, 
"lhs"));
+                       testHarness.processWatermark1(new Watermark(3));
+
+                       testHarness.processElement2(createStreamRecord(3, 
"rhs"));
+                       testHarness.processWatermark2(new Watermark(3));
+
+                       // snapshot and validate output
+                       handles = testHarness.snapshot(0, 0);
+                       testHarness.close();
+
+                       expectedOutput = Lists.newArrayList(
+                               streamRecordOf(1, 1),
+                               streamRecordOf(1, 2),
+                               streamRecordOf(2, 1),
+                               streamRecordOf(2, 2),
+                               streamRecordOf(2, 3),
+                               streamRecordOf(3, 2),
+                               streamRecordOf(3, 3)
+                       );
+
+                       
TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
+                       assertOutput(expectedOutput, testHarness.getOutput());
+               }
+
+               try (TestHarness newTestHarness = createTestHarness(
+                       lowerBound,
+                       lowerBoundInclusive,
+                       upperBound,
+                       upperBoundInclusive
+               )) {
+                       // create new test harness from snapshpt
+
+                       newTestHarness.setup();
+                       newTestHarness.initializeState(handles);
+                       newTestHarness.open();
+
+                       // process elements
+                       newTestHarness.processElement1(createStreamRecord(4, 
"lhs"));
+                       newTestHarness.processWatermark1(new Watermark(4));
+
+                       newTestHarness.processElement2(createStreamRecord(4, 
"rhs"));
+                       newTestHarness.processWatermark2(new Watermark(4));
+
+                       // assert expected output
+                       expectedOutput = Lists.newArrayList(
+                               streamRecordOf(3, 4),
+                               streamRecordOf(4, 3),
+                               streamRecordOf(4, 4)
+                       );
+
+                       
TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
+                       assertOutput(expectedOutput, 
newTestHarness.getOutput());
+               }
+       }
+
+       @Test
+       public void testContextCorrectLeftTimestamp() throws Exception {
+
+               IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> op =
+                       new IntervalJoinOperator<>(
+                               -1,
+                               1,
+                               true,
+                               true,
+                               TestElem.serializer(),
+                               TestElem.serializer(),
+                               new ProcessJoinFunction<TestElem, TestElem, 
Tuple2<TestElem, TestElem>>() {
+                                       @Override
+                                       public void processElement(
+                                               TestElem left,
+                                               TestElem right,
+                                               Context ctx,
+                                               Collector<Tuple2<TestElem, 
TestElem>> out) throws Exception {
+                                               Assert.assertEquals(left.ts, 
ctx.getLeftTimestamp());
+                                       }
+                               }
+                       );
+
+               try (TestHarness testHarness = new TestHarness(
+                       op,
+                       (elem) -> elem.key,
+                       (elem) -> elem.key,
+                       TypeInformation.of(String.class)
+               )) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       processElementsAndWatermarks(testHarness);
+               }
+       }
+
+       @Test
+       public void testReturnsCorrectTimestamp() throws Exception {
+               IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> op =
+                       new IntervalJoinOperator<>(
+                               -1,
+                               1,
+                               true,
+                               true,
+                               TestElem.serializer(),
+                               TestElem.serializer(),
+                               new ProcessJoinFunction<TestElem, TestElem, 
Tuple2<TestElem, TestElem>>() {
+                                       @Override
+                                       public void processElement(
+                                               TestElem left,
+                                               TestElem right,
+                                               Context ctx,
+                                               Collector<Tuple2<TestElem, 
TestElem>> out) throws Exception {
+                                               Assert.assertEquals(left.ts, 
ctx.getTimestamp());
+                                       }
+                               }
+                       );
+
+               try (TestHarness testHarness = new TestHarness(
+                       op,
+                       (elem) -> elem.key,
+                       (elem) -> elem.key,
+                       TypeInformation.of(String.class)
+               )) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       processElementsAndWatermarks(testHarness);
+               }
+       }
+
+       @Test
+       public void testContextCorrectRightTimestamp() throws Exception {
+
+               IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> op =
+                       new IntervalJoinOperator<>(
+                               -1,
+                               1,
+                               true,
+                               true,
+                               TestElem.serializer(),
+                               TestElem.serializer(),
+                               new ProcessJoinFunction<TestElem, TestElem, 
Tuple2<TestElem, TestElem>>() {
+                                       @Override
+                                       public void processElement(
+                                               TestElem left,
+                                               TestElem right,
+                                               Context ctx,
+                                               Collector<Tuple2<TestElem, 
TestElem>> out) throws Exception {
+                                               Assert.assertEquals(right.ts, 
ctx.getRightTimestamp());
+                                       }
+                               }
+                       );
+
+               try (TestHarness testHarness = new TestHarness(
+                       op,
+                       (elem) -> elem.key,
+                       (elem) -> elem.key,
+                       TypeInformation.of(String.class)
+               )) {
+
+                       testHarness.setup();
+                       testHarness.open();
+
+                       processElementsAndWatermarks(testHarness);
+               }
+       }
+
+       @Test(expected = FlinkException.class)
+       public void testFailsWithNoTimestampsLeft() throws Exception {
+               TestHarness newTestHarness = createTestHarness(0L, true, 0L, 
true);
+
+               newTestHarness.setup();
+               newTestHarness.open();
+
+               // note that the StreamRecord has no timestamp in constructor
+               newTestHarness.processElement1(new StreamRecord<>(new 
TestElem(0, "lhs")));
+       }
+
+       @Test(expected = FlinkException.class)
+       public void testFailsWithNoTimestampsRight() throws Exception {
+               try (TestHarness newTestHarness = createTestHarness(0L, true, 
0L, true)) {
+
+                       newTestHarness.setup();
+                       newTestHarness.open();
+
+                       // note that the StreamRecord has no timestamp in 
constructor
+                       newTestHarness.processElement2(new StreamRecord<>(new 
TestElem(0, "rhs")));
+               }
+       }
+
+       @Test
+       public void testDiscardsLateData() throws Exception {
+               setupHarness(-1, true, 1, true)
+                       .processElement1(1)
+                       .processElement2(1)
+                       .processElement1(2)
+                       .processElement2(2)
+                       .processElement1(3)
+                       .processElement2(3)
+                       .processWatermark1(3)
+                       .processWatermark2(3)
+                       .processElement1(1) // this element is late and should 
not be joined again
+                       .processElement1(4)
+                       .processElement2(4)
+                       .processElement1(5)
+                       .processElement2(5)
+                       .andExpect(
+                               streamRecordOf(1, 1),
+                               streamRecordOf(1, 2),
+
+                               streamRecordOf(2, 1),
+                               streamRecordOf(2, 2),
+                               streamRecordOf(2, 3),
+
+                               streamRecordOf(3, 2),
+                               streamRecordOf(3, 3),
+                               streamRecordOf(3, 4),
+
+                               streamRecordOf(4, 3),
+                               streamRecordOf(4, 4),
+                               streamRecordOf(4, 5),
+
+                               streamRecordOf(5, 4),
+                               streamRecordOf(5, 5)
+                       )
+                       .noLateRecords()
+                       .close();
+       }
+
+       private void assertEmpty(MapState<Long, ?> state) throws Exception {
+               boolean stateIsEmpty = Iterables.size(state.keys()) == 0;
+               Assert.assertTrue("state not empty", stateIsEmpty);
+       }
+
+       private void assertContainsOnly(MapState<Long, ?> state, long... ts) 
throws Exception {
+               for (long t : ts) {
+                       String message = "Keys not found in state. \n Expected: 
" + Arrays.toString(ts) + "\n Actual:   " + state.keys();
+                       Assert.assertTrue(message, state.contains(t));
+               }
+
+               String message = "Too many objects in state. \n Expected: " + 
Arrays.toString(ts) + "\n Actual:   " + state.keys();
+               Assert.assertEquals(message, ts.length, 
Iterables.size(state.keys()));
+       }
+
+       private void assertOutput(
+               Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> 
expectedOutput,
+               Queue<Object> actualOutput) {
+
+               int actualSize = actualOutput.stream()
+                       .filter(elem -> elem instanceof StreamRecord)
+                       .collect(Collectors.toList())
+                       .size();
+
+               int expectedSize = Iterables.size(expectedOutput);
+
+               Assert.assertEquals(
+                       "Expected and actual size of stream records different",
+                       expectedSize,
+                       actualSize
+               );
+
+               for (StreamRecord<Tuple2<TestElem, TestElem>> record : 
expectedOutput) {
+                       Assert.assertTrue(actualOutput.contains(record));
+               }
+       }
+
+       private TestHarness createTestHarness(long lowerBound,
+               boolean lowerBoundInclusive,
+               long upperBound,
+               boolean upperBoundInclusive) throws Exception {
+
+               IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> operator =
+                       new IntervalJoinOperator<>(
+                               lowerBound,
+                               upperBound,
+                               lowerBoundInclusive,
+                               upperBoundInclusive,
+                               TestElem.serializer(),
+                               TestElem.serializer(),
+                               new PassthroughFunction()
+                       );
+
+               return new TestHarness(
+                       operator,
+                       (elem) -> elem.key, // key
+                       (elem) -> elem.key, // key
+                       TypeInformation.of(String.class)
+               );
+       }
+
+       private JoinTestBuilder setupHarness(long lowerBound,
+               boolean lowerBoundInclusive,
+               long upperBound,
+               boolean upperBoundInclusive) throws Exception {
+
+               IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> operator =
+                       new IntervalJoinOperator<>(
+                               lowerBound,
+                               upperBound,
+                               lowerBoundInclusive,
+                               upperBoundInclusive,
+                               TestElem.serializer(),
+                               TestElem.serializer(),
+                               new PassthroughFunction()
+                       );
+
+               TestHarness t = new TestHarness(
+                       operator,
+                       (elem) -> elem.key, // key
+                       (elem) -> elem.key, // key
+                       TypeInformation.of(String.class)
+               );
+
+               return new JoinTestBuilder(t, operator);
+       }
+
+       private class JoinTestBuilder {
+
+               private IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> operator;
+               private TestHarness testHarness;
+
+               public JoinTestBuilder(
+                       TestHarness t,
+                       IntervalJoinOperator<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> operator
+               ) throws Exception {
+
+                       this.testHarness = t;
+                       this.operator = operator;
+                       t.open();
+                       t.setup();
+               }
+
+               public TestHarness get() {
+                       return testHarness;
+               }
+
+               public JoinTestBuilder processElement1(int ts) throws Exception 
{
+                       testHarness.processElement1(createStreamRecord(ts, 
"lhs"));
+                       return this;
+               }
+
+               public JoinTestBuilder processElement2(int ts) throws Exception 
{
+                       testHarness.processElement2(createStreamRecord(ts, 
"rhs"));
+                       return this;
+               }
+
+               public JoinTestBuilder processWatermark1(int ts) throws 
Exception {
+                       testHarness.processWatermark1(new Watermark(ts));
+                       return this;
+               }
+
+               public JoinTestBuilder processWatermark2(int ts) throws 
Exception {
+                       testHarness.processWatermark2(new Watermark(ts));
+                       return this;
+               }
+
+               public JoinTestBuilder processElementsAndWatermarks(int from, 
int to) throws Exception {
+                       if (lhsFasterThanRhs) {
+                               // add to lhs
+                               for (int i = from; i <= to; i++) {
+                                       
testHarness.processElement1(createStreamRecord(i, "lhs"));
+                                       testHarness.processWatermark1(new 
Watermark(i));
+                               }
+
+                               // add to rhs
+                               for (int i = from; i <= to; i++) {
+                                       
testHarness.processElement2(createStreamRecord(i, "rhs"));
+                                       testHarness.processWatermark2(new 
Watermark(i));
+                               }
+                       } else {
+                               // add to rhs
+                               for (int i = from; i <= to; i++) {
+                                       
testHarness.processElement2(createStreamRecord(i, "rhs"));
+                                       testHarness.processWatermark2(new 
Watermark(i));
+                               }
+
+                               // add to lhs
+                               for (int i = from; i <= to; i++) {
+                                       
testHarness.processElement1(createStreamRecord(i, "lhs"));
+                                       testHarness.processWatermark1(new 
Watermark(i));
+                               }
+                       }
+
+                       return this;
+               }
+
+               @SafeVarargs
+               public final JoinTestBuilder 
andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) {
+                       assertOutput(Lists.newArrayList(elems), 
testHarness.getOutput());
+                       return this;
+               }
+
+               public JoinTestBuilder assertLeftBufferContainsOnly(long... 
timestamps) {
+
+                       try {
+                               assertContainsOnly(operator.getLeftBuffer(), 
timestamps);
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+                       return this;
+               }
+
+               public JoinTestBuilder assertRightBufferContainsOnly(long... 
timestamps) {
+
+                       try {
+                               assertContainsOnly(operator.getRightBuffer(), 
timestamps);
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+                       return this;
+               }
+
+               public JoinTestBuilder assertLeftBufferEmpty() {
+                       try {
+                               assertEmpty(operator.getLeftBuffer());
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+                       return this;
+               }
+
+               public JoinTestBuilder assertRightBufferEmpty() {
+                       try {
+                               assertEmpty(operator.getRightBuffer());
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+                       return this;
+               }
+
+               public JoinTestBuilder noLateRecords() {
+                       
TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
+                       return this;
+               }
+
+               public void close() throws Exception {
+                       testHarness.close();
+               }
+       }
+
+       private static class PassthroughFunction extends 
ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
+
+               @Override
+               public void processElement(
+                       TestElem left,
+                       TestElem right,
+                       Context ctx,
+                       Collector<Tuple2<TestElem, TestElem>> out) throws 
Exception {
+                       out.collect(Tuple2.of(left, right));
+               }
+       }
+
+       private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(
+               long lhsTs,
+               long rhsTs
+       ) {
+               TestElem lhs = new TestElem(lhsTs, "lhs");
+               TestElem rhs = new TestElem(rhsTs, "rhs");
+
+               long ts = Math.max(lhsTs, rhsTs);
+               return new StreamRecord<>(Tuple2.of(lhs, rhs), ts);
+       }
+
+       private static class TestElem {
+               String key;
+               long ts;
+               String source;
+
+               public TestElem(long ts, String source) {
+                       this.key = "key";
+                       this.ts = ts;
+                       this.source = source;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+
+                       TestElem testElem = (TestElem) o;
+
+                       if (ts != testElem.ts) {
+                               return false;
+                       }
+
+                       if (key != null ? !key.equals(testElem.key) : 
testElem.key != null) {
+                               return false;
+                       }
+
+                       return source != null ? source.equals(testElem.source) 
: testElem.source == null;
+               }
+
+               @Override
+               public int hashCode() {
+                       int result = key != null ? key.hashCode() : 0;
+                       result = 31 * result + (int) (ts ^ (ts >>> 32));
+                       result = 31 * result + (source != null ? 
source.hashCode() : 0);
+                       return result;
+               }
+
+               @Override
+               public String toString() {
+                       return this.source + ":" + this.ts;
+               }
+
+               public static TypeSerializer<TestElem> serializer() {
+                       return TypeInformation.of(new TypeHint<TestElem>() {
+                       }).createSerializer(new ExecutionConfig());
+               }
+       }
+
+       private static StreamRecord<TestElem> createStreamRecord(long ts, 
String source) {
+               TestElem testElem = new TestElem(ts, source);
+               return new StreamRecord<>(testElem, ts);
+       }
+
+       private void processElementsAndWatermarks(TestHarness testHarness) 
throws Exception {
+               if (lhsFasterThanRhs) {
+                       // add to lhs
+                       for (int i = 1; i <= 4; i++) {
+                               
testHarness.processElement1(createStreamRecord(i, "lhs"));
+                               testHarness.processWatermark1(new Watermark(i));
+                       }
+
+                       // add to rhs
+                       for (int i = 1; i <= 4; i++) {
+                               
testHarness.processElement2(createStreamRecord(i, "rhs"));
+                               testHarness.processWatermark2(new Watermark(i));
+                       }
+               } else {
+                       // add to rhs
+                       for (int i = 1; i <= 4; i++) {
+                               
testHarness.processElement2(createStreamRecord(i, "rhs"));
+                               testHarness.processWatermark2(new Watermark(i));
+                       }
+
+                       // add to lhs
+                       for (int i = 1; i <= 4; i++) {
+                               
testHarness.processElement1(createStreamRecord(i, "lhs"));
+                               testHarness.processWatermark1(new Watermark(i));
+                       }
+               }
+       }
+
+       /**
+        * Custom test harness to avoid endless generics in all of the test 
code.
+        */
+       private static class TestHarness extends 
KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, 
Tuple2<TestElem, TestElem>> {
+
+               TestHarness(
+                       TwoInputStreamOperator<TestElem, TestElem, 
Tuple2<TestElem, TestElem>> operator,
+                       KeySelector<TestElem, String> keySelector1,
+                       KeySelector<TestElem, String> keySelector2,
+                       TypeInformation<String> keyType) throws Exception {
+                       super(operator, keySelector1, keySelector2, keyType);
+               }
+       }
+}

Reply via email to