GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4471#discussion_r152833454
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
 ---
    @@ -116,14 +135,100 @@ object UpdatingPlanChecker {
               val windowStartEnd = w.getWindowProperties.map(_.name)
               // we have only a unique key if at least one window property is 
selected
               if (windowStartEnd.nonEmpty) {
    -            keys = Some(groupKeys ++ windowStartEnd)
    +            val smallestAttribute = windowStartEnd.sorted.head
    +            Some((groupKeys.map(e => (e, e)) ++ windowStartEnd.map((_, 
smallestAttribute))).toList)
    +          } else {
    +            None
    +          }
    +
    +        case j: DataStreamJoin =>
    +          val joinType = j.getJoinType
    +          joinType match {
    +            case JoinRelType.INNER => {
    +              // get key(s) for inner join
    +              val lInputKeys = visit(j.getLeft)
    +              val rInputKeys = visit(j.getRight)
    +              if (lInputKeys.isEmpty || rInputKeys.isEmpty) {
    +                None
    +              } else {
    +                // Output of inner join must have keys if left and right 
both contain key(s).
    +                // Key groups from both side will be merged by join 
equi-predicates
    +                val lFieldNames: Seq[String] = 
j.getLeft.getRowType.getFieldNames
    +                val rFieldNames: Seq[String] = 
j.getRight.getRowType.getFieldNames
    +                val lJoinKeys: Seq[String] = 
j.getJoinInfo.leftKeys.map(lFieldNames.get(_))
    +                val rJoinKeys: Seq[String] = 
j.getJoinInfo.rightKeys.map(rFieldNames.get(_))
    +
    +                getOutputKeysForInnerJoin(
    +                  lFieldNames ++ rFieldNames,
    +                  lInputKeys.get ++ rInputKeys.get,
    +                  lJoinKeys.zip(rJoinKeys).toList
    +                )
    +              }
    +            }
    +            case _ => throw new UnsupportedOperationException(
    +              s"An Unsupported JoinType [ $joinType ]")
               }
             case _: DataStreamRel =>
    -          // anything else does not forward keys or might duplicate key, 
so we can stop
    -          keys = None
    +          // anything else does not forward keys, so we can stop
    +          None
           }
         }
     
    -  }
     
    +    def getOutputKeysForInnerJoin(
    +        inNames: Seq[String],
    +        inKeys: List[(String, String)],
    +        joinKeys: List[(String, String)])
    +    : Option[List[(String, String)]] = {
    +
    +      val nameToGroups = mutable.HashMap.empty[String,String]
    +
    +      // merge two groups
    +      def merge(nameA: String, nameB: String): Unit = {
    +        val ga: String = findGroup(nameA);
    --- End diff --
    
    Remove the semicolons (e.g. the one after `findGroup(nameA)`); they are unnecessary at the end of a line in Scala.


---

Reply via email to