This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 4368f1d  [FLINK-14547][table-planner-blink] Fix UDF cannot be used in the join 
condition in blink planner for Table API.
4368f1d is described below

commit 4368f1da8598f993523dd18edef4b90f88eab4a1
Author: huangxingbo <[email protected]>
AuthorDate: Mon Oct 28 18:23:04 2019 +0800

    [FLINK-14547][table-planner-blink] Fix UDF cannot be used in the join condition in 
blink planner for Table API.
    
    Currently, a UDF cannot be used in the join condition for the Table API in the blink 
planner (SQL is fine).
    
    This closes #10016.
---
 .../table/planner/plan/QueryOperationConverter.java    |  8 +++++++-
 .../flink/table/planner/plan/batch/table/JoinTest.xml  | 18 ++++++++++++++++++
 .../table/planner/plan/batch/table/JoinTest.scala      | 10 ++++++++++
 3 files changed, 35 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index e0f4763..92f7d38 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -464,9 +464,15 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                return new RexNodeExpression(convertedNode, 
((ResolvedExpression) expr).getOutputDataType());
                        }).collect(Collectors.toList());
 
-                       CallExpression newCall = new CallExpression(
+                       CallExpression newCall;
+                       if (callExpression.getObjectIdentifier().isPresent()) {
+                               newCall = new CallExpression(
                                        
callExpression.getObjectIdentifier().get(), 
callExpression.getFunctionDefinition(), newChildren,
                                        callExpression.getOutputDataType());
+                       } else {
+                               newCall = new CallExpression(
+                                       callExpression.getFunctionDefinition(), 
newChildren, callExpression.getOutputDataType());
+                       }
                        return convertExprToRexNode(newCall);
                }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml
index 0bbbc52..1de3388 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/JoinTest.xml
@@ -277,4 +277,22 @@ Calc(select=[b, y])
 ]]>
     </Resource>
   </TestCase>
+       <TestCase name="testUDFInJoinCondition">
+               <Resource name="planBefore">
+                       <![CDATA[
+LogicalJoin(condition=[AND(=($1, $4), 
=(org$apache$flink$table$planner$plan$batch$table$JoinTest$Merger$$ad6edb4d4c8a8ac04216f9aeaab1e36f($0,
 $3), 10))], joinType=[inner])
+:- LogicalTableScan(table=[[default_catalog, default_database, left, source: 
[TestTableSource(a, b, c)]]])
++- LogicalTableScan(table=[[default_catalog, default_database, right, source: 
[TestTableSource(d, e, f)]]])
+]]>
+               </Resource>
+               <Resource name="planAfter">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[AND(=(b, e), =(Merger$(a, d), 10))], 
select=[a, b, c, d, e, f], build=[right])
+:- Exchange(distribution=[hash[b]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, left, source: 
[TestTableSource(a, b, c)]]], fields=[a, b, c])
++- Exchange(distribution=[hash[e]])
+   +- TableSourceScan(table=[[default_catalog, default_database, right, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+]]>
+               </Resource>
+       </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
index 5bfa82f..5c129d0 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/JoinTest.scala
@@ -199,6 +199,16 @@ class JoinTest extends TableTestBase {
       .where('a < 'd)
       .select('c, 'g))
   }
+
+  @Test
+  def testUDFInJoinCondition(): Unit = {
+    val util = batchTestUtil()
+    val ds1 = util.addTableSource[(Int, Long, String)]("left",'a, 'b, 'c)
+    val ds2 = util.addTableSource[(Int, Long, String)]("right",'d, 'e, 'f)
+
+    val joinT = ds1.join(ds2, 'b === 'e && Merger('a, 'd) === 10)
+    util.verifyPlan(joinT)
+  }
 }
 
 object JoinTest {

Reply via email to