This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1455e51c7cd [FLINK-26771][hive] Fix incomparable exception between
boolean type and numeric type in Hive dialect
1455e51c7cd is described below
commit 1455e51c7cd64264f5ddebd0b768efc109fab74f
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Aug 12 22:49:54 2022 +0800
[FLINK-26771][hive] Fix incomparable exception between boolean type and
numeric type in Hive dialect
This closes #19182
---
.../delegation/hive/HiveParserCalcitePlanner.java | 2 +-
.../delegation/hive/HiveParserDMLHelper.java | 3 +-
.../hive/HiveParserRexNodeConverter.java | 5 +-
.../hive/copy/HiveParserSqlFunctionConverter.java | 89 ++++++++++++++++++----
.../connectors/hive/HiveDialectQueryITCase.java | 18 +++++
5 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 722da244fda..8b657cab3ce 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -2562,7 +2562,7 @@ public class HiveParserCalcitePlanner {
SqlOperator calciteOp =
HiveParserSqlFunctionConverter.getCalciteFn(
- genericUDTFName, argTypes, retType, false);
+ genericUDTFName, argTypes, retType, false, funcConverter);
RexNode rexNode = cluster.getRexBuilder().makeCall(calciteOp, operands);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
index 2786d25fc24..06d0e38220b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
@@ -534,7 +534,8 @@ public class HiveParserDMLHelper {
udfName,
functionInfo.getGenericUDF(),
Collections.singletonList(srcRex.getType()),
- targetCalType),
+ targetCalType,
+ funcConverter),
srcRex);
if (!funcConverter.hasOverloadedOp(
cast.getOperator(),
SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
index 7a05def528c..d7910ef38b7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
@@ -264,7 +264,8 @@ public class HiveParserRexNodeConverter {
HiveGenericUDFArrayAccessStructField.NAME,
Arrays.asList(arrayDataType, accessFieldType),
retType,
- false);
+ false,
+ funcConverter);
return cluster.getRexBuilder().makeCall(calciteOp, rexNode, accessedField);
} else {
@@ -562,7 +563,7 @@ public class HiveParserRexNodeConverter {
HiveParserTypeConverter.convert(func.getTypeInfo(), cluster.getTypeFactory());
SqlOperator calciteOp =
HiveParserSqlFunctionConverter.getCalciteOperator(
- func.getFuncText(), func.getGenericUDF(), argTypes, retType);
+ func.getFuncText(), func.getGenericUDF(), argTypes, retType, funcConverter);
if (calciteOp.getKind() == SqlKind.CASE) {
// If it is a case operator, we need to rewrite it
childRexNodeLst = rewriteCaseChildren(func, childRexNodeLst);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
index 621729b307d..8f22c4e9a84 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.delegation.hive.copy;
import org.apache.flink.table.planner.delegation.hive.HiveParserIN;
+import org.apache.flink.table.planner.delegation.hive.SqlFunctionConverter;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
import org.apache.calcite.rel.type.RelDataType;
@@ -31,6 +32,7 @@ import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.FamilyOperandTypeChecker;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
@@ -55,8 +57,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -82,7 +86,8 @@ public class HiveParserSqlFunctionConverter {
String funcTextName,
GenericUDF hiveUDF,
List<RelDataType> calciteArgTypes,
- RelDataType retType)
+ RelDataType retType,
+ SqlFunctionConverter functionConverter)
throws SemanticException {
// handle overloaded methods first
if (hiveUDF instanceof GenericUDFOPNegative) {
@@ -100,7 +105,11 @@ public class HiveParserSqlFunctionConverter {
name = FunctionRegistry.getNormalizedFunctionName(funcTextName);
}
return getCalciteFn(
- name, calciteArgTypes, retType, FunctionRegistry.isDeterministic(hiveUDF));
+ name,
+ calciteArgTypes,
+ retType,
+ FunctionRegistry.isDeterministic(hiveUDF),
+ functionConverter);
}
// TODO: this is not valid. Function names for built-in UDFs are specified in
@@ -385,7 +394,8 @@ public class HiveParserSqlFunctionConverter {
String hiveUdfName,
List<RelDataType> calciteArgTypes,
RelDataType calciteRetType,
- boolean deterministic) {
+ boolean deterministic,
+ SqlFunctionConverter functionConverter) {
SqlOperator calciteOp;
CalciteUDFInfo uInf = getUDFInfo(hiveUdfName, calciteArgTypes, calciteRetType);
switch (hiveUdfName) {
@@ -415,24 +425,73 @@ public class HiveParserSqlFunctionConverter {
OperandTypes.PLUS_OPERATOR);
break;
default:
- calciteOp = HIVE_TO_CALCITE.get(hiveUdfName);
- if (null == calciteOp) {
- calciteOp =
- new CalciteSqlFn(
- uInf.udfName,
- uInf.identifier,
- SqlKind.OTHER_FUNCTION,
- uInf.returnTypeInference,
- uInf.operandTypeInference,
- uInf.operandTypeChecker,
- SqlFunctionCategory.USER_DEFINED_FUNCTION,
- deterministic);
+ // some functions should be handled as Hive UDF for
+ // the Hive specific logic
+ if (shouldHandledAsHiveUDF(uInf) && canHandledAsHiveUDF(uInf, functionConverter)) {
+ calciteOp = asCalciteSqlFn(uInf, deterministic);
+ } else {
+ calciteOp = HIVE_TO_CALCITE.get(hiveUdfName);
+ if (null == calciteOp) {
+ calciteOp = asCalciteSqlFn(uInf, deterministic);
+ }
}
break;
}
return calciteOp;
}
+ private static CalciteSqlFn asCalciteSqlFn(CalciteUDFInfo uInf, boolean deterministic) {
+ return new CalciteSqlFn(
+ uInf.udfName,
+ uInf.identifier,
+ SqlKind.OTHER_FUNCTION,
+ uInf.returnTypeInference,
+ uInf.operandTypeInference,
+ uInf.operandTypeChecker,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ deterministic);
+ }
+
+ private static boolean shouldHandledAsHiveUDF(CalciteUDFInfo calciteUDFInfo) {
+ String udfName = calciteUDFInfo.udfName;
+ if (isCompareFunction(udfName)) {
+ // for hive, boolean type can be comparable to other numeric/string types,
+ // but it's not supported in Flink, so it should be handled as hive udf to
+ // be consistent with Hive.
+ Set<SqlTypeFamily> operandTypes = new HashSet<>();
+ operandTypes.add(
+ ((FamilyOperandTypeChecker) calciteUDFInfo.operandTypeChecker)
+ .getOperandSqlTypeFamily(0));
+ operandTypes.add(
+ ((FamilyOperandTypeChecker) calciteUDFInfo.operandTypeChecker)
+ .getOperandSqlTypeFamily(1));
+ return operandTypes.contains(SqlTypeFamily.BOOLEAN)
+ && (operandTypes.contains(SqlTypeFamily.NUMERIC)
+ || operandTypes.contains(SqlTypeFamily.CHARACTER));
+ }
+ return false;
+ }
+
+ private static boolean canHandledAsHiveUDF(
+ CalciteUDFInfo udfInfo, SqlFunctionConverter functionConverter) {
+ // if we can find the overloaded operator, the function can be handled as Hive's UDF.
+ // otherwise, it can't be handled as Hive's UDF which can happen when hive module is not
+ // loaded.
+ return functionConverter.hasOverloadedOp(
+ asCalciteSqlFn(udfInfo, false), SqlFunctionCategory.USER_DEFINED_FUNCTION);
+ }
+
+ private static boolean isCompareFunction(String udfName) {
+ return udfName.equals(">")
+ || udfName.equals(">=")
+ || udfName.equals("<")
+ || udfName.equals("<=")
+ || udfName.equals("=")
+ || udfName.equals("<>")
+ || udfName.equals("!=")
+ || udfName.equals("<=>");
+ }
+
public static SqlAggFunction getCalciteAggFn(
String hiveUdfName,
boolean isDistinct,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 0817af4ff4e..4f8cfe09510 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -725,6 +725,24 @@ public class HiveDialectQueryITCase {
}
}
+ @Test
+ public void testBoolComparison() {
+ tableEnv.executeSql("CREATE TABLE tbool (id int, a int, b string, c boolean)");
+ try {
+ tableEnv.executeSql(
+ "insert into tbool values (1, 1, '12', true), (2, 1, '0.4', false)");
+ // test compare boolean with numeric/string type
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql(
+ "select id from tbool where a = true and b != false and c = '1'")
+ .collect());
+ assertThat(results.toString()).isEqualTo("[+I[1]]");
+ } finally {
+ tableEnv.executeSql("drop table tbool");
+ }
+ }
+
private void runQFile(File qfile) throws Exception {
QTest qTest = extractQTest(qfile);
for (int i = 0; i < qTest.statements.size(); i++) {