Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132243950
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
    @@ -90,40 +96,86 @@ object UpdatingPlanChecker {
                   // resolve names of input fields
                   .map(io => (inNames.get(io._1), io._2))
     
    -            // filter by input keys
    -            val outKeys = inOutNames.filter(io => 
keys.get.contains(io._1)).map(_._2)
    -            // check if all keys have been preserved
    -            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
    +            // filter by input keyAncestors
    +            val outKeyAncesters = inOutNames
    +              .filter(io => keyAncestors.get.map(e => 
e._1).contains(io._1))
    +              .map(io => (io._2, keyAncestors.get.find(ka => ka._1 == 
io._1).get._2))
    +
    +            // check if all keyAncestors have been preserved
    +            if (outKeyAncesters.nonEmpty &&
    +              outKeyAncesters.map(ka => ka._2).distinct.length ==
    +                keyAncestors.get.map(ka => ka._2).distinct.length) {
                   // all key have been preserved (but possibly renamed)
    -              keys = Some(outKeys.toArray)
    +              Some(outKeyAncesters.toList)
                 } else {
                   // some (or all) keys have been removed. Keys are no longer 
unique and removed
    -              keys = None
    +              None
                 }
    +          } else {
    +            None
               }
    +
             case _: DataStreamOverAggregate =>
    -          super.visit(node, ordinal, parent)
    -        // keys are always forwarded by Over aggregate
    +          // keyAncestors are always forwarded by Over aggregate
    +          visit(node.getInput(0))
             case a: DataStreamGroupAggregate =>
    -          // get grouping keys
    +          // get grouping keyAncestors
               val groupKeys = 
a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
    -          keys = Some(groupKeys.toArray)
    +          Some(groupKeys.map(e => (e, e)).toList)
             case w: DataStreamGroupWindowAggregate =>
    -          // get grouping keys
    +          // get grouping keyAncestors
               val groupKeys =
                 
w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
               // get window start and end time
               val windowStartEnd = w.getWindowProperties.map(_.name)
               // we have only a unique key if at least one window property is 
selected
               if (windowStartEnd.nonEmpty) {
    -            keys = Some(groupKeys ++ windowStartEnd)
    +            Some((groupKeys ++ windowStartEnd).map(e => (e, e)).toList)
    +          } else {
    +            None
    +          }
    +
    +        case j: DataStreamJoin =>
    +          val leftKeyAncestors = visit(j.getLeft)
    +          val rightKeyAncestors = visit(j.getRight)
    +          if (!leftKeyAncestors.isDefined || !rightKeyAncestors.isDefined) 
{
    +            None
    +          } else {
    +            // both left and right contain keys
    +            val leftJoinKeys =
    +              j.getLeft.getRowType.getFieldNames.asScala.zipWithIndex
    +              .filter(e => j.getJoinInfo.leftKeys.contains(e._2))
    +              .map(e => e._1)
    +            val rightJoinKeys =
    +              j.getRight.getRowType.getFieldNames.asScala.zipWithIndex
    +                .filter(e => j.getJoinInfo.rightKeys.contains(e._2))
    +                .map(e => e._1)
    +
    +            val leftKeys = leftKeyAncestors.get.map(e => e._1)
    +            val rightKeys = rightKeyAncestors.get.map(e => e._1)
    +
    +            //1. join key = left key = right key
    +            if (leftJoinKeys == leftKeys && rightJoinKeys == rightKeys) {
    --- End diff --
    
    I think this condition is too strict. We have to check for containment not 
equality. 
    Equi join predicates on additional attributes are fine and also the 
assignment of keys to join predicates is not relevant (`l_key1 = r_key1 AND 
l_key2 = r_key2` is equivalent to `l_key1 = r_key2 AND l_key2 = r_key1`). 
    
    Moreover, we have to handle fields with common ancestors when checking for 
containment in the join attributes. If we have an input table that has two 
fields which resolve to the same key field, only on of the fields needs to be 
included in the join predicates.
    
    So we could do something like:
    
        // create maps to look up the ancestor of each key field
        val leftKeyAncestorMap: Map[String, String] = leftKeyAncestors.get.toMap
        val rightKeyAncestorMap: Map[String, String] = 
rightKeyAncestors.get.toMap
    
        // resolve fields to their ancestor if known
        val resolvedLeftJoinKeys = leftJoinKeys
          .map(k => leftKeyAncestorMap.getOrElse(k, k))
          .distinct
        val resolvedRightJoinKeys = rightJoinKeys
          .map(k => rightKeyAncestorMap.getOrElse(k, k))
          .distinct
    
        // resolve keys to their ancestor
        val resolvedLeftKeys = leftKeyAncestors.get.map(_._2).distinct
        val resolvedRightKeys = rightKeyAncestors.get.map(_._2).distinct
    
        // check that if all unique keys are included in the join fields 
        val joinsOnLeftKey = 
resolvedLeftKeys.forall(resolvedLeftJoinKeys.contains)
        val joinsOnRightKey = 
resolvedRightKeys.forall(resolvedRightJoinKeys.contains)
    
        if (joinsOnLeftKey && joinsOnRightKey) {
          // forward both keys
        } else if (joinsOnLeftKey) {
          // forward right keys
        } else if (joinsOnRightKey) {
          // forward left keys
        } else {
          // do not forward any keys
        }
    
    When forwarding keys, we have to make sure that all fields that are equal 
resolve to the same ancestor field. This means if we have a condition such as 
`WHERE l_key1 == r_key1 && l._key1 = r_nonKey1`, all three fields become keys 
and have the same ancestor. I propose the lexicographical smallest field name. 
So we would have in this case `[(l_key1, l_key1), (r_key1, l_key1), (r_nonKey1, 
l_key1)]`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to