Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5140#discussion_r158737260
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---
    @@ -142,50 +143,47 @@ class DataStreamWindowJoin(
             s"${joinConditionToString(schema.relDataType, joinCondition, 
getExpressionString)}), " +
             s"join: (${joinSelectionToString(schema.relDataType)})"
     
    -    joinType match {
    -      case JoinRelType.INNER =>
    -        if (relativeWindowSize < 0) {
    -          LOG.warn(s"The relative window size $relativeWindowSize is negative," +
    -            " please check the join conditions.")
    -          createEmptyInnerJoin(leftDataStream, rightDataStream, returnTypeInfo)
    -        } else {
    -          if (isRowTime) {
    -            createRowTimeInnerJoin(
    -              leftDataStream,
    -              rightDataStream,
    -              returnTypeInfo,
    -              joinOpName,
    -              joinFunction.name,
    -              joinFunction.code,
    -              leftKeys,
    -              rightKeys
    -            )
    -          } else {
    -            createProcTimeInnerJoin(
    -              leftDataStream,
    -              rightDataStream,
    -              returnTypeInfo,
    -              joinOpName,
    -              joinFunction.name,
    -              joinFunction.code,
    -              leftKeys,
    -              rightKeys
    -            )
    -          }
    -        }
    -      case JoinRelType.FULL =>
    -        throw new TableException(
    -          "Full join between stream and stream is not supported yet.")
    -      case JoinRelType.LEFT =>
    -        throw new TableException(
    -          "Left join between stream and stream is not supported yet.")
    -      case JoinRelType.RIGHT =>
    -        throw new TableException(
    -          "Right join between stream and stream is not supported yet.")
    +    val flinkJoinType = joinType match {
    +      case JoinRelType.INNER => JoinType.INNER
    +      case JoinRelType.FULL => JoinType.FULL_OUTER
    +      case JoinRelType.LEFT => JoinType.LEFT_OUTER
    +      case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
    +    }
    +
    +    if (relativeWindowSize < 0) {
    +      LOG.warn(s"The relative window size $relativeWindowSize is negative," +
    +        " please check the join conditions.")
    +      createEmptyJoin(leftDataStream, rightDataStream, returnTypeInfo)
    --- End diff --
    
    Empty outer joins need to be handled differently from empty inner joins 
because the records of the outer side(s) must be preserved and padded with 
nulls. Hence, we need to pass the join type and the generated code to 
`createEmptyJoin`.


---

Reply via email to