This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1455e51c7cd [FLINK-26771][hive] Fix incomparable exception between boolean type and numeric type in Hive dialect
1455e51c7cd is described below

commit 1455e51c7cd64264f5ddebd0b768efc109fab74f
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Aug 12 22:49:54 2022 +0800

    [FLINK-26771][hive] Fix incomparable exception between boolean type and numeric type in Hive dialect
    
    This closes #19182
---
 .../delegation/hive/HiveParserCalcitePlanner.java  |  2 +-
 .../delegation/hive/HiveParserDMLHelper.java       |  3 +-
 .../hive/HiveParserRexNodeConverter.java           |  5 +-
 .../hive/copy/HiveParserSqlFunctionConverter.java  | 89 ++++++++++++++++++----
 .../connectors/hive/HiveDialectQueryITCase.java    | 18 +++++
 5 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 722da244fda..8b657cab3ce 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -2562,7 +2562,7 @@ public class HiveParserCalcitePlanner {
 
         SqlOperator calciteOp =
                 HiveParserSqlFunctionConverter.getCalciteFn(
-                        genericUDTFName, argTypes, retType, false);
+                        genericUDTFName, argTypes, retType, false, 
funcConverter);
 
         RexNode rexNode = cluster.getRexBuilder().makeCall(calciteOp, 
operands);
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
index 2786d25fc24..06d0e38220b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserDMLHelper.java
@@ -534,7 +534,8 @@ public class HiveParserDMLHelper {
                                             udfName,
                                             functionInfo.getGenericUDF(),
                                             
Collections.singletonList(srcRex.getType()),
-                                            targetCalType),
+                                            targetCalType,
+                                            funcConverter),
                                     srcRex);
             if (!funcConverter.hasOverloadedOp(
                     cast.getOperator(), 
SqlFunctionCategory.USER_DEFINED_FUNCTION)) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
index 7a05def528c..d7910ef38b7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
@@ -264,7 +264,8 @@ public class HiveParserRexNodeConverter {
                                 HiveGenericUDFArrayAccessStructField.NAME,
                                 Arrays.asList(arrayDataType, accessFieldType),
                                 retType,
-                                false);
+                                false,
+                                funcConverter);
 
                 return cluster.getRexBuilder().makeCall(calciteOp, rexNode, 
accessedField);
             } else {
@@ -562,7 +563,7 @@ public class HiveParserRexNodeConverter {
                 HiveParserTypeConverter.convert(func.getTypeInfo(), 
cluster.getTypeFactory());
         SqlOperator calciteOp =
                 HiveParserSqlFunctionConverter.getCalciteOperator(
-                        func.getFuncText(), func.getGenericUDF(), argTypes, 
retType);
+                        func.getFuncText(), func.getGenericUDF(), argTypes, 
retType, funcConverter);
         if (calciteOp.getKind() == SqlKind.CASE) {
             // If it is a case operator, we need to rewrite it
             childRexNodeLst = rewriteCaseChildren(func, childRexNodeLst);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
index 621729b307d..8f22c4e9a84 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserSqlFunctionConverter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.delegation.hive.copy;
 
 import org.apache.flink.table.planner.delegation.hive.HiveParserIN;
+import org.apache.flink.table.planner.delegation.hive.SqlFunctionConverter;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
 
 import org.apache.calcite.rel.type.RelDataType;
@@ -31,6 +32,7 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.FamilyOperandTypeChecker;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
@@ -55,8 +57,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -82,7 +86,8 @@ public class HiveParserSqlFunctionConverter {
             String funcTextName,
             GenericUDF hiveUDF,
             List<RelDataType> calciteArgTypes,
-            RelDataType retType)
+            RelDataType retType,
+            SqlFunctionConverter functionConverter)
             throws SemanticException {
         // handle overloaded methods first
         if (hiveUDF instanceof GenericUDFOPNegative) {
@@ -100,7 +105,11 @@ public class HiveParserSqlFunctionConverter {
             name = FunctionRegistry.getNormalizedFunctionName(funcTextName);
         }
         return getCalciteFn(
-                name, calciteArgTypes, retType, 
FunctionRegistry.isDeterministic(hiveUDF));
+                name,
+                calciteArgTypes,
+                retType,
+                FunctionRegistry.isDeterministic(hiveUDF),
+                functionConverter);
     }
 
     // TODO: this is not valid. Function names for built-in UDFs are specified in
@@ -385,7 +394,8 @@ public class HiveParserSqlFunctionConverter {
             String hiveUdfName,
             List<RelDataType> calciteArgTypes,
             RelDataType calciteRetType,
-            boolean deterministic) {
+            boolean deterministic,
+            SqlFunctionConverter functionConverter) {
         SqlOperator calciteOp;
         CalciteUDFInfo uInf = getUDFInfo(hiveUdfName, calciteArgTypes, 
calciteRetType);
         switch (hiveUdfName) {
@@ -415,24 +425,73 @@ public class HiveParserSqlFunctionConverter {
                                 OperandTypes.PLUS_OPERATOR);
                 break;
             default:
-                calciteOp = HIVE_TO_CALCITE.get(hiveUdfName);
-                if (null == calciteOp) {
-                    calciteOp =
-                            new CalciteSqlFn(
-                                    uInf.udfName,
-                                    uInf.identifier,
-                                    SqlKind.OTHER_FUNCTION,
-                                    uInf.returnTypeInference,
-                                    uInf.operandTypeInference,
-                                    uInf.operandTypeChecker,
-                                    SqlFunctionCategory.USER_DEFINED_FUNCTION,
-                                    deterministic);
+                // some functions should be handled as Hive UDF for
+                // the Hive specific logic
+                if (shouldHandledAsHiveUDF(uInf) && canHandledAsHiveUDF(uInf, 
functionConverter)) {
+                    calciteOp = asCalciteSqlFn(uInf, deterministic);
+                } else {
+                    calciteOp = HIVE_TO_CALCITE.get(hiveUdfName);
+                    if (null == calciteOp) {
+                        calciteOp = asCalciteSqlFn(uInf, deterministic);
+                    }
                 }
                 break;
         }
         return calciteOp;
     }
 
+    private static CalciteSqlFn asCalciteSqlFn(CalciteUDFInfo uInf, boolean 
deterministic) {
+        return new CalciteSqlFn(
+                uInf.udfName,
+                uInf.identifier,
+                SqlKind.OTHER_FUNCTION,
+                uInf.returnTypeInference,
+                uInf.operandTypeInference,
+                uInf.operandTypeChecker,
+                SqlFunctionCategory.USER_DEFINED_FUNCTION,
+                deterministic);
+    }
+
+    private static boolean shouldHandledAsHiveUDF(CalciteUDFInfo 
calciteUDFInfo) {
+        String udfName = calciteUDFInfo.udfName;
+        if (isCompareFunction(udfName)) {
+            // for hive, boolean type can be comparable to other numeric/string types,
+            // but it's not supported in Flink, so it should be handled as hive udf to
+            // be consistent with Hive.
+            Set<SqlTypeFamily> operandTypes = new HashSet<>();
+            operandTypes.add(
+                    ((FamilyOperandTypeChecker) 
calciteUDFInfo.operandTypeChecker)
+                            .getOperandSqlTypeFamily(0));
+            operandTypes.add(
+                    ((FamilyOperandTypeChecker) 
calciteUDFInfo.operandTypeChecker)
+                            .getOperandSqlTypeFamily(1));
+            return operandTypes.contains(SqlTypeFamily.BOOLEAN)
+                    && (operandTypes.contains(SqlTypeFamily.NUMERIC)
+                            || operandTypes.contains(SqlTypeFamily.CHARACTER));
+        }
+        return false;
+    }
+
+    private static boolean canHandledAsHiveUDF(
+            CalciteUDFInfo udfInfo, SqlFunctionConverter functionConverter) {
+        // if we can find the overloaded operator, the function can be handled as Hive's UDF.
+        // otherwise, it can't be handled as Hive's UDF which can happen when hive module is not
+        // loaded.
+        return functionConverter.hasOverloadedOp(
+                asCalciteSqlFn(udfInfo, false), 
SqlFunctionCategory.USER_DEFINED_FUNCTION);
+    }
+
+    private static boolean isCompareFunction(String udfName) {
+        return udfName.equals(">")
+                || udfName.equals(">=")
+                || udfName.equals("<")
+                || udfName.equals("<=")
+                || udfName.equals("=")
+                || udfName.equals("<>")
+                || udfName.equals("!=")
+                || udfName.equals("<=>");
+    }
+
     public static SqlAggFunction getCalciteAggFn(
             String hiveUdfName,
             boolean isDistinct,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 0817af4ff4e..4f8cfe09510 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -725,6 +725,24 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testBoolComparison() {
+        tableEnv.executeSql("CREATE TABLE tbool (id int, a int, b string, c 
boolean)");
+        try {
+            tableEnv.executeSql(
+                    "insert into tbool values (1, 1, '12', true), (2, 1, 
'0.4', false)");
+            // test compare boolean with numeric/string type
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select id from tbool where a = 
true and b != false and c = '1'")
+                                    .collect());
+            assertThat(results.toString()).isEqualTo("[+I[1]]");
+        } finally {
+            tableEnv.executeSql("drop table tbool");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {

Reply via email to