[ https://issues.apache.org/jira/browse/FLINK-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16212509#comment-16212509 ]

ASF GitHub Bot commented on FLINK-7755:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4858#discussion_r145938918
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
    @@ -156,65 +163,394 @@ class DataSetJoin(
         val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
         val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    -    val (joinOperator, nullCheck) = joinType match {
    -      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
    -      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
    -      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
    -      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
    +    joinType match {
    +      case JoinRelType.INNER =>
    +        addInnerJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.LEFT =>
    +        addLeftOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.RIGHT =>
    +        addRightOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
    +      case JoinRelType.FULL =>
    +        addFullOuterJoin(
    +          leftDataSet,
    +          rightDataSet,
    +          leftKeys.toArray,
    +          rightKeys.toArray,
    +          returnType,
    +          config)
         }
    +  }
     
    -    if (nullCheck && !config.getNullCheck) {
    -      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    -    }
    +  private def addInnerJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
     
         val generator = new FunctionCodeGenerator(
           config,
    -      nullCheck,
    -      leftDataSet.getType,
    -      Some(rightDataSet.getType))
    +      false,
    +      left.getType,
    +      Some(right.getType))
         val conversion = generator.generateConverterResultExpression(
    -      returnType,
    +      resultType,
           joinRowType.getFieldNames)
     
    -    var body = ""
    +    val condition = generator.generateExpression(joinCondition)
    +    val body =
    +      s"""
    +         |${condition.code}
    +         |if (${condition.resultTerm}) {
    +         |  ${conversion.code}
    +         |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
    +         |}
    +         |""".stripMargin
     
    -    if (joinInfo.isEqui) {
    -      // only equality condition
    -      body = s"""
    -           |${conversion.code}
    -           |${generator.collectorTerm}.collect(${conversion.resultTerm});
    -           |""".stripMargin
    -    }
    -    else {
    -      val nonEquiPredicates = joinInfo.getRemaining(this.cluster.getRexBuilder)
    -      val condition = generator.generateExpression(nonEquiPredicates)
    -      body = s"""
    -           |${condition.code}
    -           |if (${condition.resultTerm}) {
    -           |  ${conversion.code}
    -           |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
    -           |}
    -           |""".stripMargin
    -    }
         val genFunction = generator.generateFunction(
           ruleDescription,
           classOf[FlatJoinFunction[Row, Row, Row]],
           body,
    -      returnType)
    +      resultType)
     
         val joinFun = new FlatJoinRunner[Row, Row, Row](
           genFunction.name,
           genFunction.code,
           genFunction.returnType)
     
    -    val joinOpName =
    -      s"where: (${joinConditionToString(joinRowType, joinCondition, 
getExpressionString)}), " +
    -        s"join: (${joinSelectionToString(joinRowType)})"
    +    left.join(right)
    +      .where(leftKeys: _*)
    +      .equalTo(rightKeys: _*)
    +      .`with`(joinFun)
    +      .name(getJoinOpName)
    +  }
    +
    +  private def addLeftOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
    +
    +    // replace field names by indexed names for easier key handling
    +    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +    val rightType = right.getType.asInstanceOf[RowTypeInfo]
    +
    +    // partition and sort left input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy)
    +    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
    +
    +    // deduplicate the rows of the left input
    +    val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
    +    val joinFun = new LeftOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
    +    val joinPairs = deduplicatedRowsLeft.leftOuterJoin(right, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(nestedLeftKeys: _*)
    +      .equalTo(rightKeys: _*)
    +      .`with`(joinFun)
    +      .withForwardedFieldsFirst("f0->f0")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunction to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val reduceFun = new LeftOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // convert join pairs to result.
    +    // This step ensures we preserve the rows of the left input.
    +    joinPairs
    +      .groupBy("f0")
    +      .reduceGroup(reduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +  }
    +
    +  private def addRightOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
     
    -    joinOperator
    -      .where(leftKeys.toArray: _*)
    -      .equalTo(rightKeys.toArray: _*)
    +    // replace field names by indexed names for easier key handling
    +    val leftType = left.getType.asInstanceOf[RowTypeInfo]
    +    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +
    +    // partition and sort right input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy)
    +    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
    +
    +    // deduplicate the rows of the right input
    +    val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT)
    +    val joinFun = new RightOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
    +    val joinPairs = left.rightOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(leftKeys: _*)
    +      .equalTo(nestedRightKeys: _*)
           .`with`(joinFun)
    +      .withForwardedFieldsSecond("f0->f1")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunction to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val reduceFun = new RightOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // convert join pairs to result
    +    // This step ensures we preserve the rows of the right input.
    +    joinPairs
    +      .groupBy("f1")
    +      .reduceGroup(reduceFun)
           .name(joinOpName)
    +      .returns(resultType)
       }
    +
    +  private def addFullOuterJoin(
    +      left: DataSet[Row],
    +      right: DataSet[Row],
    +      leftKeys: Array[Int],
    +      rightKeys: Array[Int],
    +      resultType: TypeInformation[Row],
    +      config: TableConfig): DataSet[Row] = {
    +
    +    if (!config.getNullCheck) {
    +      throw TableException("Null check in TableConfig must be enabled for outer joins.")
    +    }
    +
    +    val joinOpName = getJoinOpName
    +
    +    // replace field names by indexed names for easier key handling
    +    val leftType = new RowTypeInfo(left.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +    val rightType = new RowTypeInfo(right.getType.asInstanceOf[RowTypeInfo].getFieldTypes: _*)
    +
    +    // partition and sort left and right input
    +    // this step ensures we can reuse the sorting for all following operations
    +    // (groupBy->join->groupBy), except the second grouping to preserve right rows.
    +    val partitionedSortedLeft: DataSet[Row] = partitionAndSort(left, leftKeys)
    +    val partitionedSortedRight: DataSet[Row] = partitionAndSort(right, rightKeys)
    +
    +    // deduplicate the rows of the left and right input
    +    val deduplicatedRowsLeft: DataSet[Row] = deduplicateRows(partitionedSortedLeft, leftType)
    +    val deduplicatedRowsRight: DataSet[Row] = deduplicateRows(partitionedSortedRight, rightType)
    +
    +    // create JoinFunction to evaluate join predicate
    +    val predFun = generatePredicateFunction(leftType, rightType, config)
    +    val joinOutType = new RowTypeInfo(leftType, rightType, Types.INT, Types.INT)
    +    val joinFun = new FullOuterJoinRunner(predFun.name, predFun.code, joinOutType)
    +
    +    // join left and right inputs, evaluate join predicate, and emit join pairs
    +    val nestedLeftKeys = leftKeys.map(i => s"f0.f$i")
    +    val nestedRightKeys = rightKeys.map(i => s"f0.f$i")
    +    val joinPairs = deduplicatedRowsLeft
    +      .fullOuterJoin(deduplicatedRowsRight, JoinHint.REPARTITION_SORT_MERGE)
    +      .where(nestedLeftKeys: _*)
    +      .equalTo(nestedRightKeys: _*)
    +      .`with`(joinFun)
    +      .withForwardedFieldsFirst("f0->f0")
    +      .withForwardedFieldsSecond("f0->f1")
    +      .name(joinOpName)
    +
    +    // create GroupReduceFunctions to generate the join result
    +    val convFun = generateConversionFunction(leftType, rightType, resultType, config)
    +    val leftReduceFun = new LeftFullOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +    val rightReduceFun = new RightFullOuterJoinGroupReduceRunner(
    +      convFun.name,
    +      convFun.code,
    +      convFun.returnType)
    +
    +    // compute joined (left + right) and left preserved (left + null)
    +    val joinedAndLeftPreserved = joinPairs
    +      // filter for pairs with left row
    +      .filter(new FilterFunction[Row](){
    +        override def filter(row: Row): Boolean = row.getField(0) != null})
    +      .groupBy("f0")
    +      .reduceGroup(leftReduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +
    +    // compute right preserved (null + right)
    +    val rightPreserved = joinPairs
    +      // filter for pairs with right row
    +      .filter(new FilterFunction[Row](){
    +        override def filter(row: Row): Boolean = row.getField(1) != null})
    +      .groupBy("f1")
    +      .reduceGroup(rightReduceFun)
    +      .name(joinOpName)
    +      .returns(resultType)
    +
    +    // union joined (left + right), left preserved (left + null), and right preserved (null + right)
    +    joinedAndLeftPreserved.union(rightPreserved)
    +  }
    +
    +  private def getJoinOpName: String = {
    +    s"where: (${joinConditionToString(joinRowType, joinCondition, 
getExpressionString)}), " +
    +      s"join: (${joinSelectionToString(joinRowType)})"
    +  }
    +
    +  /** Returns an array of indices with some indices being a prefix. */
    +  private def getFullIndiciesWithPrefix(keys: Array[Int], numFields: Int): Array[Int] = {
    +    // get indices of all fields which are not keys
    +    val nonKeys = (0 until numFields).filter(i => !keys.contains(i))
    --- End diff --
    
    Could be simplified with '_'.
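
    For illustration, a minimal standalone sketch of that shorthand; the sample keys and numFields values below are hypothetical stand-ins for the method's parameters. Note that the negation moves into filterNot, because writing !keys.contains(_) would expand the underscore inside the contains call and not compile:

        // hypothetical sample inputs mirroring the method parameters
        val keys = Array(1, 3)
        val numFields = 5

        // form from the diff: explicit lambda parameter
        val nonKeys = (0 until numFields).filter(i => !keys.contains(i))

        // shorthand with '_': filterNot keeps the negation outside the lambda
        val nonKeysShort = (0 until numFields).filterNot(keys.contains(_))

        // both yield Vector(0, 2, 4)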


> Null values are not correctly handled by batch inner and outer joins
> --------------------------------------------------------------------
>
>                 Key: FLINK-7755
>                 URL: https://issues.apache.org/jira/browse/FLINK-7755
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Join predicates of batch joins are not correctly evaluated according to 
> three-valued logic.
> This affects inner as well as outer joins.
> The problem is that some equality predicates are only evaluated by Flink's 
> internal join algorithms, which are based on {{TypeComparator}}. The field 
> {{TypeComparator}}s for {{Row}} are implemented such that {{null == null}} 
> results in {{TRUE}} to ensure correct ordering and grouping. However, 
> three-valued logic requires that {{null == null}} results in {{UNKNOWN}} (or 
> null). The code generator implements this logic correctly, but no code is 
> generated for equality predicates.
> For outer joins, the problem is a bit trickier because these do not support 
> code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a 
> solution for this issue.
> We also need to extend several of the existing tests and add null values to 
> ensure that the join logic is implemented correctly.
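
To make the described semantics concrete, here is a minimal REPL-style sketch in plain Scala (not Flink code); the Option-based modeling of nullable join keys is only an illustration of the two behaviors:

    // comparator-style semantics: null == null is TRUE, which sort/hash
    // based join algorithms rely on for consistent ordering and grouping
    def comparatorEquals(a: Option[Int], b: Option[Int]): Boolean = a == b

    // SQL three-valued logic: a comparison involving NULL is UNKNOWN,
    // and an UNKNOWN join predicate must not emit a joined row
    def sqlEquals(a: Option[Int], b: Option[Int]): Option[Boolean] =
      for (x <- a; y <- b) yield x == y

    val l: Option[Int] = None // null join key on the left
    val r: Option[Int] = None // null join key on the right

    comparatorEquals(l, r) // true -> the rows would incorrectly be joined
    sqlEquals(l, r)        // None -> UNKNOWN, so no joined row is emitted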



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
