This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fc5730ab2de [FLINK-26474][hive] Fold exprNode to fix the issue of
failing to call some hive udf required constant parameters with implicit
constant passed
fc5730ab2de is described below
commit fc5730ab2dee219a4875e78312174c0364579013
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Aug 31 13:39:34 2022 +0800
[FLINK-26474][hive] Fold exprNode to fix the issue of failing to call some
hive udf required constant parameters with implicit constant passed
This closes #18975
---
.../delegation/hive/HiveParserCalcitePlanner.java | 3 +-
.../hive/HiveParserRexNodeConverter.java | 48 ++++++++++++++++++----
.../hive/HiveParserTypeCheckProcFactory.java | 21 ++++++++++
.../connectors/hive/HiveDialectQueryITCase.java | 2 +-
.../src/test/resources/query-test/udf.q | 17 ++++++++
5 files changed, 81 insertions(+), 10 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index e1e5a452d93..4244dbde980 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -2307,7 +2307,8 @@ public class HiveParserCalcitePlanner {
} else {
// Case when this is an expression
HiveParserTypeCheckCtx typeCheckCtx =
- new HiveParserTypeCheckCtx(inputRR,
frameworkConfig, cluster);
+ new HiveParserTypeCheckCtx(
+ inputRR, true, true, frameworkConfig,
cluster);
// We allow stateful functions in the SELECT list (but
nowhere else)
typeCheckCtx.setAllowStatefulFunctions(true);
if (!qbp.getDestToGroupBy().isEmpty()) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
index 17ea24a6e4a..54775ed6c55 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserRexNodeConverter.java
@@ -60,6 +60,8 @@ import org.apache.calcite.util.TimestampString;
import org.apache.hadoop.hive.common.type.Decimal128;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -456,9 +458,21 @@ public class HiveParserRexNodeConverter {
default:
if (hiveShim.isIntervalYearMonthType(hiveTypeCategory)) {
// Calcite year-month literal value is months as BigDecimal
- BigDecimal totalMonths =
- BigDecimal.valueOf(
- ((HiveParserIntervalYearMonth)
value).getTotalMonths());
+ BigDecimal totalMonths;
+ if (value instanceof HiveParserIntervalYearMonth) {
+ totalMonths =
+ BigDecimal.valueOf(
+ ((HiveParserIntervalYearMonth)
value).getTotalMonths());
+ } else if (value instanceof HiveIntervalYearMonth) {
+ totalMonths =
+ BigDecimal.valueOf(
+ ((HiveIntervalYearMonth)
value).getTotalMonths());
+ } else {
+ throw new SemanticException(
+ String.format(
+ "Unexpected class %s for Hive's
interval day time type",
+ value.getClass().getName()));
+ }
calciteLiteral =
rexBuilder.makeIntervalLiteral(
totalMonths,
@@ -467,12 +481,30 @@ public class HiveParserRexNodeConverter {
} else if (hiveShim.isIntervalDayTimeType(hiveTypeCategory)) {
// Calcite day-time interval is millis value as BigDecimal
// Seconds converted to millis
- BigDecimal secsValueBd =
- BigDecimal.valueOf(
- ((HiveParserIntervalDayTime)
value).getTotalSeconds() * 1000);
+ BigDecimal secsValueBd;
+ // Nanos converted to millis
+ BigDecimal nanosValueBd;
+ if (value instanceof HiveParserIntervalDayTime) {
+ secsValueBd =
+ BigDecimal.valueOf(
+ ((HiveParserIntervalDayTime)
value).getTotalSeconds()
+ * 1000);
+ nanosValueBd =
+ BigDecimal.valueOf(
+ ((HiveParserIntervalDayTime)
value).getNanos(), 6);
+ } else if (value instanceof HiveIntervalDayTime) {
+ secsValueBd =
+ BigDecimal.valueOf(
+ ((HiveIntervalDayTime)
value).getTotalSeconds() * 1000);
+ nanosValueBd =
+ BigDecimal.valueOf(((HiveIntervalDayTime)
value).getNanos(), 6);
+ } else {
+ throw new SemanticException(
+ String.format(
+ "Unexpected class %s for Hive's
interval day time type.",
+ value.getClass().getName()));
+ }
// Nanos converted to millis
- BigDecimal nanosValueBd =
- BigDecimal.valueOf(((HiveParserIntervalDayTime)
value).getNanos(), 6);
calciteLiteral =
rexBuilder.makeIntervalLiteral(
secsValueBd.add(nanosValueBd),
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
index 859bf3353e9..ef2d1b45fe7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInternalInterval;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNegative;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
@@ -1257,11 +1258,31 @@ public class HiveParserTypeCheckProcFactory {
if (FunctionRegistry.isOpPositive(desc)) {
assert (desc.getChildren().size() == 1);
desc = desc.getChildren().get(0);
+ } else if (getGenericUDFClassFromExprDesc(desc) ==
GenericUDFOPNegative.class) {
+ // UDFOPNegative should always be folded.
+ assert (desc.getChildren().size() == 1);
+ ExprNodeDesc input = desc.getChildren().get(0);
+ if (input instanceof ExprNodeConstantDesc
+ && desc instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeDesc constantExpr =
+
ConstantPropagateProcFactory.foldExpr((ExprNodeGenericFuncDesc) desc);
+ if (constantExpr != null) {
+ desc = constantExpr;
+ }
+ }
}
assert (desc != null);
return desc;
}
+ private Class<? extends GenericUDF>
getGenericUDFClassFromExprDesc(ExprNodeDesc desc) {
+ if (!(desc instanceof ExprNodeGenericFuncDesc)) {
+ return null;
+ }
+ ExprNodeGenericFuncDesc genericFuncDesc =
(ExprNodeGenericFuncDesc) desc;
+ return genericFuncDesc.getGenericUDF().getClass();
+ }
+
// try to create an ExprNodeDesc with a SqlOperator
private ExprNodeDesc convertSqlOperator(
String funcText, List<ExprNodeDesc> children,
HiveParserTypeCheckCtx ctx)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 97bb586b4b5..293492d4b2d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -788,7 +788,7 @@ public class HiveDialectQueryITCase {
timestamp))
.collect());
assertThat(results.toString())
- .isEqualTo(String.format("[+I[%s]]",
expectTimeStampDecimal.toFormatString(8)));
+ .isEqualTo(String.format("[+I[%s]]",
expectTimeStampDecimal));
// test insert timestamp type to decimal type directly
tableEnv.executeSql("create table t1 (c1 DECIMAL(38,6))");
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/query-test/udf.q b/flink-connectors/flink-connector-hive/src/test/resources/query-test/udf.q
new file mode 100644
index 00000000000..f6d8cd6a6fc
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/query-test/udf.q
@@ -0,0 +1,17 @@
+-- SORT_QUERY_RESULTS
+
+select bround(55.0, -1);
+
+[+I[60]]
+
+select bround(55.0, +1);
+
+[+I[55]]
+
+select round(123.45, -2);
+
+[+I[100]]
+
+select sha2('ABC', cast(null as int));
+
+[+I[null]]