This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 8a1f66ca827 [FLINK-29721][hive] Supports native hive min function for hive dialect
8a1f66ca827 is described below
commit 8a1f66ca827163b32387e0043f4362921f6c11a9
Author: Tartarus0zm <[email protected]>
AuthorDate: Fri Jan 6 16:09:38 2023 +0800
[FLINK-29721][hive] Supports native hive min function for hive dialect
This closes #21608
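A minimal usage sketch, assuming the batch TableEnvironment with SqlDialect.HIVE and the HiveModule loaded as set up in the new HiveDialectQueryPlanTest (the table name and variable are illustrative, taken from those tests):

    // assumed setup: tableEnv created as in HiveDialectQueryPlanTest#getTableEnvWithHiveCatalog
    tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, true);
    tableEnv.executeSql("create table foo (x int, y int)");
    // with the option enabled, min is planned as a hash aggregate
    // (Partial_min/Final_min, see testMinAggFunctionPlan.out)
    tableEnv.executeSql("select x, min(y) from foo group by x").collect();
    // with the option set to false, planning falls back to Hive's min UDAF and a
    // sort aggregate (see testMinAggFunctionFallbackPlan.out)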
---
.../table/functions/hive/HiveMinAggFunction.java | 126 ++++++++++++++++++++
.../apache/flink/table/module/hive/HiveModule.java | 25 +++-
.../connectors/hive/HiveDialectAggITCase.java | 132 +++++++++++++++++----
.../connectors/hive/HiveDialectQueryPlanTest.java | 121 +++++++++++++++++++
.../explain/testMinAggFunctionFallbackPlan.out | 21 ++++
.../resources/explain/testMinAggFunctionPlan.out | 17 +++
6 files changed, 419 insertions(+), 23 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
new file mode 100644
index 00000000000..2eade5a83e6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+
+/** Built-in Hive min aggregate function. */
+public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
+
+ private final UnresolvedReferenceExpression min = unresolvedRef("min");
+ private DataType resultType;
+
+ @Override
+ public int operandCount() {
+ return 1;
+ }
+
+ @Override
+ public UnresolvedReferenceExpression[] aggBufferAttributes() {
+ return new UnresolvedReferenceExpression[] {min};
+ }
+
+ @Override
+ public DataType[] getAggBufferTypes() {
+ return new DataType[] {getResultType()};
+ }
+
+ @Override
+ public DataType getResultType() {
+ return resultType;
+ }
+
+ @Override
+ public Expression[] initialValuesExpressions() {
+ return new Expression[] {
+ /* min */
+ nullOf(getResultType())
+ };
+ }
+
+ @Override
+ public Expression[] accumulateExpressions() {
+ return new Expression[] {
+ /* min = */ ifThenElse(
+ isNull(operand(0)),
+ min,
+ ifThenElse(
+ isNull(min),
+ operand(0),
+                            ifThenElse(lessThan(operand(0), min), operand(0), min)))
+ };
+ }
+
+ @Override
+ public Expression[] retractExpressions() {
+        throw new TableException("Min aggregate function does not support retraction.");
+ }
+
+ @Override
+ public Expression[] mergeExpressions() {
+ return new Expression[] {
+ /* min = */ ifThenElse(
+ isNull(mergeOperand(min)),
+ min,
+ ifThenElse(
+ isNull(min),
+ mergeOperand(min),
+                            ifThenElse(lessThan(mergeOperand(min), min), mergeOperand(min), min)))
+ };
+ }
+
+ @Override
+ public Expression getValueExpression() {
+ return min;
+ }
+
+ @Override
+ public void setArguments(CallContext callContext) {
+ if (resultType == null) {
+ // check argument type firstly
+            checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+ resultType = callContext.getArgumentDataTypes().get(0);
+ }
+ }
+
+ private void checkArgumentType(LogicalType logicalType) {
+        // Flink doesn't support comparing nested types yet, so they can't be supported here; see
+        // ScalarOperatorGens#generateComparison for more detail
+ if (logicalType.is(LogicalTypeRoot.ARRAY)
+ || logicalType.is(LogicalTypeRoot.MAP)
+ || logicalType.is(LogicalTypeRoot.ROW)) {
+ throw new TableException(
+ String.format(
+                            "Hive native min aggregate function does not support type: '%s' now. Please re-check the data type.",
+ logicalType.getTypeRoot()));
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 1982ddb79ba..6ca6ca84dd9 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.hive.HiveMinAggFunction;
import org.apache.flink.table.functions.hive.HiveSumAggFunction;
import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
@@ -84,6 +85,9 @@ public class HiveModule implements Module {
"tumble_rowtime",
"tumble_start")));
+ static final Set<String> BUILTIN_NATIVE_AGG_FUNC =
+            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", "min")));
+
private final HiveFunctionDefinitionFactory factory;
private final String hiveVersion;
private final HiveShim hiveShim;
@@ -141,9 +145,9 @@ public class HiveModule implements Module {
}
FunctionDefinitionFactory.Context context = () -> classLoader;
-        // We override Hive's sum function by native implementation to supports hash-agg
- if (isNativeAggFunctionEnabled() && name.equalsIgnoreCase("sum")) {
- return Optional.of(new HiveSumAggFunction());
+        // We override some of Hive's functions with native implementations to support hash-agg
+        if (isNativeAggFunctionEnabled() && BUILTIN_NATIVE_AGG_FUNC.contains(name.toLowerCase())) {
+ return getBuiltInNativeAggFunction(name.toLowerCase());
}
        // We override Hive's grouping function. Refer to the implementation for more details.
@@ -196,4 +200,19 @@ public class HiveModule implements Module {
private boolean isNativeAggFunctionEnabled() {
return config.get(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED);
}
+
+    private Optional<FunctionDefinition> getBuiltInNativeAggFunction(String name) {
+ switch (name) {
+ case "sum":
+                // Override Hive's sum function with a native implementation to support hash-agg
+ return Optional.of(new HiveSumAggFunction());
+ case "min":
+                // Override Hive's min function with a native implementation to support hash-agg
+ return Optional.of(new HiveMinAggFunction());
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+                                "Built-in hive aggregate function doesn't support %s yet!", name));
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
index c92bf6546f1..bd124c7d554 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.module.CoreModule;
@@ -28,6 +29,7 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -38,7 +40,6 @@ import java.util.List;
 import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -76,19 +77,6 @@ public class HiveDialectAggITCase {
        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, true);
}
- @Test
- public void testSumAggFunctionPlan() {
- // test explain
- String actualPlan = explainSql("select x, sum(y) from foo group by x");
-        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
-
- // test fallback to hive sum udaf
-        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, false);
-        String actualSortAggPlan = explainSql("select x, sum(y) from foo group by x");
- assertThat(actualSortAggPlan)
-                .isEqualTo(readFromResource("/explain/testSumAggFunctionFallbackPlan.out"));
- }
-
@Test
public void testSimpleSumAggFunction() throws Exception {
tableEnv.executeSql(
@@ -148,7 +136,7 @@ public class HiveDialectAggITCase {
assertThatThrownBy(
() ->
CollectionUtil.iteratorToList(
-                                        tableEnv.executeSql("select sum(ts)from test_sum")
+                                        tableEnv.executeSql("select sum(ts) from test_sum")
.collect()))
.rootCause()
.satisfiesAnyOf(
@@ -179,11 +167,115 @@ public class HiveDialectAggITCase {
tableEnv.executeSql("drop table test_sum_group");
}
- private String explainSql(String sql) {
- return (String)
-                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect())
- .get(0)
- .getField(0);
+ @Test
+ public void testMinAggFunction() throws Exception {
+ tableEnv.executeSql(
+                "create table test_min(a int, b boolean, x string, y string, z int, d decimal(10,5), e float, f double, ts timestamp, dt date, bar binary)");
+ tableEnv.executeSql(
+                        "insert into test_min values (1, true, NULL, '2', 1, 1.11, 1.2, 1.3, '2021-08-04 16:26:33.4','2021-08-04', 'data1'), "
+                                + "(1, false, NULL, 'b', 2, 2.22, 2.3, 2.4, '2021-08-06 16:26:33.4','2021-08-07', 'data2'), "
+                                + "(2, false, NULL, '4', 1, 3.33, 3.5, 3.6, '2021-08-08 16:26:33.4','2021-08-08', 'data3'), "
+                                + "(2, true, NULL, NULL, 4, 4.45, 4.7, 4.8, '2021-08-10 16:26:33.4','2021-08-01', 'data4')")
+ .await();
+
+        // test min where all elements are null
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(x) from test_min").collect());
+ assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test min where some elements are null
+ List<Row> result2 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(y) from test_min").collect());
+ assertThat(result2.toString()).isEqualTo("[+I[2]]");
+
+ // test min with some elements repeated
+ List<Row> result3 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(z) from test_min").collect());
+ assertThat(result3.toString()).isEqualTo("[+I[1]]");
+
+ // test min with decimal type
+ List<Row> result4 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(d) from test_min").collect());
+ assertThat(result4.toString()).isEqualTo("[+I[1.11000]]");
+
+ // test min with float type
+ List<Row> result5 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(e) from test_min").collect());
+ assertThat(result5.toString()).isEqualTo("[+I[1.2]]");
+
+ // test min with double type
+ List<Row> result6 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(f) from test_min").collect());
+ assertThat(result6.toString()).isEqualTo("[+I[1.3]]");
+
+ // test min with boolean type
+ List<Row> result7 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(b) from test_min").collect());
+ assertThat(result7.toString()).isEqualTo("[+I[false]]");
+
+ // test min with timestamp type
+ List<Row> result8 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(ts) from test_min").collect());
+        assertThat(result8.toString()).isEqualTo("[+I[2021-08-04T16:26:33.400]]");
+
+ // test min with date type
+ List<Row> result9 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(dt) from test_min").collect());
+ assertThat(result9.toString()).isEqualTo("[+I[2021-08-01]]");
+
+ // test min with binary type
+ List<Row> result10 =
+ CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(bar) from test_min").collect());
+        assertThat(result10.toString()).isEqualTo("[+I[[100, 97, 116, 97, 49]]]");
+
+ tableEnv.executeSql("drop table test_min");
+
+ // test min with unsupported data type
+ tableEnv.executeSql(
+                "create table test_min_not_support_type(a array<int>,m map<int, string>,s struct<f1:int,f2:string>)");
+ // test min with row type
+ String expectedRowMessage =
+                "Hive native min aggregate function does not support type: 'ROW' now. Please re-check the data type.";
+ assertSqlException(
+ "select min(s) from test_min_not_support_type",
+ TableException.class,
+ expectedRowMessage);
+
+ // test min with array type
+ String expectedArrayMessage =
+                "Hive native min aggregate function does not support type: 'ARRAY' now. Please re-check the data type.";
+ assertSqlException(
+ "select min(a) from test_min_not_support_type",
+ TableException.class,
+ expectedArrayMessage);
+
+        // test min with map type; Hive also does not support map type comparisons.
+ String expectedMapMessage =
+                "Cannot support comparison of map<> type or complex type containing map<>.";
+ assertSqlException(
+ "select min(m) from test_min_not_support_type",
+ UDFArgumentTypeException.class,
+ expectedMapMessage);
+
+ tableEnv.executeSql("drop table test_min_not_support_type");
+ }
+
+ private void assertSqlException(
+            String sql, Class<?> expectedExceptionClz, String expectedMessage) {
+ assertThatThrownBy(() -> tableEnv.executeSql(sql))
+ .rootCause()
+ .isInstanceOf(expectedExceptionClz)
+ .hasMessage(expectedMessage);
}
private static TableEnvironment getTableEnvWithHiveCatalog() {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
new file mode 100644
index 00000000000..69b8ca9179f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
+import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for Hive dialect query plans. */
+public class HiveDialectQueryPlanTest {
+
+ private static HiveCatalog hiveCatalog;
+ private static TableEnvironment tableEnv;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ hiveCatalog = HiveTestUtils.createHiveCatalog();
+ // required by query like "src.`[k].*` from src"
+        hiveCatalog.getHiveConf().setVar(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT, "none");
+ hiveCatalog.open();
+ tableEnv = getTableEnvWithHiveCatalog();
+
+ // create tables
+ tableEnv.executeSql("create table foo (x int, y int)");
+
+ HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "foo")
+ .addRow(new Object[] {1, 1})
+ .addRow(new Object[] {2, 2})
+ .addRow(new Object[] {3, 3})
+ .addRow(new Object[] {4, 4})
+ .addRow(new Object[] {5, 5})
+ .commit();
+ }
+
+ @Before
+ public void before() {
+ // enable native hive agg function
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, true);
+ }
+
+ @Test
+ public void testSumAggFunctionPlan() {
+ // test explain
+ String actualPlan = explainSql("select x, sum(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
+
+ // test fallback to hive sum udaf
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, false);
+        String actualSortAggPlan = explainSql("select x, sum(y) from foo group by x");
+ assertThat(actualSortAggPlan)
+                .isEqualTo(readFromResource("/explain/testSumAggFunctionFallbackPlan.out"));
+ }
+
+ @Test
+ public void testMinAggFunctionPlan() {
+ // test explain
+ String actualPlan = explainSql("select x, min(y) from foo group by x");
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out"));
+
+ // test fallback to hive min udaf
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, false);
+        String actualSortAggPlan = explainSql("select x, min(y) from foo group by x");
+ assertThat(actualSortAggPlan)
+                .isEqualTo(readFromResource("/explain/testMinAggFunctionFallbackPlan.out"));
+ }
+
+ private String explainSql(String sql) {
+ return (String)
+                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect())
+ .get(0)
+ .getField(0);
+ }
+
+ private static TableEnvironment getTableEnvWithHiveCatalog() {
+        TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+ tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tableEnv.useCatalog(hiveCatalog.getName());
+ // automatically load hive module in hive-compatible mode
+ HiveModule hiveModule =
+ new HiveModule(
+ hiveCatalog.getHiveVersion(),
+ tableEnv.getConfig(),
+ Thread.currentThread().getContextClassLoader());
+ CoreModule coreModule = CoreModule.INSTANCE;
+ for (String loaded : tableEnv.listModules()) {
+ tableEnv.unloadModule(loaded);
+ }
+ tableEnv.loadModule("hive", hiveModule);
+ tableEnv.loadModule("core", coreModule);
+ return tableEnv;
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out
new file mode 100644
index 00000000000..1dc2488ba8f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[min($1)])
+ +- LogicalProject($f0=[$0], $f1=[$1])
+ +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+ +- Exchange(distribution=[hash[x]])
+ +- LocalSortAggregate(groupBy=[x], select=[x, Partial_min(y) AS $f1])
+ +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+ +- Exchange(distribution=[hash[x]])
+ +- LocalSortAggregate(groupBy=[x], select=[x, Partial_min(y) AS $f1])
+ +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out
new file mode 100644
index 00000000000..902e454afb5
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out
@@ -0,0 +1,17 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[min($1)])
+ +- LogicalProject($f0=[$0], $f1=[$1])
+ +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min(min$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+ +- LocalHashAggregate(groupBy=[x], select=[x, Partial_min(y) AS min$0])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min(min$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+ +- LocalHashAggregate(groupBy=[x], select=[x, Partial_min(y) AS min$0])
+ +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])