Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4471#discussion_r152830710
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
---
@@ -116,14 +135,100 @@ object UpdatingPlanChecker {
val windowStartEnd = w.getWindowProperties.map(_.name)
// we have only a unique key if at least one window property is selected
if (windowStartEnd.nonEmpty) {
- keys = Some(groupKeys ++ windowStartEnd)
+ val smallestAttribute = windowStartEnd.sorted.head
+ Some((groupKeys.map(e => (e, e)) ++ windowStartEnd.map((_, smallestAttribute))).toList)
+ } else {
+ None
+ }
+
+ case j: DataStreamJoin =>
+ val joinType = j.getJoinType
+ joinType match {
+ case JoinRelType.INNER => {
+ // get key(s) for inner join
+ val lInputKeys = visit(j.getLeft)
+ val rInputKeys = visit(j.getRight)
+ if (lInputKeys.isEmpty || rInputKeys.isEmpty) {
+ None
+ } else {
+ // Output of inner join must have keys if left and right both contain key(s).
+ // Key groups from both side will be merged by join equi-predicates
+ val lFieldNames: Seq[String] = j.getLeft.getRowType.getFieldNames
+ val rFieldNames: Seq[String] = j.getRight.getRowType.getFieldNames
+ val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys.map(lFieldNames.get(_))
+ val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys.map(rFieldNames.get(_))
+
+ getOutputKeysForInnerJoin(
+ lFieldNames ++ rFieldNames,
+ lInputKeys.get ++ rInputKeys.get,
+ lJoinKeys.zip(rJoinKeys).toList
+ )
+ }
+ }
+ case _ => throw new UnsupportedOperationException(
+ s"An Unsupported JoinType [ $joinType ]")
}
case _: DataStreamRel =>
- // anything else does not forward keys or might duplicate key, so we can stop
- keys = None
+ // anything else does not forward keys, so we can stop
+ None
}
}
- }
+ def getOutputKeysForInnerJoin(
--- End diff --
Please document this method.
---