beyond1920 commented on a change in pull request #15695:
URL: https://github.com/apache/flink/pull/15695#discussion_r617223473
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
##########
@@ -98,31 +98,37 @@ object WindowJoinUtil {
val remainRightKeysArray = mutable.ArrayBuffer[Int]()
// convert remain pairs to RexInputRef tuple for building
SqlStdOperatorTable.EQUALS calls
// or SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
- joinInfo.pairs().foreach { p =>
- if (!windowStartEqualityLeftKeys.contains(p.source) &&
- !windowEndEqualityLeftKeys.contains(p.source)) {
- val leftFieldType = leftChildFieldsType.get(p.source).getType
- val leftInputRef = new RexInputRef(p.source, leftFieldType)
- val rightFieldType = rightChildFieldsType.get(p.target).getType
- val rightIndex = leftFieldCnt + p.target
+ joinSpec.getLeftKeys.zip(joinSpec.getRightKeys).
+ zip(joinSpec.getFilterNulls).foreach { case ((source, target),
filterNull) =>
+ if (!windowStartEqualityLeftKeys.contains(source) &&
+ !windowEndEqualityLeftKeys.contains(source)) {
+ val leftFieldType = leftChildFieldsType.get(source).getType
+ val leftInputRef = new RexInputRef(source, leftFieldType)
+ val rightFieldType = rightChildFieldsType.get(target).getType
+ val rightIndex = leftFieldCnt + target
val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
- val remainEqual = rexBuilder.makeCall(
- SqlStdOperatorTable.EQUALS,
- leftInputRef,
- rightInputRef)
- remainEquals.add(remainEqual)
- remainLeftKeysArray.add(p.source)
- remainRightKeysArray.add(p.target)
+ val op = if (filterNull) {
+ SqlStdOperatorTable.EQUALS
+ } else {
+ SqlStdOperatorTable.IS_NOT_DISTINCT_FROM
+ }
+ val remainEqual = rexBuilder.makeCall(op, leftInputRef,
rightInputRef)
+ remainEquals += remainEqual
+ remainLeftKeysArray += source
+ remainRightKeysArray += target
}
}
- val remainAnds = remainEquals ++ joinInfo.nonEquiConditions
+ val notEquiCondition = joinSpec.getNonEquiCondition
+ if (notEquiCondition.isPresent) {
+ remainEquals += notEquiCondition.get()
Review comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]