GitHub user florianschmidt1994 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5482#discussion_r183705882
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY>
keySelector) {
public <W extends Window> WithWindow<T1, T2, KEY, W>
window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2,
keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+ /**
+ * Specifies the time boundaries over which the join
operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <=
rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+ * By default both the lower and the upper bound are
inclusive. This can be configured
+ * with {@link
TimeBounded#lowerBoundExclusive(boolean)} and
+ * {@link TimeBounded#upperBoundExclusive(boolean)}
+ *
+ * @param lowerBound The lower bound. Needs to be
smaller than or equal to the upperBound
+ * @param upperBound The upper bound. Needs to be
bigger than or equal to the lowerBound
+ */
+ public TimeBounded<T1, T2, KEY> between(Time
lowerBound, Time upperBound) {
+
+ TimeCharacteristic timeCharacteristic =
+
input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ if (timeCharacteristic !=
TimeCharacteristic.EventTime) {
+ throw new
RuntimeException("Time-bounded stream joins are only supported in event time");
+ }
+
+ checkNotNull(lowerBound, "A lower bound needs
to be provided for a time-bounded join");
+ checkNotNull(upperBound, "An upper bound needs
to be provided for a time-bounded join");
+ return new TimeBounded<>(
+ input1,
+ input2,
+ lowerBound.toMilliseconds(),
+ upperBound.toMilliseconds(),
+ true,
+ true,
+ keySelector1,
+ keySelector2
+ );
+ }
+ }
+ }
+
+ /**
+ * Joined streams that have keys for both sides as well as the time
boundaries over which
+ * elements should be joined defined.
+ *
+ * @param <IN1> Input type of elements from the first stream
+ * @param <IN2> Input type of elements from the second stream
+ * @param <KEY> The type of the key
+ */
+ public static class TimeBounded<IN1, IN2, KEY> {
+
+ private static final String TIMEBOUNDED_JOIN_FUNC_NAME =
"TimeBoundedJoin";
+
+ private final DataStream<IN1> left;
+ private final DataStream<IN2> right;
+
+ private final long lowerBound;
+ private final long upperBound;
+
+ private final KeySelector<IN1, KEY> keySelector1;
+ private final KeySelector<IN2, KEY> keySelector2;
+
+ private boolean lowerBoundInclusive;
+ private boolean upperBoundInclusive;
+
/**
 * Creates a time-bounded join over two input streams.
 *
 * @param left the first input stream; must not be {@code null}
 * @param right the second input stream; must not be {@code null}
 * @param lowerBound the lower time bound (presumably milliseconds, since {@code between(...)}
 *                   passes {@code Time#toMilliseconds()} — confirm for other callers)
 * @param upperBound the upper time bound, in the same unit as {@code lowerBound}
 * @param lowerBoundInclusive whether the lower bound is inclusive
 * @param upperBoundInclusive whether the upper bound is inclusive
 * @param keySelector1 key selector for the first stream; must not be {@code null}
 * @param keySelector2 key selector for the second stream; must not be {@code null}
 * @throws NullPointerException if a stream or key selector is {@code null}
 * @throws IllegalArgumentException if {@code lowerBound} is greater than {@code upperBound}
 */
public TimeBounded(
		DataStream<IN1> left,
		DataStream<IN2> right,
		long lowerBound,
		long upperBound,
		boolean lowerBoundInclusive,
		boolean upperBoundInclusive,
		KeySelector<IN1, KEY> keySelector1,
		KeySelector<IN2, KEY> keySelector2) {

	this.left = Preconditions.checkNotNull(left);
	this.right = Preconditions.checkNotNull(right);

	// This constructor is public, so it must enforce the lowerBound <= upperBound
	// invariant itself rather than relying on between(...) to do it.
	Preconditions.checkArgument(lowerBound <= upperBound,
		"The lower bound (%s) needs to be smaller than or equal to the upper bound (%s)",
		lowerBound, upperBound);

	this.lowerBound = lowerBound;
	this.upperBound = upperBound;

	this.lowerBoundInclusive = lowerBoundInclusive;
	this.upperBoundInclusive = upperBoundInclusive;

	this.keySelector1 = Preconditions.checkNotNull(keySelector1);
	this.keySelector2 = Preconditions.checkNotNull(keySelector2);
}
+
+ /**
+ * Configure whether the upper bound should be considered
exclusive or inclusive.
+ */
+ public TimeBounded<IN1, IN2, KEY> upperBoundExclusive(boolean
exclusive) {
+ this.upperBoundInclusive = !exclusive;
+ return this;
+ }
+
+ /**
+ * Configure whether the lower bound should be considered
exclusive or inclusive.
+ */
+ public TimeBounded<IN1, IN2, KEY> lowerBoundExclusive(boolean
exclusive) {
+ this.lowerBoundInclusive = !exclusive;
+ return this;
+ }
+
+ /**
+ * Completes the join operation with the user function that is
executed for each joined pair
+ * of elements.
+ * @param udf The user-defined function
+ * @param <OUT> The output type
+ * @return Returns a DataStream
+ */
+ public <OUT> DataStream<OUT>
process(TimeBoundedJoinFunction<IN1, IN2, OUT> udf) {
+
+ ConnectedStreams<IN1, IN2> connected =
left.connect(right);
+
+ udf = left.getExecutionEnvironment().clean(udf);
+
+ TypeInformation<OUT> resultType =
TypeExtractor.getBinaryOperatorReturnType(
+ udf,
+ TimeBoundedJoinFunction.class, //
TimeBoundedJoinFunction<IN1, IN2, OUT>
+ 0, //
0 1 2
+ 1,
+ 2,
+ new int[]{0}, // lambda input
1 type arg indices
+ new int[]{1}, // lambda input
1 type arg indices
+ TypeExtractor.NO_INDEX, // output arg
indices
+ left.getType(), // input 1 type
information
+ right.getType(), // input 1 type
information
+ TIMEBOUNDED_JOIN_FUNC_NAME,
+ false
+ );
+
+ long bucketGranularity =
Time.seconds(1).toMilliseconds();
--- End diff --
Yes, this bucket granularity should not be hard-coded; it should be exposed either through the API or the config file (or both?).
I'll give this some more thought as well and come back to it.
---