Hello Community,
I’ve recently been working on adding support for outer joins [1] and timestamp
assignment [2] to the IntervalJoin in the DataStream API.
As this is a public API, it should be simple and understandable for users, so
I would like to gather feedback on a few variations that I drafted:
1. Add outer joins
Approach A
keyedStreamA.intervalJoin(keyedStreamB)
    .leftOuter() // or .rightOuter(), .fullOuter()
    .between(<Time>, <Time>)
    .process(new ProcessJoinFunction() { /* … */ });
Approach B
keyedStreamA.intervalLeftJoin(keyedStreamB) // or intervalRightJoin, intervalFullOuterJoin
    .between(<Time>, <Time>)
    .process(new ProcessJoinFunction() { /* … */ });
Approach C
keyedStreamA.intervalJoin(keyedStreamB)
    .joinType(JoinType.INNER) // reuse the existing (internally used) JoinType
Personally, I feel that C is the cleanest approach, but it has the drawback
that invalid combinations of timestamp strategy and join type can only be
detected at runtime, whereas A and B would allow us to express the valid
combinations through the type system.
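To make the type-system argument more concrete, here is a rough sketch of how
Approach A could hand out a different builder type per join type, so that an
unsupported timestamp method simply does not exist on that type. Everything in
it is hypothetical: the class names, the String return values (stand-ins for a
configured join), and in particular the assumed rule that a left outer join
may only use the left timestamp and a right outer join only the right one. The
assign*Timestamp methods are the ones proposed in section 2.

```java
/** Hypothetical builder stages for illustration; none of these types exist in Flink. */
final class IntervalJoinSketch {

    /** Inner join: both elements always exist, so every strategy is offered. */
    static final class InnerJoined {
        LeftOuterJoined leftOuter() { return new LeftOuterJoined(); }
        RightOuterJoined rightOuter() { return new RightOuterJoined(); }

        // The returned String is only a stand-in for a configured join.
        String assignLeftTimestamp() { return "INNER/LEFT"; }
        String assignRightTimestamp() { return "INNER/RIGHT"; }
        String assignMinTimestamp() { return "INNER/MIN"; }
        String assignMaxTimestamp() { return "INNER/MAX"; }
    }

    /** Left outer join: ASSUMPTION that only the left timestamp is always available. */
    static final class LeftOuterJoined {
        String assignLeftTimestamp() { return "LEFT_OUTER/LEFT"; }
    }

    /** Right outer join: ASSUMPTION that only the right timestamp is always available. */
    static final class RightOuterJoined {
        String assignRightTimestamp() { return "RIGHT_OUTER/RIGHT"; }
    }

    /** Stand-in for keyedStreamA.intervalJoin(keyedStreamB). */
    static InnerJoined intervalJoin() { return new InnerJoined(); }
}
```

With this shape, IntervalJoinSketch.intervalJoin().leftOuter().assignRightTimestamp()
would be rejected by the compiler, while a single joinType(JoinType) method as
in Approach C returns the same builder type for every join type and therefore
cannot rule anything out before runtime.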
2. Assign timestamps to the joined pairs
When two elements are joined, this adds support for specifying which element's
timestamp is assigned to the result. The four options are MIN, MAX, LEFT and
RIGHT: MIN selects the minimum of the two elements' timestamps, MAX the
maximum, LEFT the left element's timestamp and RIGHT the right element's
timestamp.
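As a minimal sketch of these four options, the selection logic could look
roughly like the enum below. The enum name matches the TimestampStrategy used
in Approach B, but the select method and its signature are assumptions made
for illustration and are not taken from the design doc or from Flink.

```java
import java.util.function.LongBinaryOperator;

/** Hypothetical strategy enum; select(...) is an illustrative helper, not a Flink API. */
enum TimestampStrategy {
    MIN(Math::min),                   // smaller of the two timestamps
    MAX(Math::max),                   // larger of the two timestamps
    LEFT((left, right) -> left),      // always the left element's timestamp
    RIGHT((left, right) -> right);    // always the right element's timestamp

    private final LongBinaryOperator selector;

    TimestampStrategy(LongBinaryOperator selector) {
        this.selector = selector;
    }

    /** Returns the timestamp that the joined pair would be assigned. */
    long select(long leftTimestamp, long rightTimestamp) {
        return selector.applyAsLong(leftTimestamp, rightTimestamp);
    }
}
```

For a left element at timestamp 3 and a right element at timestamp 5, MIN and
LEFT would both yield 3, while MAX and RIGHT would both yield 5.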
Approach A
keyedStreamA.intervalJoin(keyedStreamB)
    .between(<Time>, <Time>)
    .assignLeftTimestamp() // or assignRightTimestamp(), assignMinTimestamp(), assignMaxTimestamp()
    .process(new ProcessJoinFunction() { /* … */ });
Approach B
keyedStreamA.intervalJoin(keyedStreamB)
    .between(<Time>, <Time>)
    .assignTimestamp(TimestampStrategy.LEFT) // or .RIGHT, .MIN, .MAX
Again, I feel that B is the cleanest approach, but it has the same caveat
regarding runtime vs. type-system checks as the approach above. This could be
especially relevant for combinations of join types and timestamp assignments,
since a few of those combinations are not possible.
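For the enum-based variants, a runtime check along the following lines would
be needed. This is only a sketch: the validator class, the JoinType constant
names, and the rule for which combinations are invalid are all assumptions.
The rule assumed here is that a strategy is valid only if every element it
reads is guaranteed to exist for that join type; the actual set of impossible
combinations would have to come from the design doc.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical runtime validation; not part of Flink. */
final class JoinConfigValidator {
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }
    enum TimestampStrategy { MIN, MAX, LEFT, RIGHT }

    /** ASSUMED rule: allow a strategy only if the elements it reads always exist. */
    static Set<TimestampStrategy> allowedFor(JoinType type) {
        switch (type) {
            case INNER:
                return EnumSet.allOf(TimestampStrategy.class);
            case LEFT_OUTER:
                return EnumSet.of(TimestampStrategy.LEFT);
            case RIGHT_OUTER:
                return EnumSet.of(TimestampStrategy.RIGHT);
            default:
                // FULL_OUTER: either side may be missing under the assumed rule.
                return EnumSet.noneOf(TimestampStrategy.class);
        }
    }

    /** Throws if the combination is not allowed; this can only run at job construction. */
    static void validate(JoinType type, TimestampStrategy strategy) {
        if (!allowedFor(type).contains(strategy)) {
            throw new IllegalArgumentException(
                "Timestamp strategy " + strategy + " is not supported for join type " + type);
        }
    }
}
```

The user would only see the IllegalArgumentException when the job graph is
built, which is the runtime-only feedback mentioned above.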
Any feedback would be greatly appreciated. I also updated the design doc at [3]
if anyone wants to hop in on further discussions!
Florian
[1] https://issues.apache.org/jira/browse/FLINK-8483
[2] https://issues.apache.org/jira/browse/FLINK-8482
[3] https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.6pxr0kgtqp3c