This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 74c7188ae98 [FLINK-29722][hive] Supports native hive max function for hive dialect
74c7188ae98 is described below
commit 74c7188ae9898b492c94a472d9d407bf4f8e0876
Author: fengli <[email protected]>
AuthorDate: Thu Jan 5 21:04:33 2023 +0800
[FLINK-29722][hive] Supports native hive max function for hive dialect
This closes #21605
---
.../hive/HiveDeclarativeAggregateFunction.java | 29 +++++
...MinAggFunction.java => HiveMaxAggFunction.java} | 59 ++++------
.../table/functions/hive/HiveMinAggFunction.java | 19 +---
.../table/functions/hive/HiveSumAggFunction.java | 4 +-
.../apache/flink/table/module/hive/HiveModule.java | 6 +-
.../connectors/hive/HiveDialectAggITCase.java | 126 ++++++++++++++++++---
.../connectors/hive/HiveDialectQueryPlanTest.java | 14 +++
.../explain/testMaxAggFunctionFallbackPlan.out | 21 ++++
.../resources/explain/testMaxAggFunctionPlan.out | 17 +++
9 files changed, 227 insertions(+), 68 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
index 5d316e2e97d..e184207d007 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.functions.hive;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.DeclarativeAggregateFunction;
import org.apache.flink.table.functions.FunctionKind;
@@ -26,9 +27,14 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import java.util.List;
import java.util.Optional;
+import static
org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
+
/**
* API for hive aggregation functions that are expressed in terms of
expressions.
*
@@ -57,6 +63,29 @@ public abstract class HiveDeclarativeAggregateFunction extends DeclarativeAggreg
.build();
}
+ protected void checkArgumentNum(List<DataType> arguments) {
+ if (arguments.size() != 1) {
+ throw new TableException("Exactly one argument is expected.");
+ }
+ }
+
+ protected void checkMinMaxArgumentType(LogicalType logicalType, String
functionName) {
+ // Flink doesn't support to compare nested type now, so here can't
support it, see
+ // ScalarOperatorGens#generateComparison for more detail
+ if (logicalType.is(LogicalTypeRoot.ARRAY)
+ || logicalType.is(LogicalTypeRoot.MAP)
+ || logicalType.is(LogicalTypeRoot.ROW)) {
+ throw new TableException(
+ String.format(
+ "Native hive %s aggregate function does not
support type: %s. "
+ + "Please set option '%s' to false to fall
back to Hive's own %s function.",
+ functionName,
+ logicalType.getTypeRoot(),
+ TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key(),
+ functionName));
+ }
+ }
+
@Override
public FunctionKind getKind() {
return FunctionKind.AGGREGATE;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
similarity index 60%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
index 2eade5a83e6..1e5ddc62e97 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
@@ -23,19 +23,17 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
import static
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.greaterThan;
import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
-import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan;
import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
-/** built-in hive min aggregate function. */
-public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
+/** built-in hive max aggregate function. */
+public class HiveMaxAggFunction extends HiveDeclarativeAggregateFunction {
- private final UnresolvedReferenceExpression min = unresolvedRef("min");
+ private final UnresolvedReferenceExpression max = unresolvedRef("max");
private DataType resultType;
@Override
@@ -45,7 +43,7 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
@Override
public UnresolvedReferenceExpression[] aggBufferAttributes() {
- return new UnresolvedReferenceExpression[] {min};
+ return new UnresolvedReferenceExpression[] {max};
}
@Override
@@ -60,67 +58,54 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
@Override
public Expression[] initialValuesExpressions() {
- return new Expression[] {
- /* min */
- nullOf(getResultType())
- };
+ return new Expression[] {/* max = */ nullOf(getResultType())};
}
@Override
public Expression[] accumulateExpressions() {
return new Expression[] {
- /* min = */ ifThenElse(
+ /* max = */ ifThenElse(
isNull(operand(0)),
- min,
+ max,
ifThenElse(
- isNull(min),
+ isNull(max),
operand(0),
- ifThenElse(lessThan(operand(0), min), operand(0),
min)))
+ ifThenElse(greaterThan(operand(0), max),
operand(0), max)))
};
}
@Override
public Expression[] retractExpressions() {
- throw new TableException("Min aggregate function does not support
retraction.");
+ throw new TableException("Max aggregate function does not support
retraction.");
}
@Override
public Expression[] mergeExpressions() {
return new Expression[] {
- /* min = */ ifThenElse(
- isNull(mergeOperand(min)),
- min,
+ /* max = */ ifThenElse(
+ isNull(mergeOperand(max)),
+ max,
ifThenElse(
- isNull(min),
- mergeOperand(min),
- ifThenElse(lessThan(mergeOperand(min), min),
mergeOperand(min), min)))
+ isNull(max),
+ mergeOperand(max),
+ ifThenElse(
+ greaterThan(mergeOperand(max), max),
mergeOperand(max), max)))
};
}
@Override
public Expression getValueExpression() {
- return min;
+ return max;
}
@Override
public void setArguments(CallContext callContext) {
if (resultType == null) {
+ checkArgumentNum(callContext.getArgumentDataTypes());
// check argument type firstly
-
checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+ checkMinMaxArgumentType(
+
callContext.getArgumentDataTypes().get(0).getLogicalType(), "max");
resultType = callContext.getArgumentDataTypes().get(0);
}
}
-
- private void checkArgumentType(LogicalType logicalType) {
- // Flink doesn't support to compare nested type now, so here can't
support it, see
- // ScalarOperatorGens#generateComparison for more detail
- if (logicalType.is(LogicalTypeRoot.ARRAY)
- || logicalType.is(LogicalTypeRoot.MAP)
- || logicalType.is(LogicalTypeRoot.ROW)) {
- throw new TableException(
- String.format(
- "Hive native min aggregate function does not
support type: '%s' now. Please re-check the data type.",
- logicalType.getTypeRoot()));
- }
- }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
index 2eade5a83e6..9dab1334f88 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
import static
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
import static
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
@@ -105,22 +103,11 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
@Override
public void setArguments(CallContext callContext) {
if (resultType == null) {
+ checkArgumentNum(callContext.getArgumentDataTypes());
// check argument type firstly
-
checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+ checkMinMaxArgumentType(
+
callContext.getArgumentDataTypes().get(0).getLogicalType(), "min");
resultType = callContext.getArgumentDataTypes().get(0);
}
}
-
- private void checkArgumentType(LogicalType logicalType) {
- // Flink doesn't support to compare nested type now, so here can't
support it, see
- // ScalarOperatorGens#generateComparison for more detail
- if (logicalType.is(LogicalTypeRoot.ARRAY)
- || logicalType.is(LogicalTypeRoot.MAP)
- || logicalType.is(LogicalTypeRoot.ROW)) {
- throw new TableException(
- String.format(
- "Hive native min aggregate function does not
support type: '%s' now. Please re-check the data type.",
- logicalType.getTypeRoot()));
- }
- }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
index 713cd72e13a..e19674ed39f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
@@ -106,6 +106,7 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
@Override
public void setArguments(CallContext callContext) {
if (resultType == null) {
+ checkArgumentNum(callContext.getArgumentDataTypes());
resultType =
initResultType(callContext.getArgumentDataTypes().get(0));
}
}
@@ -129,7 +130,8 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
case TIMESTAMP_WITHOUT_TIME_ZONE:
throw new TableException(
String.format(
- "Native hive sum aggregate function does not
support type: %s. Please set option '%s' to false.",
+ "Native hive sum aggregate function does not
support type: %s. "
+ + "Please set option '%s' to false to
fall back to Hive's own sum function.",
argsType,
TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key()));
default:
throw new TableException(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index bb598891fef..d2b397ff416 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFacto
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.HiveCountAggFunction;
+import org.apache.flink.table.functions.hive.HiveMaxAggFunction;
import org.apache.flink.table.functions.hive.HiveMinAggFunction;
import org.apache.flink.table.functions.hive.HiveSumAggFunction;
import org.apache.flink.table.module.Module;
@@ -87,7 +88,7 @@ public class HiveModule implements Module {
"tumble_start")));
static final Set<String> BUILTIN_NATIVE_AGG_FUNC =
- Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum",
"count", "min")));
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum",
"count", "min", "max")));
private final HiveFunctionDefinitionFactory factory;
private final String hiveVersion;
@@ -213,6 +214,9 @@ public class HiveModule implements Module {
case "min":
// We override Hive's min function by native implementation to
supports hash-agg
return Optional.of(new HiveMinAggFunction());
+ case "max":
+ // We override Hive's max function by native implementation to
supports hash-agg
+ return Optional.of(new HiveMaxAggFunction());
default:
throw new UnsupportedOperationException(
String.format(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
index 3af77ad72b6..94aacfe0e50 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
import java.util.List;
import static
org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
-import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -128,16 +127,10 @@ public class HiveDialectAggITCase {
assertThat(result7.toString()).isEqualTo("[+I[6.0, 10]]");
// test unsupported timestamp type
- assertThatThrownBy(
- () ->
- CollectionUtil.iteratorToList(
- tableEnv.executeSql("select sum(ts)
from test_sum")
- .collect()))
- .rootCause()
- .satisfiesAnyOf(
- anyCauseMatches(
- "Native hive sum aggregate function does not
support type: TIMESTAMP(9). "
- + "Please set option
'table.exec.hive.native-agg-function.enabled' to false."));
+ String expectedMessage =
+ "Native hive sum aggregate function does not support type:
TIMESTAMP(9). "
+ + "Please set option
'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's
own sum function.";
+ assertSqlException("select sum(ts) from test_sum",
TableException.class, expectedMessage);
tableEnv.executeSql("drop table test_sum");
}
@@ -303,7 +296,8 @@ public class HiveDialectAggITCase {
"create table test_min_not_support_type(a array<int>,m
map<int, string>,s struct<f1:int,f2:string>)");
// test min with row type
String expectedRowMessage =
- "Hive native min aggregate function does not support type:
'ROW' now. Please re-check the data type.";
+ "Native hive min aggregate function does not support type:
ROW. "
+ + "Please set option
'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's
own min function.";
assertSqlException(
"select min(s) from test_min_not_support_type",
TableException.class,
@@ -311,7 +305,8 @@ public class HiveDialectAggITCase {
// test min with array type
String expectedArrayMessage =
- "Hive native min aggregate function does not support type:
'ARRAY' now. Please re-check the data type.";
+ "Native hive min aggregate function does not support type:
ARRAY. "
+ + "Please set option
'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's
own min function.";
assertSqlException(
"select min(a) from test_min_not_support_type",
TableException.class,
@@ -328,6 +323,111 @@ public class HiveDialectAggITCase {
tableEnv.executeSql("drop table test_min_not_support_type");
}
+ @Test
+ public void testMaxAggFunction() throws Exception {
+ tableEnv.executeSql(
+ "create table test_max(a int, b boolean, x string, y string, z
int, d decimal(10,5), e float, f double, ts timestamp, dt date, bar binary)");
+ tableEnv.executeSql(
+ "insert into test_max values (1, true, NULL, '2', 1,
1.11, 1.2, 1.3, '2021-08-04 16:26:33.4','2021-08-04', 'data1'), "
+ + "(1, false, NULL, 'b', 2, 2.22, 2.3, 2.4,
'2021-08-06 16:26:33.4','2021-08-07', 'data2'), "
+ + "(2, false, NULL, '4', 1, 3.33, 3.5, 3.6,
'2021-08-08 16:26:33.4','2021-08-08', 'data3'), "
+ + "(2, true, NULL, NULL, 4, 4.45, 4.7, 4.8,
'2021-08-10 16:26:33.4','2021-08-01', 'data4')")
+ .await();
+
+ // test max with all elements are null
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(x) from
test_max").collect());
+ assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+ // test max with some elements are null
+ List<Row> result2 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(y) from
test_max").collect());
+ assertThat(result2.toString()).isEqualTo("[+I[b]]");
+
+ // test max with some elements repeated
+ List<Row> result3 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(z) from
test_max").collect());
+ assertThat(result3.toString()).isEqualTo("[+I[4]]");
+
+ // test max with decimal type
+ List<Row> result4 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(d) from
test_max").collect());
+ assertThat(result4.toString()).isEqualTo("[+I[4.45000]]");
+
+ // test max with float type
+ List<Row> result5 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(e) from
test_max").collect());
+ assertThat(result5.toString()).isEqualTo("[+I[4.7]]");
+
+ // test max with double type
+ List<Row> result6 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(f) from
test_max").collect());
+ assertThat(result6.toString()).isEqualTo("[+I[4.8]]");
+
+ // test max with boolean type
+ List<Row> result7 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(b) from
test_max").collect());
+ assertThat(result7.toString()).isEqualTo("[+I[true]]");
+
+ // test max with timestamp type
+ List<Row> result8 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(ts) from
test_max").collect());
+
assertThat(result8.toString()).isEqualTo("[+I[2021-08-10T16:26:33.400]]");
+
+ // test max with date type
+ List<Row> result9 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(dt) from
test_max").collect());
+ assertThat(result9.toString()).isEqualTo("[+I[2021-08-08]]");
+
+ // test max with binary type
+ List<Row> result10 =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select max(bar) from
test_max").collect());
+ assertThat(result10.toString()).isEqualTo("[+I[[100, 97, 116, 97,
52]]]");
+
+ tableEnv.executeSql("drop table test_max");
+
+ // test max with unsupported data type
+ tableEnv.executeSql(
+ "create table test_max_not_support_type(a array<int>,m
map<int, string>,s struct<f1:int,f2:string>)");
+ // test max with row type
+ String expectedRowMessage =
+ "Native hive max aggregate function does not support type:
ROW. "
+ + "Please set option
'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's
own max function.";
+ assertSqlException(
+ "select max(s) from test_max_not_support_type",
+ TableException.class,
+ expectedRowMessage);
+
+ // test max with array type
+ String expectedArrayMessage =
+ "Native hive max aggregate function does not support type:
ARRAY. "
+ + "Please set option
'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's
own max function.";
+ assertSqlException(
+ "select max(a) from test_max_not_support_type",
+ TableException.class,
+ expectedArrayMessage);
+
+ // test max with map type, hive also does not support map type
comparisons.
+ String expectedMapMessage =
+ "Cannot support comparison of map<> type or complex type
containing map<>.";
+ assertSqlException(
+ "select max(m) from test_max_not_support_type",
+ UDFArgumentTypeException.class,
+ expectedMapMessage);
+
+ tableEnv.executeSql("drop table test_max_not_support_type");
+ }
+
private void assertSqlException(
String sql, Class<?> expectedExceptionClz, String expectedMessage)
{
assertThatThrownBy(() -> tableEnv.executeSql(sql))
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
index 48cc8b913d1..a681d004f93 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
@@ -109,6 +109,20 @@ public class HiveDialectQueryPlanTest {
.isEqualTo(readFromResource("/explain/testMinAggFunctionFallbackPlan.out"));
}
+ @Test
+ public void testMaxAggFunctionPlan() {
+ // test explain
+ String sql = "select x, max(y) from foo group by x";
+ String actualPlan = explainSql(sql);
+
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMaxAggFunctionPlan.out"));
+
+ // test fallback to hive max udaf
+ tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED,
false);
+ String actualSortAggPlan = explainSql(sql);
+ assertThat(actualSortAggPlan)
+
.isEqualTo(readFromResource("/explain/testMaxAggFunctionFallbackPlan.out"));
+ }
+
private String explainSql(String sql) {
return (String)
CollectionUtil.iteratorToList(tableEnv.executeSql("explain " +
sql).collect())
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out
new file mode 100644
index 00000000000..e25fa76f0e0
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[max($1)])
+ +- LogicalProject($f0=[$0], $f1=[$1])
+ +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+ +- Exchange(distribution=[hash[x]])
+ +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1])
+ +- Sort(orderBy=[x ASC])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+ +- Exchange(distribution=[hash[x]])
+ +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1])
+ +- Sort(orderBy=[x ASC])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out
new file mode 100644
index 00000000000..5beaeb74681
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out
@@ -0,0 +1,17 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[max($1)])
+ +- LogicalProject($f0=[$0], $f1=[$1])
+ +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+ +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+ +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])