[ 
https://issues.apache.org/jira/browse/FLINK-6094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121700#comment-16121700
 ] 

ASF GitHub Bot commented on FLINK-6094:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r132229209
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
    @@ -90,40 +96,86 @@ object UpdatingPlanChecker {
                   // resolve names of input fields
                   .map(io => (inNames.get(io._1), io._2))
     
    -            // filter by input keys
    -            val outKeys = inOutNames.filter(io => 
keys.get.contains(io._1)).map(_._2)
    -            // check if all keys have been preserved
    -            if (outKeys.nonEmpty && outKeys.length == keys.get.length) {
    +            // filter by input keyAncestors
    +            val outKeyAncesters = inOutNames
    +              .filter(io => keyAncestors.get.map(e => 
e._1).contains(io._1))
    +              .map(io => (io._2, keyAncestors.get.find(ka => ka._1 == 
io._1).get._2))
    +
    +            // check if all keyAncestors have been preserved
    +            if (outKeyAncesters.nonEmpty &&
    +              outKeyAncesters.map(ka => ka._2).distinct.length ==
    +                keyAncestors.get.map(ka => ka._2).distinct.length) {
                   // all key have been preserved (but possibly renamed)
    -              keys = Some(outKeys.toArray)
    +              Some(outKeyAncesters.toList)
                 } else {
                   // some (or all) keys have been removed. Keys are no longer 
unique and removed
    -              keys = None
    +              None
                 }
    +          } else {
    +            None
               }
    +
             case _: DataStreamOverAggregate =>
    -          super.visit(node, ordinal, parent)
    -        // keys are always forwarded by Over aggregate
    +          // keyAncestors are always forwarded by Over aggregate
    +          visit(node.getInput(0))
             case a: DataStreamGroupAggregate =>
    -          // get grouping keys
    +          // get grouping keyAncestors
               val groupKeys = 
a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
    -          keys = Some(groupKeys.toArray)
    +          Some(groupKeys.map(e => (e, e)).toList)
             case w: DataStreamGroupWindowAggregate =>
    -          // get grouping keys
    +          // get grouping keyAncestors
               val groupKeys =
                 
w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
               // get window start and end time
               val windowStartEnd = w.getWindowProperties.map(_.name)
               // we have only a unique key if at least one window property is 
selected
               if (windowStartEnd.nonEmpty) {
    -            keys = Some(groupKeys ++ windowStartEnd)
    +            Some((groupKeys ++ windowStartEnd).map(e => (e, e)).toList)
    +          } else {
    +            None
    +          }
    +
    +        case j: DataStreamJoin =>
    +          val leftKeyAncestors = visit(j.getLeft)
    +          val rightKeyAncestors = visit(j.getRight)
    +          if (!leftKeyAncestors.isDefined || !rightKeyAncestors.isDefined) 
{
    +            None
    +          } else {
    +            // both left and right contain keys
    +            val leftJoinKeys =
    +              j.getLeft.getRowType.getFieldNames.asScala.zipWithIndex
    +              .filter(e => j.getJoinInfo.leftKeys.contains(e._2))
    +              .map(e => e._1)
    +            val rightJoinKeys =
    +              j.getRight.getRowType.getFieldNames.asScala.zipWithIndex
    +                .filter(e => j.getJoinInfo.rightKeys.contains(e._2))
    +                .map(e => e._1)
    +
    +            val leftKeys = leftKeyAncestors.get.map(e => e._1)
    +            val rightKeys = rightKeyAncestors.get.map(e => e._1)
    +
    +            //1. join key = left key = right key
    +            if (leftJoinKeys == leftKeys && rightJoinKeys == rightKeys) {
    +              Some(leftKeyAncestors.get ::: (rightKeyAncestors.get.map(e 
=> (e._1)) zip
    --- End diff --
    
    Nit: `map(e => (e._1))` can be simplified to `map(_._1)`.


> Implement stream-stream proctime non-window inner join
> ------------------------------------------------------
>
>                 Key: FLINK-6094
>                 URL: https://issues.apache.org/jira/browse/FLINK-6094
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> This includes:
> 1. Implement stream-stream proctime non-window inner join
> 2. Implement the retract process logic for join



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to