Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5482#discussion_r172302147
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
}
+
+ /**
+ * Specifies the time boundaries over which the join operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+ * By default both the lower and the upper bound are inclusive. This can be configured
+ * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+ * {@link TimeBounded#upperBoundExclusive(boolean)}
+ *
+ * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+ * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+ */
+ public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+ TimeCharacteristic timeCharacteristic =
+ input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ if (timeCharacteristic != TimeCharacteristic.EventTime) {
+ throw new RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --
This should throw `IllegalStateException` rather than a bare `RuntimeException`.
Or, even better, shall we create a Flink-specific exception?
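
A rough sketch of what I have in mind, written standalone so it compiles without Flink on the classpath. The `TimeCharacteristic` enum here is only a local stand-in for Flink's own, and `UnsupportedTimeCharacteristicException` and `requireEventTime` are hypothetical names I made up for illustration; neither exists in Flink today. Extending `IllegalStateException` is one possible design, not the only one:

```java
public class TimeBoundedJoinCheck {

    // Local stand-in for Flink's TimeCharacteristic so this sketch is self-contained.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical Flink-specific exception (illustrative name only).
    // Extending IllegalStateException keeps existing catch blocks for that type working.
    static class UnsupportedTimeCharacteristicException extends IllegalStateException {
        UnsupportedTimeCharacteristicException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: rejects any time characteristic other than event time.
    static void requireEventTime(TimeCharacteristic timeCharacteristic) {
        if (timeCharacteristic != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                "Time-bounded stream joins are only supported in event time");
        }
    }

    public static void main(String[] args) {
        // Event time is accepted without an exception.
        requireEventTime(TimeCharacteristic.EventTime);

        // Any other characteristic is rejected.
        try {
            requireEventTime(TimeCharacteristic.ProcessingTime);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With this shape, callers that only care about "wrong state" can catch `IllegalStateException`, while callers that want to handle this case specifically can catch the narrower type.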
---