[FLINK-8480][DataStream] Add APIs for Interval Joins. This adds the Java and Scala API for performing an IntervalJoin. In jave this will look like:
Example: ```java keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...}); ``` This closes #5482. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42ada8ad Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42ada8ad Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42ada8ad Branch: refs/heads/master Commit: 42ada8ad9ca28f94d0a0355658330198bbc2b577 Parents: f45b7f7 Author: Florian Schmidt <florian.schmidt.1...@icloud.com> Authored: Mon Jul 9 12:02:24 2018 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Thu Jul 12 21:03:26 2018 +0200 ---------------------------------------------------------------------- docs/dev/stream/operators/index.md | 15 + .../streaming/api/datastream/KeyedStream.java | 184 ++++ .../UnsupportedTimeCharacteristicException.java | 35 + .../api/functions/co/ProcessJoinFunction.java | 87 ++ .../functions/co/TimeBoundedJoinFunction.java | 87 -- .../api/operators/co/IntervalJoinOperator.java | 513 ++++++++++ .../co/TimeBoundedStreamJoinOperator.java | 513 ---------- .../operators/co/IntervalJoinOperatorTest.java | 941 +++++++++++++++++++ .../co/TimeBoundedStreamJoinOperatorTest.java | 941 ------------------- .../flink/streaming/api/scala/KeyedStream.scala | 106 ++- .../api/scala/IntervalJoinITCase.scala | 130 +++ .../streaming/runtime/IntervalJoinITCase.java | 451 +++++++++ 12 files changed, 2461 insertions(+), 1542 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/docs/dev/stream/operators/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/operators/index.md b/docs/dev/stream/operators/index.md index 1dbdef4..422dbbf 100644 --- 
a/docs/dev/stream/operators/index.md +++ b/docs/dev/stream/operators/index.md @@ -310,6 +310,21 @@ dataStream.join(otherStream) </td> </tr> <tr> + <td><strong>Interval Join</strong><br>KeyedStream,KeyedStream → DataStream</td> + <td> + <p>Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound</p> + {% highlight java %} +// this will join the two streams so that +// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 +keyedStream.intervalJoin(otherKeyedStream) + .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound + .upperBoundExclusive(true) // optional + .lowerBoundExclusive(true) // optional + .process(new IntervalJoinFunction() {...}); + {% endhighlight %} + </td> + </tr> + <tr> <td><strong>Window CoGroup</strong><br>DataStream,DataStream → DataStream</td> <td> <p>Cogroups two data streams on a given key and a common window.</p> http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index a948ae2..32a5c96 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; +import 
org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator; import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -51,6 +52,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; @@ -76,6 +78,8 @@ import java.util.List; import java.util.Stack; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A {@link KeyedStream} represents a {@link DataStream} on which operator state is * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a @@ -396,6 +400,186 @@ public class KeyedStream<T, KEY> extends DataStream<T> { } // ------------------------------------------------------------------------ + // Joining + // ------------------------------------------------------------------------ + + /** + * Join elements of this {@link KeyedStream} with elements of another {@link KeyedStream} over + * a time interval that can be specified with {@link IntervalJoin#between(Time, Time)}. 
+ * + * @param otherStream The other keyed stream to join this keyed stream with + * @param <T1> Type parameter of elements in the other stream + * @return An instance of {@link IntervalJoin} with this keyed stream and the other keyed stream + */ + @PublicEvolving + public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) { + return new IntervalJoin<>(this, otherStream); + } + + /** + * Perform a join over a time interval. + * @param <T1> The type parameter of the elements in the first streams + * @param <T2> The The type parameter of the elements in the second stream + */ + @PublicEvolving + public static class IntervalJoin<T1, T2, KEY> { + + private final KeyedStream<T1, KEY> streamOne; + private final KeyedStream<T2, KEY> streamTwo; + + IntervalJoin( + KeyedStream<T1, KEY> streamOne, + KeyedStream<T2, KEY> streamTwo + ) { + this.streamOne = checkNotNull(streamOne); + this.streamTwo = checkNotNull(streamTwo); + } + + /** + * Specifies the time boundaries over which the join operation works, so that + * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre> + * By default both the lower and the upper bound are inclusive. This can be configured + * with {@link IntervalJoined#lowerBoundExclusive()} and + * {@link IntervalJoined#upperBoundExclusive()} + * + * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound + * @param upperBound The upper bound. 
Needs to be bigger than or equal to the lowerBound + */ + @PublicEvolving + public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) { + + TimeCharacteristic timeCharacteristic = + streamOne.getExecutionEnvironment().getStreamTimeCharacteristic(); + + if (timeCharacteristic != TimeCharacteristic.EventTime) { + throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time"); + } + + checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join"); + checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join"); + + return new IntervalJoined<>( + streamOne, + streamTwo, + lowerBound.toMilliseconds(), + upperBound.toMilliseconds(), + true, + true + ); + } + } + + /** + * IntervalJoined is a container for two streams that have keys for both sides as well as + * the time boundaries over which elements should be joined. + * + * @param <IN1> Input type of elements from the first stream + * @param <IN2> Input type of elements from the second stream + * @param <KEY> The type of the key + */ + @PublicEvolving + public static class IntervalJoined<IN1, IN2, KEY> { + + private static final String INTERVAL_JOIN_FUNC_NAME = "IntervalJoin"; + + private final KeyedStream<IN1, KEY> left; + private final KeyedStream<IN2, KEY> right; + + private final long lowerBound; + private final long upperBound; + + private final KeySelector<IN1, KEY> keySelector1; + private final KeySelector<IN2, KEY> keySelector2; + + private boolean lowerBoundInclusive; + private boolean upperBoundInclusive; + + public IntervalJoined( + KeyedStream<IN1, KEY> left, + KeyedStream<IN2, KEY> right, + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive) { + + this.left = checkNotNull(left); + this.right = checkNotNull(right); + + this.lowerBound = lowerBound; + this.upperBound = upperBound; + + this.lowerBoundInclusive = lowerBoundInclusive; + 
this.upperBoundInclusive = upperBoundInclusive; + + this.keySelector1 = left.getKeySelector(); + this.keySelector2 = right.getKeySelector(); + } + + /** + * Set the upper bound to be exclusive. + */ + @PublicEvolving + public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() { + this.upperBoundInclusive = false; + return this; + } + + /** + * Set the lower bound to be exclusive. + */ + @PublicEvolving + public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() { + this.lowerBoundInclusive = false; + return this; + } + + /** + * Completes the join operation with the user function that is executed for each joined pair + * of elements. + * @param udf The user-defined function + * @param <OUT> The output type + * @return Returns a DataStream + */ + @PublicEvolving + public <OUT> DataStream<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> udf) { + + ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(udf); + + TypeInformation<OUT> resultType = TypeExtractor.getBinaryOperatorReturnType( + cleanedUdf, + ProcessJoinFunction.class, // ProcessJoinFunction<IN1, IN2, OUT> + 0, // 0 1 2 + 1, + 2, + new int[]{0}, // lambda input 1 type arg indices + new int[]{1}, // lambda input 1 type arg indices + TypeExtractor.NO_INDEX, // output arg indices + left.getType(), // input 1 type information + right.getType(), // input 2 type information + INTERVAL_JOIN_FUNC_NAME , + false + ); + + IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = + new IntervalJoinOperator<>( + lowerBound, + upperBound, + lowerBoundInclusive, + upperBoundInclusive, + left.getType().createSerializer(left.getExecutionConfig()), + right.getType().createSerializer(right.getExecutionConfig()), + cleanedUdf + ); + + return left + .connect(right) + .keyBy(keySelector1, keySelector2) + .transform(INTERVAL_JOIN_FUNC_NAME , resultType, operator); + + } + } + + // ------------------------------------------------------------------------ // Windowing // 
------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java new file mode 100644 index 0000000..cb2570a --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/UnsupportedTimeCharacteristicException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.FlinkRuntimeException; + +/** + * An exception that indicates that a time characteristic was used that is not supported in the + * current operation. 
+ */ +@PublicEvolving +public class UnsupportedTimeCharacteristicException extends FlinkRuntimeException { + + private static final long serialVersionUID = -8109094930338075819L; + + public UnsupportedTimeCharacteristicException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java new file mode 100644 index 0000000..2c39abc --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +/** + * A function that processes two joined elements and produces a single output one. + * + * <p>This function will get called for every joined pair of elements the joined two streams. + * The timestamp of the joined pair as well as the timestamp of the left element and the right + * element can be accessed through the {@link Context}. + * + * @param <IN1> Type of the first input + * @param <IN2> Type of the second input + * @param <OUT> Type of the output + */ +@PublicEvolving +public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction { + + private static final long serialVersionUID = -2444626938039012398L; + + /** + * This method is called for each joined pair of elements. It can output zero or more elements + * through the provided {@link Collector} and has access to the timestamps of the joined elements + * and the result through the {@link Context}. + * + * @param left The left element of the joined pair. + * @param right The right element of the joined pair. + * @param ctx A context that allows querying the timestamps of the left, right and + * joined pair. In addition, this context allows to emit elements on a side output. + * @param out The collector to emit resulting elements to. + * @throws Exception This function may throw exceptions which cause the streaming program to + * fail and go in recovery mode. + */ + public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception; + + /** + * The context that is available during an invocation of + * {@link #processElement(Object, Object, Context, Collector)}. 
It gives access to the timestamps of the + * left element in the joined pair, the right one, and that of the joined pair. In addition, this context + * allows to emit elements on a side output. + */ + public abstract class Context { + + /** + * @return The timestamp of the left element of a joined pair + */ + public abstract long getLeftTimestamp(); + + /** + * @return The timestamp of the right element of a joined pair + */ + public abstract long getRightTimestamp(); + + /** + * @return The timestamp of the joined pair. + */ + public abstract long getTimestamp(); + + /** + * Emits a record to the side output identified by the {@link OutputTag}. + * @param outputTag The output tag that identifies the side output to emit to + * @param value The record to emit + */ + public abstract <X> void output(OutputTag<X> outputTag, X value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java deleted file mode 100644 index cd745ca..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimeBoundedJoinFunction.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions.co; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -/** - * A function that processes two joined elements and produces a single output one. - * - * <p>This function will get called for every joined pair of elements the joined two streams. - * The timestamp of the joined pair as well as the timestamp of the left element and the right - * element can be accessed through the {@link Context}. - * - * @param <IN1> Type of the first input - * @param <IN2> Type of the second input - * @param <OUT> Type of the output - */ -@PublicEvolving -public abstract class TimeBoundedJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction { - - private static final long serialVersionUID = -2444626938039012398L; - - /** - * This method is called for each joined pair of elements. It can output zero or more elements - * through the provided {@link Collector} and has access to the timestamps of the joined elements - * and the result through the {@link Context}. - * - * @param left The left element of the joined pair. - * @param right The right element of the joined pair. - * @param ctx A context that allows querying the timestamps of the left, right and - * joined pair. In addition, this context allows to emit elements on a side output. - * @param out The collector to emit resulting elements to. 
- * @throws Exception This function may throw exceptions which cause the streaming program to - * fail and go in recovery mode. - */ - public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception; - - /** - * The context that is available during an invocation of - * {@link #processElement(Object, Object, Context, Collector)}. It gives access to the timestamps of the - * left element in the joined pair, the right one, and that of the joined pair. In addition, this context - * allows to emit elements on a side output. - */ - public abstract class Context { - - /** - * @return The timestamp of the left element of a joined pair - */ - public abstract long getLeftTimestamp(); - - /** - * @return The timestamp of the right element of a joined pair - */ - public abstract long getRightTimestamp(); - - /** - * @return The timestamp of the joined pair. - */ - public abstract long getTimestamp(); - - /** - * Emits a record to the side output identified by the {@link OutputTag}. - * @param outputTag The output tag that identifies the side output to emit to - * @param value The record to emit - */ - public abstract <X> void output(OutputTag<X> outputTag, X value); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java new file mode 100644 index 0000000..0c449e6 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; +import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins. + * + * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs + * (T1, T2) where t2.ts â [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the + * upper bound can be configured to be either inclusive or exclusive. + * + * <p>As soon as elements are joined they are passed to a user-defined {@link ProcessJoinFunction}. + * + * <p>The basic idea of this implementation is as follows: Whenever we receive an element at + * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer. + * We then check the right buffer to see whether there are any elements that can be joined. If + * there are, they are joined and passed to the aforementioned function. The same happens the + * other way around when receiving an element on the right side. + * + * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of + * the elements. 
+ * + * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered + * per element. This timer indicates when an element is not considered for joining anymore and can + * be removed from the state. + * + * @param <K> The type of the key based on which we join elements. + * @param <T1> The type of the elements in the left stream. + * @param <T2> The type of the elements in the right stream. + * @param <OUT> The output type created by the user-defined function. + */ +@Internal +public class IntervalJoinOperator<K, T1, T2, OUT> + extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>> + implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> { + + private static final long serialVersionUID = -5380774605111543454L; + + private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class); + + private static final String LEFT_BUFFER = "LEFT_BUFFER"; + private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; + private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER"; + private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT"; + private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT"; + + private final long lowerBound; + private final long upperBound; + + private final TypeSerializer<T1> leftTypeSerializer; + private final TypeSerializer<T2> rightTypeSerializer; + + private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer; + private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer; + + private transient TimestampedCollector<OUT> collector; + private transient ContextImpl context; + + private transient InternalTimerService<String> internalTimerService; + + /** + * Creates a new IntervalJoinOperator. 
+ * + * @param lowerBound The lower bound for evaluating if elements should be joined + * @param upperBound The upper bound for evaluating if elements should be joined + * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches + * the lower bound + * @param upperBoundInclusive Whether or not to include elements where the timestamp matches + * the upper bound + * @param udf A user-defined {@link ProcessJoinFunction} that gets called + * whenever two elements of T1 and T2 are joined + */ + public IntervalJoinOperator( + long lowerBound, + long upperBound, + boolean lowerBoundInclusive, + boolean upperBoundInclusive, + TypeSerializer<T1> leftTypeSerializer, + TypeSerializer<T2> rightTypeSerializer, + ProcessJoinFunction<T1, T2, OUT> udf) { + + super(Preconditions.checkNotNull(udf)); + + Preconditions.checkArgument(lowerBound <= upperBound, + "lowerBound <= upperBound must be fulfilled"); + + // Move buffer by +1 / -1 depending on inclusiveness in order not needing + // to check for inclusiveness later on + this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L; + this.upperBound = (upperBoundInclusive) ? 
upperBound : upperBound - 1L; + + this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer); + this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer); + } + + @Override + public void open() throws Exception { + super.open(); + collector = new TimestampedCollector<>(output); + context = new ContextImpl(userFunction); + internalTimerService = + getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( + LEFT_BUFFER, + LongSerializer.INSTANCE, + new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)) + )); + + this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( + RIGHT_BUFFER, + LongSerializer.INSTANCE, + new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) + )); + } + + /** + * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord} + * arrives at the left stream, it will get added to the left buffer. Possible join candidates + * for that element will be looked up from the right buffer and if the pair lies within the + * user defined boundaries, it gets passed to the {@link ProcessJoinFunction}. + * + * @param record An incoming record to be joined + * @throws Exception Can throw an Exception during state access + */ + @Override + public void processElement1(StreamRecord<T1> record) throws Exception { + processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); + } + + /** + * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord} + * arrives at the right stream, it will get added to the right buffer. 
Possible join candidates + * for that element will be looked up from the left buffer and if the pair lies within the user + * defined boundaries, it gets passed to the {@link ProcessJoinFunction}. + * + * @param record An incoming record to be joined + * @throws Exception Can throw an exception during state access + */ + @Override + public void processElement2(StreamRecord<T2> record) throws Exception { + processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); + } + + @SuppressWarnings("unchecked") + private <OUR, OTHER> void processElement( + StreamRecord<OUR> record, + MapState<Long, List<BufferEntry<OUR>>> ourBuffer, + MapState<Long, List<BufferEntry<OTHER>>> otherBuffer, + long relativeLowerBound, + long relativeUpperBound, + boolean isLeft) throws Exception { + + final OUR ourValue = record.getValue(); + final long ourTimestamp = record.getTimestamp(); + + if (ourTimestamp == Long.MIN_VALUE) { + throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + + "interval stream joins need to have timestamps meaningful timestamps."); + } + + if (isLate(ourTimestamp)) { + return; + } + + addToBuffer(ourBuffer, ourValue, ourTimestamp); + + for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) { + final long timestamp = bucket.getKey(); + + if (timestamp < ourTimestamp + relativeLowerBound || + timestamp > ourTimestamp + relativeUpperBound) { + continue; + } + + for (BufferEntry<OTHER> entry: bucket.getValue()) { + if (isLeft) { + collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); + } else { + collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); + } + } + } + + long cleanupTime = (relativeUpperBound > 0L) ? 
ourTimestamp + relativeUpperBound : ourTimestamp; + if (isLeft) { + internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); + } else { + internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); + } + } + + private boolean isLate(long timestamp) { + long currentWatermark = internalTimerService.currentWatermark(); + return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark; + } + + private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception { + long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); + collector.setAbsoluteTimestamp(resultTimestamp); + context.leftTimestamp = leftTimestamp; + context.rightTimestamp = rightTimestamp; + userFunction.processElement(left, right, context, collector); + } + + private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception { + List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp); + if (elemsInBucket == null) { + elemsInBucket = new ArrayList<>(); + } + elemsInBucket.add(new BufferEntry<>(value, false)); + buffer.put(timestamp, elemsInBucket); + } + + @Override + public void onEventTime(InternalTimer<K, String> timer) throws Exception { + + long timerTimestamp = timer.getTimestamp(); + String namespace = timer.getNamespace(); + + logger.trace("onEventTime @ {}", timerTimestamp); + + switch (namespace) { + case CLEANUP_NAMESPACE_LEFT: { + long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; + logger.trace("Removing from left buffer @ {}", timestamp); + leftBuffer.remove(timestamp); + break; + } + case CLEANUP_NAMESPACE_RIGHT: { + long timestamp = (lowerBound <= 0L) ? 
timerTimestamp + lowerBound : timerTimestamp; + logger.trace("Removing from right buffer @ {}", timestamp); + rightBuffer.remove(timestamp); + break; + } + default: + throw new RuntimeException("Invalid namespace " + namespace); + } + } + + @Override + public void onProcessingTime(InternalTimer<K, String> timer) throws Exception { + // do nothing. + } + + /** + * The context that is available during an invocation of + * {@link ProcessJoinFunction#processElement(Object, Object, ProcessJoinFunction.Context, Collector)}. + * + * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of + * the joined pair. In addition, this context allows to emit elements on a side output. + */ + private final class ContextImpl extends ProcessJoinFunction<T1, T2, OUT>.Context { + + private long leftTimestamp = Long.MIN_VALUE; + + private long rightTimestamp = Long.MIN_VALUE; + + private ContextImpl(ProcessJoinFunction<T1, T2, OUT> func) { + func.super(); + } + + @Override + public long getLeftTimestamp() { + return leftTimestamp; + } + + @Override + public long getRightTimestamp() { + return rightTimestamp; + } + + @Override + public long getTimestamp() { + return leftTimestamp; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + Preconditions.checkArgument(outputTag != null, "OutputTag must not be null"); + output.collect(outputTag, new StreamRecord<>(value, getTimestamp())); + } + } + + /** + * A container for elements put in the left/write buffer. + * This will contain the element itself along with a flag indicating + * if it has been joined or not. + */ + private static class BufferEntry<T> { + + private final T element; + private final boolean hasBeenJoined; + + BufferEntry(T element, boolean hasBeenJoined) { + this.element = element; + this.hasBeenJoined = hasBeenJoined; + } + } + + /** + * A {@link TypeSerializer serializer} for the {@link BufferEntry}. 
+ */ + private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> { + + private static final long serialVersionUID = -20197698803836236L; + + private final TypeSerializer<T> elementSerializer; + + private BufferEntrySerializer(TypeSerializer<T> elementSerializer) { + this.elementSerializer = Preconditions.checkNotNull(elementSerializer); + } + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public TypeSerializer<BufferEntry<T>> duplicate() { + return new BufferEntrySerializer<>(elementSerializer.duplicate()); + } + + @Override + public BufferEntry<T> createInstance() { + return null; + } + + @Override + public BufferEntry<T> copy(BufferEntry<T> from) { + return new BufferEntry<>(from.element, from.hasBeenJoined); + } + + @Override + public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException { + target.writeBoolean(record.hasBeenJoined); + elementSerializer.serialize(record.element, target); + } + + @Override + public BufferEntry<T> deserialize(DataInputView source) throws IOException { + boolean hasBeenJoined = source.readBoolean(); + T element = elementSerializer.deserialize(source); + return new BufferEntry<>(element, hasBeenJoined); + } + + @Override + public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.writeBoolean(source.readBoolean()); + elementSerializer.copy(source, target); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o; + return 
Objects.equals(elementSerializer, that.elementSerializer); + } + + @Override + public int hashCode() { + return Objects.hash(elementSerializer); + } + + @Override + public boolean canEqual(Object obj) { + return obj.getClass().equals(BufferEntrySerializer.class); + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + return new BufferSerializerConfigSnapshot<>(elementSerializer); + } + + @Override + public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof BufferSerializerConfigSnapshot) { + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig = + ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = + CompatibilityUtil.resolveCompatibilityResult( + previousSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousSerializerAndConfig.f1, + elementSerializer); + + if (!compatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new BufferEntrySerializer<>( + new TypeDeserializerAdapter<>( + compatResult.getConvertDeserializer()))); + } + } + return CompatibilityResult.requiresMigration(); + } + } + + /** + * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer. 
+ */ + public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + public BufferSerializerConfigSnapshot() { + } + + public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) { + super(userTypeSerializer); + } + + @Override + public int getVersion() { + return VERSION; + } + } + + @VisibleForTesting + MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() { + return leftBuffer; + } + + @VisibleForTesting + MapState<Long, List<BufferEntry<T2>>> getRightBuffer() { + return rightBuffer; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java deleted file mode 100644 index 26ad26b..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/TimeBoundedStreamJoinOperator.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.operators.co; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.streaming.api.functions.co.TimeBoundedJoinFunction; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.InternalTimer; -import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.operators.Triggerable; -import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; -import org.apache.flink.util.FlinkException; -import 
org.apache.flink.util.OutputTag; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * An {@link TwoInputStreamOperator operator} to execute time-bounded stream inner joins. - * - * <p>By using a configurable lower and upper bound this operator will emit exactly those pairs - * (T1, T2) where t2.ts ∈ [T1.ts + lowerBound, T1.ts + upperBound]. Both the lower and the - * upper bound can be configured to be either inclusive or exclusive. - * - * <p>As soon as elements are joined they are passed to a user-defined {@link TimeBoundedJoinFunction}. - * - * <p>The basic idea of this implementation is as follows: Whenever we receive an element at - * {@link #processElement1(StreamRecord)} (a.k.a. the left side), we add it to the left buffer. - * We then check the right buffer to see whether there are any elements that can be joined. If - * there are, they are joined and passed to the aforementioned function. The same happens the - * other way around when receiving an element on the right side. - * - * <p>Whenever a pair of elements is emitted it will be assigned the max timestamp of either of - * the elements. - * - * <p>In order to avoid the element buffers to grow indefinitely a cleanup timer is registered - * per element. This timer indicates when an element is not considered for joining anymore and can - * be removed from the state. - * - * @param <K> The type of the key based on which we join elements. - * @param <T1> The type of the elements in the left stream. - * @param <T2> The type of the elements in the right stream. - * @param <OUT> The output type created by the user-defined function. 
- */ -@Internal -public class TimeBoundedStreamJoinOperator<K, T1, T2, OUT> - extends AbstractUdfStreamOperator<OUT, TimeBoundedJoinFunction<T1, T2, OUT>> - implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> { - - private static final long serialVersionUID = -5380774605111543454L; - - private static final Logger logger = LoggerFactory.getLogger(TimeBoundedStreamJoinOperator.class); - - private static final String LEFT_BUFFER = "LEFT_BUFFER"; - private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; - private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER"; - private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT"; - private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT"; - - private final long lowerBound; - private final long upperBound; - - private final TypeSerializer<T1> leftTypeSerializer; - private final TypeSerializer<T2> rightTypeSerializer; - - private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer; - private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer; - - private transient TimestampedCollector<OUT> collector; - private transient ContextImpl context; - - private transient InternalTimerService<String> internalTimerService; - - /** - * Creates a new TimeBoundedStreamJoinOperator. 
- * - * @param lowerBound The lower bound for evaluating if elements should be joined - * @param upperBound The upper bound for evaluating if elements should be joined - * @param lowerBoundInclusive Whether or not to include elements where the timestamp matches - * the lower bound - * @param upperBoundInclusive Whether or not to include elements where the timestamp matches - * the upper bound - * @param udf A user-defined {@link TimeBoundedJoinFunction} that gets called - * whenever two elements of T1 and T2 are joined - */ - public TimeBoundedStreamJoinOperator( - long lowerBound, - long upperBound, - boolean lowerBoundInclusive, - boolean upperBoundInclusive, - TypeSerializer<T1> leftTypeSerializer, - TypeSerializer<T2> rightTypeSerializer, - TimeBoundedJoinFunction<T1, T2, OUT> udf) { - - super(Preconditions.checkNotNull(udf)); - - Preconditions.checkArgument(lowerBound <= upperBound, - "lowerBound <= upperBound must be fulfilled"); - - // Move buffer by +1 / -1 depending on inclusiveness in order not needing - // to check for inclusiveness later on - this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L; - this.upperBound = (upperBoundInclusive) ? 
upperBound : upperBound - 1L; - - this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer); - this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer); - } - - @Override - public void open() throws Exception { - super.open(); - collector = new TimestampedCollector<>(output); - context = new ContextImpl(userFunction); - internalTimerService = - getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this); - } - - @Override - public void initializeState(StateInitializationContext context) throws Exception { - super.initializeState(context); - - this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( - LEFT_BUFFER, - LongSerializer.INSTANCE, - new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)) - )); - - this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( - RIGHT_BUFFER, - LongSerializer.INSTANCE, - new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) - )); - } - - /** - * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord} - * arrives at the left stream, it will get added to the left buffer. Possible join candidates - * for that element will be looked up from the right buffer and if the pair lies within the - * user defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}. - * - * @param record An incoming record to be joined - * @throws Exception Can throw an Exception during state access - */ - @Override - public void processElement1(StreamRecord<T1> record) throws Exception { - processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); - } - - /** - * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord} - * arrives at the right stream, it will get added to the right buffer. 
Possible join candidates - * for that element will be looked up from the left buffer and if the pair lies within the user - * defined boundaries, it gets passed to the {@link TimeBoundedJoinFunction}. - * - * @param record An incoming record to be joined - * @throws Exception Can throw an exception during state access - */ - @Override - public void processElement2(StreamRecord<T2> record) throws Exception { - processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); - } - - @SuppressWarnings("unchecked") - private <OUR, OTHER> void processElement( - StreamRecord<OUR> record, - MapState<Long, List<BufferEntry<OUR>>> ourBuffer, - MapState<Long, List<BufferEntry<OTHER>>> otherBuffer, - long relativeLowerBound, - long relativeUpperBound, - boolean isLeft) throws Exception { - - final OUR ourValue = record.getValue(); - final long ourTimestamp = record.getTimestamp(); - - if (ourTimestamp == Long.MIN_VALUE) { - throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + - "interval stream joins need to have timestamps meaningful timestamps."); - } - - if (isLate(ourTimestamp)) { - return; - } - - addToBuffer(ourBuffer, ourValue, ourTimestamp); - - for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) { - final long timestamp = bucket.getKey(); - - if (timestamp < ourTimestamp + relativeLowerBound || - timestamp > ourTimestamp + relativeUpperBound) { - continue; - } - - for (BufferEntry<OTHER> entry: bucket.getValue()) { - if (isLeft) { - collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); - } else { - collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); - } - } - } - - long cleanupTime = (relativeUpperBound > 0L) ? 
ourTimestamp + relativeUpperBound : ourTimestamp; - if (isLeft) { - internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); - } else { - internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); - } - } - - private boolean isLate(long timestamp) { - long currentWatermark = internalTimerService.currentWatermark(); - return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark; - } - - private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception { - long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); - collector.setAbsoluteTimestamp(resultTimestamp); - context.leftTimestamp = leftTimestamp; - context.rightTimestamp = rightTimestamp; - userFunction.processElement(left, right, context, collector); - } - - private <T> void addToBuffer(MapState<Long, List<BufferEntry<T>>> buffer, T value, long timestamp) throws Exception { - List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp); - if (elemsInBucket == null) { - elemsInBucket = new ArrayList<>(); - } - elemsInBucket.add(new BufferEntry<>(value, false)); - buffer.put(timestamp, elemsInBucket); - } - - @Override - public void onEventTime(InternalTimer<K, String> timer) throws Exception { - - long timerTimestamp = timer.getTimestamp(); - String namespace = timer.getNamespace(); - - logger.trace("onEventTime @ {}", timerTimestamp); - - switch (namespace) { - case CLEANUP_NAMESPACE_LEFT: { - long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; - logger.trace("Removing from left buffer @ {}", timestamp); - leftBuffer.remove(timestamp); - break; - } - case CLEANUP_NAMESPACE_RIGHT: { - long timestamp = (lowerBound <= 0L) ? 
timerTimestamp + lowerBound : timerTimestamp; - logger.trace("Removing from right buffer @ {}", timestamp); - rightBuffer.remove(timestamp); - break; - } - default: - throw new RuntimeException("Invalid namespace " + namespace); - } - } - - @Override - public void onProcessingTime(InternalTimer<K, String> timer) throws Exception { - // do nothing. - } - - /** - * The context that is available during an invocation of - * {@link TimeBoundedJoinFunction#processElement(Object, Object, TimeBoundedJoinFunction.Context, Collector)}. - * - * <p>It gives access to the timestamps of the left element in the joined pair, the right one, and that of - * the joined pair. In addition, this context allows to emit elements on a side output. - */ - private final class ContextImpl extends TimeBoundedJoinFunction<T1, T2, OUT>.Context { - - private long leftTimestamp = Long.MIN_VALUE; - - private long rightTimestamp = Long.MIN_VALUE; - - private ContextImpl(TimeBoundedJoinFunction<T1, T2, OUT> func) { - func.super(); - } - - @Override - public long getLeftTimestamp() { - return leftTimestamp; - } - - @Override - public long getRightTimestamp() { - return rightTimestamp; - } - - @Override - public long getTimestamp() { - return leftTimestamp; - } - - @Override - public <X> void output(OutputTag<X> outputTag, X value) { - Preconditions.checkArgument(outputTag != null, "OutputTag must not be null"); - output.collect(outputTag, new StreamRecord<>(value, getTimestamp())); - } - } - - /** - * A container for elements put in the left/write buffer. - * This will contain the element itself along with a flag indicating - * if it has been joined or not. - */ - private static class BufferEntry<T> { - - private final T element; - private final boolean hasBeenJoined; - - BufferEntry(T element, boolean hasBeenJoined) { - this.element = element; - this.hasBeenJoined = hasBeenJoined; - } - } - - /** - * A {@link TypeSerializer serializer} for the {@link BufferEntry}. 
- */ - private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> { - - private static final long serialVersionUID = -20197698803836236L; - - private final TypeSerializer<T> elementSerializer; - - private BufferEntrySerializer(TypeSerializer<T> elementSerializer) { - this.elementSerializer = Preconditions.checkNotNull(elementSerializer); - } - - @Override - public boolean isImmutableType() { - return true; - } - - @Override - public TypeSerializer<BufferEntry<T>> duplicate() { - return new BufferEntrySerializer<>(elementSerializer.duplicate()); - } - - @Override - public BufferEntry<T> createInstance() { - return null; - } - - @Override - public BufferEntry<T> copy(BufferEntry<T> from) { - return new BufferEntry<>(from.element, from.hasBeenJoined); - } - - @Override - public BufferEntry<T> copy(BufferEntry<T> from, BufferEntry<T> reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(BufferEntry<T> record, DataOutputView target) throws IOException { - target.writeBoolean(record.hasBeenJoined); - elementSerializer.serialize(record.element, target); - } - - @Override - public BufferEntry<T> deserialize(DataInputView source) throws IOException { - boolean hasBeenJoined = source.readBoolean(); - T element = elementSerializer.deserialize(source); - return new BufferEntry<>(element, hasBeenJoined); - } - - @Override - public BufferEntry<T> deserialize(BufferEntry<T> reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - target.writeBoolean(source.readBoolean()); - elementSerializer.copy(source, target); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - BufferEntrySerializer<?> that = (BufferEntrySerializer<?>) o; - return 
Objects.equals(elementSerializer, that.elementSerializer); - } - - @Override - public int hashCode() { - return Objects.hash(elementSerializer); - } - - @Override - public boolean canEqual(Object obj) { - return obj.getClass().equals(BufferEntrySerializer.class); - } - - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new BufferSerializerConfigSnapshot<>(elementSerializer); - } - - @Override - public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof BufferSerializerConfigSnapshot) { - Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig = - ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult<T> compatResult = - CompatibilityUtil.resolveCompatibilityResult( - previousSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousSerializerAndConfig.f1, - elementSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new BufferEntrySerializer<>( - new TypeDeserializerAdapter<>( - compatResult.getConvertDeserializer()))); - } - } - return CompatibilityResult.requiresMigration(); - } - } - - /** - * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer. 
- */ - public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { - - private static final int VERSION = 1; - - public BufferSerializerConfigSnapshot() { - } - - public BufferSerializerConfigSnapshot(final TypeSerializer<T> userTypeSerializer) { - super(userTypeSerializer); - } - - @Override - public int getVersion() { - return VERSION; - } - } - - @VisibleForTesting - MapState<Long, List<BufferEntry<T1>>> getLeftBuffer() { - return leftBuffer; - } - - @VisibleForTesting - MapState<Long, List<BufferEntry<T2>>> getRightBuffer() { - return rightBuffer; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/42ada8ad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java new file mode 100644 index 0000000..ee3f4d8 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperatorTest.java @@ -0,0 +1,941 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkException; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.stream.Collectors; + + +/** + * Tests for {@link IntervalJoinOperator}. 
+ * Those tests cover correctness and cleaning of state + */ +@RunWith(Parameterized.class) +public class IntervalJoinOperatorTest { + + private final boolean lhsFasterThanRhs; + + @Parameters(name = "lhs faster than rhs: {0}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][]{ + {true}, {false} + }); + } + + public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) { + this.lhsFasterThanRhs = lhsFasterThanRhs; + } + + @Test + public void testImplementationMirrorsCorrectly() throws Exception { + + long lowerBound = 1; + long upperBound = 3; + + boolean lowerBoundInclusive = true; + boolean upperBoundInclusive = false; + + setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(1, 2), + streamRecordOf(1, 3), + streamRecordOf(2, 3), + streamRecordOf(2, 4), + streamRecordOf(3, 4)) + .noLateRecords() + .close(); + + setupHarness(-1 * upperBound, upperBoundInclusive, -1 * lowerBound, lowerBoundInclusive) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(2, 1), + streamRecordOf(3, 1), + streamRecordOf(3, 2), + streamRecordOf(4, 2), + streamRecordOf(4, 3)) + .noLateRecords() + .close(); + } + + @Test // lhs - 2 <= rhs <= rhs + 2 + public void testNegativeInclusiveAndNegativeInclusive() throws Exception { + + setupHarness(-2, true, -1, true) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(2, 1), + streamRecordOf(3, 1), + streamRecordOf(3, 2), + streamRecordOf(4, 2), + streamRecordOf(4, 3) + ) + .noLateRecords() + .close(); + } + + @Test // lhs - 1 <= rhs <= rhs + 1 + public void testNegativeInclusiveAndPositiveInclusive() throws Exception { + + setupHarness(-1, true, 1, true) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(1, 1), + streamRecordOf(1, 2), + streamRecordOf(2, 1), + streamRecordOf(2, 2), + streamRecordOf(2, 3), + streamRecordOf(3, 2), + streamRecordOf(3, 3), + 
streamRecordOf(3, 4), + streamRecordOf(4, 3), + streamRecordOf(4, 4) + ) + .noLateRecords() + .close(); + } + + @Test // lhs + 1 <= rhs <= lhs + 2 + public void testPositiveInclusiveAndPositiveInclusive() throws Exception { + + setupHarness(1, true, 2, true) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(1, 2), + streamRecordOf(1, 3), + streamRecordOf(2, 3), + streamRecordOf(2, 4), + streamRecordOf(3, 4) + ) + .noLateRecords() + .close(); + } + + @Test + public void testNegativeExclusiveAndNegativeExlusive() throws Exception { + + setupHarness(-3, false, -1, false) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(3, 1), + streamRecordOf(4, 2) + ) + .noLateRecords() + .close(); + } + + @Test + public void testNegativeExclusiveAndPositiveExlusive() throws Exception { + + setupHarness(-1, false, 1, false) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(1, 1), + streamRecordOf(2, 2), + streamRecordOf(3, 3), + streamRecordOf(4, 4) + ) + .noLateRecords() + .close(); + } + + @Test + public void testPositiveExclusiveAndPositiveExlusive() throws Exception { + + setupHarness(1, false, 3, false) + .processElementsAndWatermarks(1, 4) + .andExpect( + streamRecordOf(1, 3), + streamRecordOf(2, 4) + ) + .noLateRecords() + .close(); + } + + @Test + public void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception { + + setupHarness(-1, true, 0, true) + .processElement1(1) + .processElement1(2) + .processElement1(3) + .processElement1(4) + .processElement1(5) + + .processElement2(1) + .processElement2(2) + .processElement2(3) + .processElement2(4) + .processElement2(5) // fill both buffers with values + + .processWatermark1(1) + .processWatermark2(1) // set common watermark to 1 and check that data is cleaned + + .assertLeftBufferContainsOnly(2, 3, 4, 5) + .assertRightBufferContainsOnly(1, 2, 3, 4, 5) + + .processWatermark1(4) // set common watermark to 4 and check that data is cleaned + 
.processWatermark2(4) + + .assertLeftBufferContainsOnly(5) + .assertRightBufferContainsOnly(4, 5) + + .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty + .processWatermark2(6) + + .assertLeftBufferEmpty() + .assertRightBufferEmpty() + + .close(); + } + + @Test + public void testStateCleanupNegativePositiveNegativeExlusive() throws Exception { + setupHarness(-2, false, 1, false) + .processElement1(1) + .processElement1(2) + .processElement1(3) + .processElement1(4) + .processElement1(5) + + .processElement2(1) + .processElement2(2) + .processElement2(3) + .processElement2(4) + .processElement2(5) // fill both buffers with values + + .processWatermark1(1) + .processWatermark2(1) // set common watermark to 1 and check that data is cleaned + + .assertLeftBufferContainsOnly(2, 3, 4, 5) + .assertRightBufferContainsOnly(1, 2, 3, 4, 5) + + .processWatermark1(4) // set common watermark to 4 and check that data is cleaned + .processWatermark2(4) + + .assertLeftBufferContainsOnly(5) + .assertRightBufferContainsOnly(4, 5) + + .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty + .processWatermark2(6) + + .assertLeftBufferEmpty() + .assertRightBufferEmpty() + + .close(); + } + + @Test + public void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception { + setupHarness(0, true, 1, true) + .processElement1(1) + .processElement1(2) + .processElement1(3) + .processElement1(4) + .processElement1(5) + + .processElement2(1) + .processElement2(2) + .processElement2(3) + .processElement2(4) + .processElement2(5) // fill both buffers with values + + .processWatermark1(1) + .processWatermark2(1) // set common watermark to 1 and check that data is cleaned + + .assertLeftBufferContainsOnly(1, 2, 3, 4, 5) + .assertRightBufferContainsOnly(2, 3, 4, 5) + + .processWatermark1(4) // set common watermark to 4 and check that data is cleaned + .processWatermark2(4) + + .assertLeftBufferContainsOnly(4, 
5) + .assertRightBufferContainsOnly(5) + + .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty + .processWatermark2(6) + + .assertLeftBufferEmpty() + .assertRightBufferEmpty() + + .close(); + } + + @Test + public void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception { + setupHarness(-1, false, 2, false) + .processElement1(1) + .processElement1(2) + .processElement1(3) + .processElement1(4) + .processElement1(5) + + .processElement2(1) + .processElement2(2) + .processElement2(3) + .processElement2(4) + .processElement2(5) // fill both buffers with values + + .processWatermark1(1) + .processWatermark2(1) // set common watermark to 1 and check that data is cleaned + + .assertLeftBufferContainsOnly(1, 2, 3, 4, 5) + .assertRightBufferContainsOnly(2, 3, 4, 5) + + .processWatermark1(4) // set common watermark to 4 and check that data is cleaned + .processWatermark2(4) + + .assertLeftBufferContainsOnly(4, 5) + .assertRightBufferContainsOnly(5) + + .processWatermark1(6) // set common watermark to 6 and check that data all buffers are empty + .processWatermark2(6) + + .assertLeftBufferEmpty() + .assertRightBufferEmpty() + + .close(); + } + + @Test + public void testRestoreFromSnapshot() throws Exception { + + // config + int lowerBound = -1; + boolean lowerBoundInclusive = true; + int upperBound = 1; + boolean upperBoundInclusive = true; + + // create first test harness + OperatorSubtaskState handles; + List<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput; + + try (TestHarness testHarness = createTestHarness( + lowerBound, + lowerBoundInclusive, + upperBound, + upperBoundInclusive + )) { + + testHarness.setup(); + testHarness.open(); + + // process elements with first test harness + testHarness.processElement1(createStreamRecord(1, "lhs")); + testHarness.processWatermark1(new Watermark(1)); + + testHarness.processElement2(createStreamRecord(1, "rhs")); + testHarness.processWatermark2(new Watermark(1)); + + 
testHarness.processElement1(createStreamRecord(2, "lhs")); + testHarness.processWatermark1(new Watermark(2)); + + testHarness.processElement2(createStreamRecord(2, "rhs")); + testHarness.processWatermark2(new Watermark(2)); + + testHarness.processElement1(createStreamRecord(3, "lhs")); + testHarness.processWatermark1(new Watermark(3)); + + testHarness.processElement2(createStreamRecord(3, "rhs")); + testHarness.processWatermark2(new Watermark(3)); + + // snapshot and validate output + handles = testHarness.snapshot(0, 0); + testHarness.close(); + + expectedOutput = Lists.newArrayList( + streamRecordOf(1, 1), + streamRecordOf(1, 2), + streamRecordOf(2, 1), + streamRecordOf(2, 2), + streamRecordOf(2, 3), + streamRecordOf(3, 2), + streamRecordOf(3, 3) + ); + + TestHarnessUtil.assertNoLateRecords(testHarness.getOutput()); + assertOutput(expectedOutput, testHarness.getOutput()); + } + + try (TestHarness newTestHarness = createTestHarness( + lowerBound, + lowerBoundInclusive, + upperBound, + upperBoundInclusive + )) { + // create new test harness from snapshpt + + newTestHarness.setup(); + newTestHarness.initializeState(handles); + newTestHarness.open(); + + // process elements + newTestHarness.processElement1(createStreamRecord(4, "lhs")); + newTestHarness.processWatermark1(new Watermark(4)); + + newTestHarness.processElement2(createStreamRecord(4, "rhs")); + newTestHarness.processWatermark2(new Watermark(4)); + + // assert expected output + expectedOutput = Lists.newArrayList( + streamRecordOf(3, 4), + streamRecordOf(4, 3), + streamRecordOf(4, 4) + ); + + TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput()); + assertOutput(expectedOutput, newTestHarness.getOutput()); + } + } + + @Test + public void testContextCorrectLeftTimestamp() throws Exception { + + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op = + new IntervalJoinOperator<>( + -1, + 1, + true, + true, + TestElem.serializer(), + TestElem.serializer(), + new 
ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() { + @Override + public void processElement( + TestElem left, + TestElem right, + Context ctx, + Collector<Tuple2<TestElem, TestElem>> out) throws Exception { + Assert.assertEquals(left.ts, ctx.getLeftTimestamp()); + } + } + ); + + try (TestHarness testHarness = new TestHarness( + op, + (elem) -> elem.key, + (elem) -> elem.key, + TypeInformation.of(String.class) + )) { + + testHarness.setup(); + testHarness.open(); + + processElementsAndWatermarks(testHarness); + } + } + + @Test + public void testReturnsCorrectTimestamp() throws Exception { + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op = + new IntervalJoinOperator<>( + -1, + 1, + true, + true, + TestElem.serializer(), + TestElem.serializer(), + new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() { + @Override + public void processElement( + TestElem left, + TestElem right, + Context ctx, + Collector<Tuple2<TestElem, TestElem>> out) throws Exception { + Assert.assertEquals(left.ts, ctx.getTimestamp()); + } + } + ); + + try (TestHarness testHarness = new TestHarness( + op, + (elem) -> elem.key, + (elem) -> elem.key, + TypeInformation.of(String.class) + )) { + + testHarness.setup(); + testHarness.open(); + + processElementsAndWatermarks(testHarness); + } + } + + @Test + public void testContextCorrectRightTimestamp() throws Exception { + + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op = + new IntervalJoinOperator<>( + -1, + 1, + true, + true, + TestElem.serializer(), + TestElem.serializer(), + new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() { + @Override + public void processElement( + TestElem left, + TestElem right, + Context ctx, + Collector<Tuple2<TestElem, TestElem>> out) throws Exception { + Assert.assertEquals(right.ts, ctx.getRightTimestamp()); + } + } + ); + + try (TestHarness testHarness = new TestHarness( + op, + (elem) -> 
elem.key, + (elem) -> elem.key, + TypeInformation.of(String.class) + )) { + + testHarness.setup(); + testHarness.open(); + + processElementsAndWatermarks(testHarness); + } + } + + @Test(expected = FlinkException.class) + public void testFailsWithNoTimestampsLeft() throws Exception { + TestHarness newTestHarness = createTestHarness(0L, true, 0L, true); + + newTestHarness.setup(); + newTestHarness.open(); + + // note that the StreamRecord has no timestamp in constructor + newTestHarness.processElement1(new StreamRecord<>(new TestElem(0, "lhs"))); + } + + @Test(expected = FlinkException.class) + public void testFailsWithNoTimestampsRight() throws Exception { + try (TestHarness newTestHarness = createTestHarness(0L, true, 0L, true)) { + + newTestHarness.setup(); + newTestHarness.open(); + + // note that the StreamRecord has no timestamp in constructor + newTestHarness.processElement2(new StreamRecord<>(new TestElem(0, "rhs"))); + } + } + + @Test + public void testDiscardsLateData() throws Exception { + setupHarness(-1, true, 1, true) + .processElement1(1) + .processElement2(1) + .processElement1(2) + .processElement2(2) + .processElement1(3) + .processElement2(3) + .processWatermark1(3) + .processWatermark2(3) + .processElement1(1) // this element is late and should not be joined again + .processElement1(4) + .processElement2(4) + .processElement1(5) + .processElement2(5) + .andExpect( + streamRecordOf(1, 1), + streamRecordOf(1, 2), + + streamRecordOf(2, 1), + streamRecordOf(2, 2), + streamRecordOf(2, 3), + + streamRecordOf(3, 2), + streamRecordOf(3, 3), + streamRecordOf(3, 4), + + streamRecordOf(4, 3), + streamRecordOf(4, 4), + streamRecordOf(4, 5), + + streamRecordOf(5, 4), + streamRecordOf(5, 5) + ) + .noLateRecords() + .close(); + } + + private void assertEmpty(MapState<Long, ?> state) throws Exception { + boolean stateIsEmpty = Iterables.size(state.keys()) == 0; + Assert.assertTrue("state not empty", stateIsEmpty); + } + + private void 
assertContainsOnly(MapState<Long, ?> state, long... ts) throws Exception { + for (long t : ts) { + String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys(); + Assert.assertTrue(message, state.contains(t)); + } + + String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual: " + state.keys(); + Assert.assertEquals(message, ts.length, Iterables.size(state.keys())); + } + + private void assertOutput( + Iterable<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput, + Queue<Object> actualOutput) { + + int actualSize = actualOutput.stream() + .filter(elem -> elem instanceof StreamRecord) + .collect(Collectors.toList()) + .size(); + + int expectedSize = Iterables.size(expectedOutput); + + Assert.assertEquals( + "Expected and actual size of stream records different", + expectedSize, + actualSize + ); + + for (StreamRecord<Tuple2<TestElem, TestElem>> record : expectedOutput) { + Assert.assertTrue(actualOutput.contains(record)); + } + } + + private TestHarness createTestHarness(long lowerBound, + boolean lowerBoundInclusive, + long upperBound, + boolean upperBoundInclusive) throws Exception { + + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator = + new IntervalJoinOperator<>( + lowerBound, + upperBound, + lowerBoundInclusive, + upperBoundInclusive, + TestElem.serializer(), + TestElem.serializer(), + new PassthroughFunction() + ); + + return new TestHarness( + operator, + (elem) -> elem.key, // key + (elem) -> elem.key, // key + TypeInformation.of(String.class) + ); + } + + private JoinTestBuilder setupHarness(long lowerBound, + boolean lowerBoundInclusive, + long upperBound, + boolean upperBoundInclusive) throws Exception { + + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator = + new IntervalJoinOperator<>( + lowerBound, + upperBound, + lowerBoundInclusive, + upperBoundInclusive, + TestElem.serializer(), + 
TestElem.serializer(), + new PassthroughFunction() + ); + + TestHarness t = new TestHarness( + operator, + (elem) -> elem.key, // key + (elem) -> elem.key, // key + TypeInformation.of(String.class) + ); + + return new JoinTestBuilder(t, operator); + } + + private class JoinTestBuilder { + + private IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator; + private TestHarness testHarness; + + public JoinTestBuilder( + TestHarness t, + IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator + ) throws Exception { + + this.testHarness = t; + this.operator = operator; + t.open(); + t.setup(); + } + + public TestHarness get() { + return testHarness; + } + + public JoinTestBuilder processElement1(int ts) throws Exception { + testHarness.processElement1(createStreamRecord(ts, "lhs")); + return this; + } + + public JoinTestBuilder processElement2(int ts) throws Exception { + testHarness.processElement2(createStreamRecord(ts, "rhs")); + return this; + } + + public JoinTestBuilder processWatermark1(int ts) throws Exception { + testHarness.processWatermark1(new Watermark(ts)); + return this; + } + + public JoinTestBuilder processWatermark2(int ts) throws Exception { + testHarness.processWatermark2(new Watermark(ts)); + return this; + } + + public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception { + if (lhsFasterThanRhs) { + // add to lhs + for (int i = from; i <= to; i++) { + testHarness.processElement1(createStreamRecord(i, "lhs")); + testHarness.processWatermark1(new Watermark(i)); + } + + // add to rhs + for (int i = from; i <= to; i++) { + testHarness.processElement2(createStreamRecord(i, "rhs")); + testHarness.processWatermark2(new Watermark(i)); + } + } else { + // add to rhs + for (int i = from; i <= to; i++) { + testHarness.processElement2(createStreamRecord(i, "rhs")); + testHarness.processWatermark2(new Watermark(i)); + } + + // add to lhs + for (int i = from; i <= to; i++) { 
+ testHarness.processElement1(createStreamRecord(i, "lhs")); + testHarness.processWatermark1(new Watermark(i)); + } + } + + return this; + } + + @SafeVarargs + public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>>... elems) { + assertOutput(Lists.newArrayList(elems), testHarness.getOutput()); + return this; + } + + public JoinTestBuilder assertLeftBufferContainsOnly(long... timestamps) { + + try { + assertContainsOnly(operator.getLeftBuffer(), timestamps); + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + public JoinTestBuilder assertRightBufferContainsOnly(long... timestamps) { + + try { + assertContainsOnly(operator.getRightBuffer(), timestamps); + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + public JoinTestBuilder assertLeftBufferEmpty() { + try { + assertEmpty(operator.getLeftBuffer()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + public JoinTestBuilder assertRightBufferEmpty() { + try { + assertEmpty(operator.getRightBuffer()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return this; + } + + public JoinTestBuilder noLateRecords() { + TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput()); + return this; + } + + public void close() throws Exception { + testHarness.close(); + } + } + + private static class PassthroughFunction extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> { + + @Override + public void processElement( + TestElem left, + TestElem right, + Context ctx, + Collector<Tuple2<TestElem, TestElem>> out) throws Exception { + out.collect(Tuple2.of(left, right)); + } + } + + private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf( + long lhsTs, + long rhsTs + ) { + TestElem lhs = new TestElem(lhsTs, "lhs"); + TestElem rhs = new TestElem(rhsTs, "rhs"); + + long ts = Math.max(lhsTs, rhsTs); + return new StreamRecord<>(Tuple2.of(lhs, rhs), ts); + } + + 
private static class TestElem { + String key; + long ts; + String source; + + public TestElem(long ts, String source) { + this.key = "key"; + this.ts = ts; + this.source = source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + TestElem testElem = (TestElem) o; + + if (ts != testElem.ts) { + return false; + } + + if (key != null ? !key.equals(testElem.key) : testElem.key != null) { + return false; + } + + return source != null ? source.equals(testElem.source) : testElem.source == null; + } + + @Override + public int hashCode() { + int result = key != null ? key.hashCode() : 0; + result = 31 * result + (int) (ts ^ (ts >>> 32)); + result = 31 * result + (source != null ? source.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return this.source + ":" + this.ts; + } + + public static TypeSerializer<TestElem> serializer() { + return TypeInformation.of(new TypeHint<TestElem>() { + }).createSerializer(new ExecutionConfig()); + } + } + + private static StreamRecord<TestElem> createStreamRecord(long ts, String source) { + TestElem testElem = new TestElem(ts, source); + return new StreamRecord<>(testElem, ts); + } + + private void processElementsAndWatermarks(TestHarness testHarness) throws Exception { + if (lhsFasterThanRhs) { + // add to lhs + for (int i = 1; i <= 4; i++) { + testHarness.processElement1(createStreamRecord(i, "lhs")); + testHarness.processWatermark1(new Watermark(i)); + } + + // add to rhs + for (int i = 1; i <= 4; i++) { + testHarness.processElement2(createStreamRecord(i, "rhs")); + testHarness.processWatermark2(new Watermark(i)); + } + } else { + // add to rhs + for (int i = 1; i <= 4; i++) { + testHarness.processElement2(createStreamRecord(i, "rhs")); + testHarness.processWatermark2(new Watermark(i)); + } + + // add to lhs + for (int i = 1; i <= 4; i++) { + 
testHarness.processElement1(createStreamRecord(i, "lhs")); + testHarness.processWatermark1(new Watermark(i)); + } + } + } + + /** + * Custom test harness to avoid endless generics in all of the test code. + */ + private static class TestHarness extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> { + + TestHarness( + TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator, + KeySelector<TestElem, String> keySelector1, + KeySelector<TestElem, String> keySelector2, + TypeInformation<String> keyType) throws Exception { + super(operator, keySelector1, keySelector2, keyType); + } + } +}