Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5327#discussion_r184995707
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
---
@@ -176,14 +179,34 @@ class DataStreamJoin(
body,
returnType)
- val coMapFun =
- new NonWindowInnerJoin(
- leftSchema.typeInfo,
- rightSchema.typeInfo,
- CRowTypeInfo(returnType),
- genFunction.name,
- genFunction.code,
- queryConfig)
+ val coMapFun = joinType match {
+ case JoinRelType.INNER =>
+ new NonWindowInnerJoin(
+ leftSchema.typeInfo,
+ rightSchema.typeInfo,
+ CRowTypeInfo(returnType),
+ genFunction.name,
+ genFunction.code,
+ queryConfig)
+ case JoinRelType.LEFT if joinInfo.isEqui =>
+ new NonWindowLeftRightJoin(
+ leftSchema.typeInfo,
+ rightSchema.typeInfo,
+ CRowTypeInfo(returnType),
+ genFunction.name,
+ genFunction.code,
+ joinType == JoinRelType.LEFT,
+ queryConfig)
+ case JoinRelType.LEFT =>
--- End diff --
I planed to add right join in FLINK-8429. It's ok to add right join in this
pr if you prefer.
---