This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new c677334  [FLINK-16268][table-planner-blink] Failed to run rank over window with Hive built-in functions
c677334 is described below

commit c677334727bc388fd9fd051c7aac0cf6e595d82c
Author: Rui Li <[email protected]>
AuthorDate: Tue Nov 3 15:06:49 2020 +0800

    [FLINK-16268][table-planner-blink] Failed to run rank over window with Hive built-in functions
    
    This closes #13857
---
 .../flink/table/module/hive/HiveModuleTest.java     | 19 +++++++++++++++++++
 .../expressions/PlannerTypeInferenceUtilImpl.java   | 21 +++++++++++++++++++++
 .../planner/functions/utils/HiveFunctionUtils.java  |  2 +-
 .../functions/utils/UserDefinedFunctionUtils.scala  |  6 +++++-
 4 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 343d818..be8a75c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveSimpleUDF;
+import org.apache.flink.table.module.CoreModule;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
@@ -185,4 +186,22 @@ public class HiveModuleTest {
                results = Lists.newArrayList(tableEnv.sqlQuery("select 
length('')").execute().collect());
                assertEquals("[0]", results.toString());
        }
+
+       @Test
+       // tests to verify we have set arguments for hive udf before trying to 
get result type
+       public void testHiveUDFSetArguments() throws Exception {
+               TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+
+               tableEnv.unloadModule("core");
+               tableEnv.loadModule("hive", new HiveModule());
+               tableEnv.loadModule("core", CoreModule.INSTANCE);
+
+               String path = 
getClass().getResource("/csv/test.csv").toURI().toString();
+               tableEnv.executeSql(String.format(
+                               "create table src(x int,y int) with 
('connector'='filesystem','format'='csv','path'='%s')", path));
+
+               Lists.newArrayList(tableEnv.executeSql("select x from src where 
y is not null limit 10").collect());
+               Lists.newArrayList(tableEnv.executeSql("select count(distinct 
if(y is null, 0, y)) from src where x=-1 limit 1").collect());
+               Lists.newArrayList(tableEnv.executeSql("select x, rank() over 
(partition by x order by y) from src").collect());
+       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
index 4640377..d813ccb 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
@@ -25,11 +25,15 @@ import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.functions.utils.HiveFunctionUtils;
 import org.apache.flink.table.planner.typeutils.TypeCoercion;
 import org.apache.flink.table.planner.validate.ValidationFailure;
 import org.apache.flink.table.planner.validate.ValidationResult;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -74,6 +78,23 @@ public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceU
                                .map(ResolvedExpression::getOutputDataType)
                                .collect(Collectors.toList());
 
+                       if (plannerCall instanceof PlannerScalarFunctionCall) {
+                               ScalarFunction scalarFunction = 
((PlannerScalarFunctionCall) plannerCall).scalarFunction();
+                               // need to set arg types for Hive functions
+                               if 
(HiveFunctionUtils.isHiveFunc(scalarFunction)) {
+                                       LogicalType[] logicalTypes = 
expectedArgumentTypes.stream()
+                                                       
.map(DataType::getLogicalType).toArray(LogicalType[]::new);
+                                       Object[] constArgs = new 
Object[logicalTypes.length];
+                                       for (int i = 0; i < constArgs.length; 
i++) {
+                                               if (resolvedArgs.get(i) 
instanceof ValueLiteralExpression) {
+                                                       ValueLiteralExpression 
literalExpression = (ValueLiteralExpression) resolvedArgs.get(i);
+                                                       constArgs[i] = 
literalExpression.getValueAs(Object.class).orElse(null);
+                                               }
+                                       }
+                                       
HiveFunctionUtils.invokeSetArgs(scalarFunction, constArgs, logicalTypes);
+                               }
+                       }
+
                        return new TypeInferenceUtil.Result(
                                expectedArgumentTypes,
                                null,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
index 13a82cb..7939f47 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
@@ -51,7 +51,7 @@ public class HiveFunctionUtils {
 
        }
 
-       static Serializable invokeSetArgs(
+       public static Serializable invokeSetArgs(
                        Serializable function, Object[] constantArguments, 
LogicalType[] argTypes) {
                try {
                        // See hive HiveFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index 471a39b..026ddc2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -467,7 +467,11 @@ object UserDefinedFunctionUtils {
       displayName: String,
       function: ScalarFunction,
       typeFactory: FlinkTypeFactory): SqlFunction = {
-    new ScalarSqlFunction(identifier, displayName, function, typeFactory)
+    if (HiveFunctionUtils.isHiveFunc(function)) {
+      new HiveScalarSqlFunction(identifier, function, typeFactory)
+    } else {
+      new ScalarSqlFunction(identifier, displayName, function, typeFactory)
+    }
   }
 
   /**

Reply via email to