This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 5ea9cc11492 [FLINK-27718][hive] Fix fail to count multiple fields
exception in Hive dialect (#19406)
5ea9cc11492 is described below
commit 5ea9cc114921b3c1b2ab3cd8751ff4ef6c7c1329
Author: yuxia Luo <[email protected]>
AuthorDate: Mon Sep 5 21:10:08 2022 +0800
[FLINK-27718][hive] Fix fail to count multiple fields exception in Hive
dialect (#19406)
---
.../table/functions/hive/HiveGenericUDAF.java | 29 ++++++++++++++++++++--
.../apache/flink/table/module/hive/HiveModule.java | 1 -
.../planner/delegation/hive/HiveParserUtils.java | 12 ++++++++-
.../connectors/hive/HiveDialectQueryITCase.java | 22 ++++++++++++++++
.../flink/table/module/hive/HiveModuleTest.java | 4 +--
5 files changed, 62 insertions(+), 6 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 25e1ca058c3..031a6632294 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
@@ -128,6 +129,30 @@ public class HiveGenericUDAF
public GenericUDAFEvaluator createEvaluator(ObjectInspector[]
inputInspectors)
throws SemanticException {
+ // currently, we have no way to set `distinct`,`allColumns` of
+ // UDAFParameterInfo. so, assuming it's to be FALSE, FALSE
+ boolean distinct = Boolean.FALSE;
+ boolean allColumns = Boolean.FALSE;
+
+ // special case for Hive's GenericUDAFCount, we need to set
`distinct`,`allColumns` as the
+ // function will do validation according to these two values.
+ if
(hiveFunctionWrapper.getUDFClassName().equals(GenericUDAFCount.class.getName()))
{
+ // set the value for `distinct`,`allColumns` according to the
number of input arguments.
+ // it's fine for it'll try to call
HiveParserUtils#getGenericUDAFEvaluator with the
+ // value for `distinct`,`allColumns` parsed in parse phase and once
+ // arrives here, it means the validation is passed
+ if (inputInspectors.length == 0) {
+ // if no arguments, it must be count(*)
+ allColumns = Boolean.TRUE;
+ } else if (inputInspectors.length > 1) {
+ // if the arguments are more than one, it must be distinct
+ // NOTE: only in Hive dialect, it must be distinct as it'll be
validated by
+ // HiveParser. But in Flink SQL, it may not be distinct since
there's no such the
+ // validation in Flink SQL.
+ distinct = Boolean.TRUE;
+ }
+ }
+
SimpleGenericUDAFParameterInfo parameterInfo =
hiveShim.createUDAFParameterInfo(
inputInspectors,
@@ -141,14 +166,14 @@ public class HiveGenericUDAF
// function implementation
// is not expected to ensure the distinct property for
the parameter values.
// That is handled by the framework.
- Boolean.FALSE,
+ distinct,
// Returns true if the UDAF invocation was done via
the wildcard syntax
// FUNCTION(*).
// Note that this is provided for informational
purposes only and the
// function implementation
// is not expected to ensure the wildcard handling of
the target relation.
// That is handled by the framework.
- Boolean.FALSE);
+ allColumns);
if (isUDAFBridgeRequired) {
return new GenericUDAFBridge((UDAF)
hiveFunctionWrapper.createFunction())
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 6e8a21f270f..ba9254ac24f 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -51,7 +51,6 @@ public class HiveModule implements Module {
Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
- "count",
"cume_dist",
"current_date",
"current_timestamp",
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 54b8b8be1df..3356410114d 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -1607,6 +1607,8 @@ public class HiveParserUtils {
final SqlAggFunction aggFunc =
HiveParserSqlFunctionConverter.getCalciteAggFn(
aggInfo.getUdfName(), aggInfo.isDistinct(),
calciteArgTypes, aggFnRetType);
+ SqlAggFunction convertedAggFunction =
+ (SqlAggFunction) funcConverter.convertOperator(aggFunc);
// If we have input arguments, set type to null (instead of
aggFnRetType) to let
// AggregateCall
@@ -1614,9 +1616,17 @@ public class HiveParserUtils {
RelDataType type = null;
if (aggInfo.isAllColumns() && argIndices.isEmpty()) {
type = aggFnRetType;
+ if (funcConverter.hasOverloadedOp(
+ convertedAggFunction,
SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
+ // it means the agg function will delegate to Hive's built-in
function.
+ // Hive's UDAF always infer nullable datatype.
+ // so make it nullable to avoid inferred type of the agg
function is different from
+ // the type it was given when it was created.
+ type = typeFactory.createTypeWithNullability(aggFnRetType,
true);
+ }
}
return createAggregateCall(
- (SqlAggFunction) funcConverter.convertOperator(aggFunc),
+ convertedAggFunction,
aggInfo.isDistinct(),
false,
false,
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 54bdb88a820..d305e34ef2d 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -816,6 +816,28 @@ public class HiveDialectQueryITCase {
}
}
+ @Test
+ public void testCount() throws Exception {
+ tableEnv.executeSql("create table abcd (a int, b int, c int, d int)");
+ tableEnv.executeSql(
+ "insert into abcd values (null,35,23,6), (10, 100, 23,
5), (10, 35, 23, 5)")
+ .await();
+ try {
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql(
+ "select count(1), count(*),
count(a),"
+ + " count(distinct a,b),
count(distinct b,d), count(distinct b, c) from abcd")
+ .collect());
+ assertThat(results.toString()).isEqualTo("[+I[3, 3, 2, 2, 3, 2]]");
+ assertThatThrownBy(() -> tableEnv.executeSql(" select count(a,b)
from abcd"))
+ .hasRootCauseInstanceOf(UDFArgumentException.class)
+ .hasRootCauseMessage("DISTINCT keyword must be specified");
+ } finally {
+ tableEnv.executeSql("drop table abcd");
+ }
+ }
+
private void runQFile(File qfile) throws Exception {
QTest qTest = extractQTest(qfile);
for (int i = 0; i < qTest.statements.size(); i++) {
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 842e71eb82c..9b42d9f8d00 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -74,10 +74,10 @@ public class HiveModuleTest {
private void verifyNumBuiltInFunctions(String hiveVersion, HiveModule
hiveModule) {
switch (hiveVersion) {
case HIVE_VERSION_V2_3_9:
- assertThat(hiveModule.listFunctions()).hasSize(276);
+ assertThat(hiveModule.listFunctions()).hasSize(277);
break;
case HIVE_VERSION_V3_1_1:
- assertThat(hiveModule.listFunctions()).hasSize(295);
+ assertThat(hiveModule.listFunctions()).hasSize(296);
break;
default:
fail("Unknown test version " + hiveVersion);