This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 5ea9cc11492 [FLINK-27718][hive] Fix fail to count multiple fields 
exception in Hive dialect (#19406)
5ea9cc11492 is described below

commit 5ea9cc114921b3c1b2ab3cd8751ff4ef6c7c1329
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Sep 5 21:10:08 2022 +0800

    [FLINK-27718][hive] Fix fail to count multiple fields exception in Hive 
dialect (#19406)
---
 .../table/functions/hive/HiveGenericUDAF.java      | 29 ++++++++++++++++++++--
 .../apache/flink/table/module/hive/HiveModule.java |  1 -
 .../planner/delegation/hive/HiveParserUtils.java   | 12 ++++++++-
 .../connectors/hive/HiveDialectQueryITCase.java    | 22 ++++++++++++++++
 .../flink/table/module/hive/HiveModuleTest.java    |  4 +--
 5 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 25e1ca058c3..031a6632294 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.UDAF;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
@@ -128,6 +129,30 @@ public class HiveGenericUDAF
 
     public GenericUDAFEvaluator createEvaluator(ObjectInspector[] 
inputInspectors)
             throws SemanticException {
+        // currently, we have no way to set `distinct`,`allColumns` of
+        // UDAFParameterInfo. so, assuming it's to be FALSE, FALSE
+        boolean distinct = Boolean.FALSE;
+        boolean allColumns = Boolean.FALSE;
+
+        // special case for Hive's GenericUDAFCount, we need to set 
`distinct`,`allColumns` for the
+        // function will do validation according to these two value.
+        if 
(hiveFunctionWrapper.getUDFClassName().equals(GenericUDAFCount.class.getName()))
 {
+            // set the value for `distinct`,`allColumns` according to the 
number of input arguments.
+            // it's fine for it'll try to call 
HiveParserUtils#getGenericUDAFEvaluator with the
+            // value for `distinct`,`allColumns` parsed in parse phase and once
+            // arrives here, it means the validation is passed
+            if (inputInspectors.length == 0) {
+                // if no arguments, it must be count(*)
+                allColumns = Boolean.TRUE;
+            } else if (inputInspectors.length > 1) {
+                // if the arguments are more than one, it must be distinct
+                // NOTE: only in Hive dialect, it must be distinct as it'll be 
validated by
+                // HiveParser. But in Flink SQL, it may not be distinct since 
there's no such the
+                // validation in Flink SQL.
+                distinct = Boolean.TRUE;
+            }
+        }
+
         SimpleGenericUDAFParameterInfo parameterInfo =
                 hiveShim.createUDAFParameterInfo(
                         inputInspectors,
@@ -141,14 +166,14 @@ public class HiveGenericUDAF
                         // function implementation
                         // is not expected to ensure the distinct property for 
the parameter values.
                         // That is handled by the framework.
-                        Boolean.FALSE,
+                        distinct,
                         // Returns true if the UDAF invocation was done via 
the wildcard syntax
                         // FUNCTION(*).
                         // Note that this is provided for informational 
purposes only and the
                         // function implementation
                         // is not expected to ensure the wildcard handling of 
the target relation.
                         // That is handled by the framework.
-                        Boolean.FALSE);
+                        allColumns);
 
         if (isUDAFBridgeRequired) {
             return new GenericUDAFBridge((UDAF) 
hiveFunctionWrapper.createFunction())
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 6e8a21f270f..ba9254ac24f 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -51,7 +51,6 @@ public class HiveModule implements Module {
             Collections.unmodifiableSet(
                     new HashSet<>(
                             Arrays.asList(
-                                    "count",
                                     "cume_dist",
                                     "current_date",
                                     "current_timestamp",
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 54b8b8be1df..3356410114d 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -1607,6 +1607,8 @@ public class HiveParserUtils {
         final SqlAggFunction aggFunc =
                 HiveParserSqlFunctionConverter.getCalciteAggFn(
                         aggInfo.getUdfName(), aggInfo.isDistinct(), 
calciteArgTypes, aggFnRetType);
+        SqlAggFunction convertedAggFunction =
+                (SqlAggFunction) funcConverter.convertOperator(aggFunc);
 
         // If we have input arguments, set type to null (instead of 
aggFnRetType) to let
         // AggregateCall
@@ -1614,9 +1616,17 @@ public class HiveParserUtils {
         RelDataType type = null;
         if (aggInfo.isAllColumns() && argIndices.isEmpty()) {
             type = aggFnRetType;
+            if (funcConverter.hasOverloadedOp(
+                    convertedAggFunction, 
SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
+                // it means the agg function will delegate to Hive's built-in 
function.
+                // Hive's UDAF always infer nullable datatype.
+                // so make it nullable to avoid inferred type of the agg 
function is different from
+                // the type it was given when it was created.
+                type = typeFactory.createTypeWithNullability(aggFnRetType, 
true);
+            }
         }
         return createAggregateCall(
-                (SqlAggFunction) funcConverter.convertOperator(aggFunc),
+                convertedAggFunction,
                 aggInfo.isDistinct(),
                 false,
                 false,
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 54bdb88a820..d305e34ef2d 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -816,6 +816,28 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testCount() throws Exception {
+        tableEnv.executeSql("create table abcd (a int, b int, c int, d int)");
+        tableEnv.executeSql(
+                        "insert into abcd values (null,35,23,6), (10, 100, 23, 
5), (10, 35, 23, 5)")
+                .await();
+        try {
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select count(1), count(*), 
count(a),"
+                                                    + " count(distinct a,b), 
count(distinct b,d), count(distinct b, c) from abcd")
+                                    .collect());
+            assertThat(results.toString()).isEqualTo("[+I[3, 3, 2, 2, 3, 2]]");
+            assertThatThrownBy(() -> tableEnv.executeSql(" select count(a,b) 
from abcd"))
+                    .hasRootCauseInstanceOf(UDFArgumentException.class)
+                    .hasRootCauseMessage("DISTINCT keyword must be specified");
+        } finally {
+            tableEnv.executeSql("drop table abcd");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 842e71eb82c..9b42d9f8d00 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -74,10 +74,10 @@ public class HiveModuleTest {
     private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule 
hiveModule) {
         switch (hiveVersion) {
             case HIVE_VERSION_V2_3_9:
-                assertThat(hiveModule.listFunctions()).hasSize(276);
+                assertThat(hiveModule.listFunctions()).hasSize(277);
                 break;
             case HIVE_VERSION_V3_1_1:
-                assertThat(hiveModule.listFunctions()).hasSize(295);
+                assertThat(hiveModule.listFunctions()).hasSize(296);
                 break;
             default:
                 fail("Unknown test version " + hiveVersion);

Reply via email to