[ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003018#comment-16003018 ]
ASF GitHub Bot commented on FLINK-5256:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3673#discussion_r115536343
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala ---
@@ -138,33 +144,60 @@ class DataSetSingleRowJoin(
val condition = codeGenerator.generateExpression(joinCondition)
val joinMethodBody =
- s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
+ if (joinType == JoinRelType.INNER) {
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
+ } else {
+ val singleNode =
+ if (!leftIsSingle) {
+ rightNode
+ }
+ else {
+ leftNode
+ }
+
+ val notSuitedToCondition = singleNode
+ .getRowType
+ .getFieldList
+ .map(field => getRowType.getFieldNames.indexOf(field.getName))
+ .map(i => s"${conversion.resultTerm}.setField($i,null);")
+
+ s"""
+ |${condition.code}
+ |${conversion.code}
+ |if(!${condition.resultTerm}){
+ |${notSuitedToCondition.mkString("\n")}
+ |}
+ |${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |""".stripMargin
+ }
val genFunction = codeGenerator.generateFunction(
ruleDescription,
classOf[FlatJoinFunction[Row, Row, Row]],
joinMethodBody,
returnType)
- if (firstIsSingle) {
- new MapJoinRightRunner[Row, Row, Row](
- genFunction.name,
- genFunction.code,
- genFunction.returnType,
- broadcastInputSetName)
- } else {
- new MapJoinLeftRunner[Row, Row, Row](
- genFunction.name,
- genFunction.code,
- genFunction.returnType,
- broadcastInputSetName)
- }
+ if (!leftIsSingle) {
--- End diff --
un-indent by 2 spaces
> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
> Key: FLINK-5256
> URL: https://issues.apache.org/jira/browse/FLINK-5256
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary
> inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and
> right joins with a single-row input. These cases can be handled if the
> {{DataSetSingleRowJoin}} is extended to support outer joins on the
> non-single-row input, i.e., left joins if the right side is the single-row
> input and right joins if the left side is the single-row input.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)