Github user florianschmidt1994 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5482#discussion_r183703916
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
---
@@ -137,6 +158,151 @@ public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
 public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
 return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
 }
+
+ /**
+ * Specifies the time boundaries over which the join operation works, so that
+ * <pre>leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound</pre>
+ * By default both the lower and the upper bound are inclusive. This can be configured
+ * with {@link TimeBounded#lowerBoundExclusive(boolean)} and
+ * {@link TimeBounded#upperBoundExclusive(boolean)}
+ *
+ * @param lowerBound The lower bound. Needs to be smaller than or equal to the upperBound
+ * @param upperBound The upper bound. Needs to be bigger than or equal to the lowerBound
+ */
+ public TimeBounded<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
+
+ TimeCharacteristic timeCharacteristic =
+ input1.getExecutionEnvironment().getStreamTimeCharacteristic();
+
+ if (timeCharacteristic != TimeCharacteristic.EventTime) {
+ throw new RuntimeException("Time-bounded stream joins are only supported in event time");
--- End diff --
I'm not sure about the `IllegalStateException`. Its Javadoc says:

> Signals that a method has been invoked at an illegal or inappropriate time. In other words, the Java environment or Java application is not in an appropriate state for the requested operation.

But I'm not opposed to the idea of creating a custom exception for all cases where an operation is not supported with the current time characteristic. I'll see if I can think of something.
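
For illustration only, a minimal sketch of what such a custom exception could look like. Everything here is an assumption, not existing Flink code: the names `UnsupportedTimeCharacteristicException` and `requireEventTime` are hypothetical, and the local `TimeCharacteristic` enum only stands in for Flink's so that the snippet compiles on its own.

```java
public class TimeCharacteristicCheck {

    // Stand-in for Flink's TimeCharacteristic so the sketch is self-contained.
    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    // Hypothetical exception type; the name is an assumption, not part of Flink.
    static class UnsupportedTimeCharacteristicException extends RuntimeException {
        UnsupportedTimeCharacteristicException(
                String operation, TimeCharacteristic required, TimeCharacteristic actual) {
            super(operation + " requires time characteristic " + required
                    + ", but the environment is configured with " + actual);
        }
    }

    // Hypothetical helper that an operation such as between(...) could call.
    static void requireEventTime(String operation, TimeCharacteristic actual) {
        if (actual != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException(
                    operation, TimeCharacteristic.EventTime, actual);
        }
    }

    public static void main(String[] args) {
        // Passes silently: the environment is in event time.
        requireEventTime("Time-bounded stream join", TimeCharacteristic.EventTime);

        // Throws: the environment is in processing time.
        try {
            requireEventTime("Time-bounded stream join", TimeCharacteristic.ProcessingTime);
        } catch (UnsupportedTimeCharacteristicException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A dedicated type like this would let callers catch the "wrong time characteristic" case specifically, and every operation with the same restriction could share one message format.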
---