Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4625#discussion_r137031980
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
---
@@ -115,10 +118,15 @@ object WindowJoinUtil {
case _ =>
Some(otherPreds.reduceLeft((l, r) =>
RelOptUtil.andJoinFilters(rexBuilder, l, r)))
}
-
- val bounds = Some(WindowBounds(timePreds.head.isEventTime,
leftLowerBound, leftUpperBound))
-
- (bounds, remainCondition)
+ if (timePreds.head.leftInputOnLeftSide) {
--- End diff --
Please replace the condition as follows:
```
val bounds = if (timePreds.head.leftInputOnLeftSide) {
Some(WindowBounds(
timePreds.head.isEventTime,
leftLowerBound,
leftUpperBound,
timePreds.head.leftTimeIdx,
timePreds.head.rightTimeIdx))
} else {
Some(WindowBounds(
timePreds.head.isEventTime,
leftLowerBound,
leftUpperBound,
timePreds.head.rightTimeIdx,
timePreds.head.leftTimeIdx))
}
(bounds, remainCondition)
```
We should not get the right index from the second time predicate because
the tables might be accessed in inverse order there (we would need to check
`leftInputOnLeftSide` again).
---