This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fac3ac786674f9b6ce5716902e74b1533ccb1c0a
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Tue Nov 7 17:05:20 2023 -0800

    [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase
---
 .../exec/stream/GroupAggregateJsonPlanTest.java    | 182 ------
 .../jsonplan/GroupAggregateJsonPlanITCase.java     | 205 -------
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 345 ------------
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 607 ---------------------
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 288 ----------
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 367 -------------
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 -----------
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 --------------
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 281 ----------
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 -----------
 10 files changed, 3295 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
deleted file mode 100644
index e655a3527ea..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.CountDistinct;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-/** Test json serialization/deserialization for group aggregate. */
-@ExtendWith(ParameterizedTestExtension.class)
-class GroupAggregateJsonPlanTest extends TableTestBase {
-
-    @Parameter private boolean isMiniBatchEnabled;
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @Parameters(name = "isMiniBatchEnabled={0}")
-    private static List<Boolean> testData() {
-        return Arrays.asList(true, false);
-    }
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-        if (isMiniBatchEnabled) {
-            tEnv.getConfig()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
true)
-                    .set(
-                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10))
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        } else {
-            
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
false);
-        }
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @TestTemplate
-    void testSimpleAggCallsWithGroupBy() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  b bigint,\n"
-                        + "  cnt_a bigint,\n"
-                        + "  max_b bigint,\n"
-                        + "  min_c varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select b, "
-                        + "count(a) as cnt_a, "
-                        + "max(b) filter (where b > 1) as max_b, "
-                        + "min(c) as min_c "
-                        + "from MyTable group by b");
-    }
-
-    @TestTemplate
-    void testSimpleAggWithoutGroupBy() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  avg_a double,\n"
-                        + "  cnt bigint,\n"
-                        + "  cnt_b bigint,\n"
-                        + "  min_b bigint,\n"
-                        + "  max_c varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select "
-                        + "avg(a) as avg_a, "
-                        + "count(*) as cnt, "
-                        + "count(b) as cnt_b, "
-                        + "min(b) as min_b, "
-                        + "max(c) filter (where a > 1) as max_c "
-                        + "from MyTable");
-    }
-
-    @TestTemplate
-    void testDistinctAggCalls() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  d bigint,\n"
-                        + "  cnt_a1 bigint,\n"
-                        + "  cnt_a2 bigint,\n"
-                        + "  sum_a bigint,\n"
-                        + "  sum_b int,\n"
-                        + "  avg_b double,\n"
-                        + "  cnt_c bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select d, "
-                        + "count(distinct a) filter (where b > 10) as cnt_a1, "
-                        + "count(distinct a) as cnt_a2, "
-                        + "sum(distinct a) as sum_a, "
-                        + "sum(distinct b) as sum_b, "
-                        + "avg(b) as avg_b, "
-                        + "count(distinct c) as cnt_d "
-                        + "from MyTable group by d");
-    }
-
-    @TestTemplate
-    void testUserDefinedAggCalls() {
-        tEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction());
-        tEnv.createFunction("my_avg", WeightedAvg.class);
-        tEnv.createTemporarySystemFunction("my_sum2", 
VarSum2AggFunction.class);
-        tEnv.createTemporarySystemFunction("my_count", new CountDistinct());
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  b bigint,\n"
-                        + "  a1 bigint,\n"
-                        + "  a2 bigint,\n"
-                        + "  a3 bigint,\n"
-                        + "  c1 bigint\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'sink-insert-only' = 'false',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "insert into MySink select "
-                        + "b, "
-                        + "my_sum1(b, 10) as a1, "
-                        + "my_sum2(5, b) as a2, "
-                        + "my_avg(d, a) as a3, "
-                        + "my_count(c) as c1 "
-                        + "from MyTable group by b");
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
deleted file mode 100644
index 9b2f9b01f7a..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction;
-import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg;
-import org.apache.flink.table.planner.runtime.utils.TestData;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
-import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-
-/** Test for group aggregate json plan. */
-@ExtendWith(ParameterizedTestExtension.class)
-class GroupAggregateJsonPlanITCase extends JsonPlanTestBase {
-
-    @Parameter private boolean isMiniBatchEnabled;
-
-    @Parameters(name = "isMiniBatchEnabled={0}")
-    private static List<Boolean> testData() {
-        return Arrays.asList(true, false);
-    }
-
-    @BeforeEach
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        if (isMiniBatchEnabled) {
-            tableEnv.getConfig()
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
true)
-                    .set(
-                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(10))
-                    .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L);
-        } else {
-            
tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
false);
-        }
-    }
-
-    @TestTemplate
-    void testSimpleAggCallsWithGroupBy() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.smallData3()),
-                "a int",
-                "b bigint",
-                "c varchar");
-        createTestNonInsertOnlyValuesSinkTable(
-                "MySink",
-                "b bigint",
-                "cnt bigint",
-                "avg_a double",
-                "min_c varchar",
-                "primary key (b) not enforced");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select b, "
-                                + "count(*) as cnt, "
-                                + "avg(a) filter (where a > 1) as avg_a, "
-                                + "min(c) as min_c "
-                                + "from MyTable group by b")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, 
Hello]"), result);
-    }
-
-    @TestTemplate
-    void testDistinctAggCalls() throws Exception {
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data2()),
-                "a int",
-                "b bigint",
-                "c int",
-                "d varchar",
-                "e bigint");
-        createTestNonInsertOnlyValuesSinkTable(
-                "MySink",
-                "e bigint",
-                "cnt_a1 bigint",
-                "cnt_a2 bigint",
-                "sum_a bigint",
-                "sum_b bigint",
-                "avg_b double",
-                "cnt_d bigint",
-                "primary key (e) not enforced");
-        compileSqlAndExecutePlan(
-                        "insert into MySink select e, "
-                                + "count(distinct a) filter (where b > 10) as 
cnt_a1, "
-                                + "count(distinct a) as cnt_a2, "
-                                + "sum(distinct a) as sum_a, "
-                                + "sum(distinct b) as sum_b, "
-                                + "avg(b) as avg_b, "
-                                + "count(distinct d) as concat_d "
-                                + "from MyTable group by e")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[1, 1, 4, 12, 32, 6.0, 5]",
-                        "+I[2, 1, 4, 14, 57, 8.0, 7]",
-                        "+I[3, 1, 2, 8, 31, 10.0, 3]"),
-                result);
-    }
-
-    @TestTemplate
-    void testUserDefinedAggCallsWithoutMerge() throws Exception {
-        tableEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction());
-        tableEnv.createFunction("my_avg", WeightedAvg.class);
-        tableEnv.createTemporarySystemFunction("my_sum2", 
VarSum2AggFunction.class);
-
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data2()),
-                "a int",
-                "b bigint",
-                "c int",
-                "d varchar",
-                "e bigint");
-        createTestNonInsertOnlyValuesSinkTable(
-                "MySink",
-                "d bigint",
-                "s1 bigint",
-                "s2 bigint",
-                "s3 bigint",
-                "primary key (d) not enforced");
-
-        compileSqlAndExecutePlan(
-                        "insert into MySink select "
-                                + "e, "
-                                + "my_sum1(c, 10) as s1, "
-                                + "my_sum2(5, c) as s2, "
-                                + "my_avg(e, a) as s3 "
-                                + "from MyTable group by e")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 
58, 0, 3]"), result);
-    }
-
-    @TestTemplate
-    void testUserDefinedAggCallsWithMerge() throws Exception {
-        tableEnv.createFunction("my_avg", 
JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
-        tableEnv.createTemporarySystemFunction(
-                "my_concat_agg", 
JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
-
-        createTestValuesSourceTable(
-                "MyTable",
-                JavaScalaConversionUtil.toJava(TestData.data2()),
-                "a int",
-                "b bigint",
-                "c int",
-                "d varchar",
-                "e bigint");
-        createTestNonInsertOnlyValuesSinkTable(
-                "MySink", "d bigint", "s1 bigint", "c1 varchar", "primary key 
(d) not enforced");
-
-        compileSqlAndExecutePlan(
-                        "insert into MySink select "
-                                + "e, "
-                                + "my_avg(e, a) as s1, "
-                                + "my_concat_agg(d) as c1 "
-                                + "from MyTable group by e")
-                .await();
-
-        List<String> result = 
TestValuesTableFactory.getResultsAsStrings("MySink");
-        assertResult(
-                Arrays.asList(
-                        "+I[1, 1, Hallo Welt wie|Hallo|GHI|EFG|DEF]",
-                        "+I[2, 2, Hallo Welt wie gehts?|Hallo 
Welt|ABC|FGH|CDE|JKL|KLM]",
-                        "+I[3, 3, HIJ|IJK|BCD]"),
-                result);
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
deleted file mode 100644
index 72e7811b337..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
+++ /dev/null
@@ -1,345 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$>$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 10,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` 
INT NOT NULL, `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[d, a, (b > 10) AS $f2, b, c])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` 
INT NOT NULL, `c` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[hash[d]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a1",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : 2,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "cnt_a2",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "sum_a",
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "sum_b",
-      "internalName" : "$SUM$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "avg_b",
-      "internalName" : "$AVG$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "cnt_d",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false, false, false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT NOT NULL, `cnt_a2` BIGINT 
NOT NULL, `sum_a` BIGINT, `sum_b` INT NOT NULL, `avg_b` INT NOT NULL, `cnt_d` 
BIGINT NOT NULL>",
-    "description" : "GroupAggregate(groupBy=[d], select=[d, COUNT(DISTINCT a) 
FILTER $f2 AS cnt_a1, COUNT(DISTINCT a) AS cnt_a2, SUM(DISTINCT a) AS sum_a, 
SUM(DISTINCT b) AS sum_b, AVG(b) AS avg_b, COUNT(DISTINCT c) AS cnt_d])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 5,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "DOUBLE"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 6,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
-    "description" : "Calc(select=[d, CAST(cnt_a1 AS BIGINT) AS cnt_a1, 
CAST(cnt_a2 AS BIGINT) AS cnt_a2, sum_a, CAST(sum_b AS INTEGER) AS sum_b, 
CAST(avg_b AS DOUBLE) AS avg_b, CAST(cnt_d AS BIGINT) AS cnt_c])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a1",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_b",
-              "dataType" : "INT"
-            }, {
-              "name" : "avg_b",
-              "dataType" : "DOUBLE"
-            }, {
-              "name" : "cnt_c",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
deleted file mode 100644
index 7b78f56e5ed..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
+++ /dev/null
@@ -1,607 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$>$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 10,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` 
INT NOT NULL, `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[d, a, (b > 10) AS $f2, b, c])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a1",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : 2,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "cnt_a2",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "sum_a",
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "sum_b",
-      "internalName" : "$SUM$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "avg_b",
-      "internalName" : "$AVG$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "cnt_d",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false, false, false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "d",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "sum$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "sum$3",
-        "fieldType" : "INT"
-      }, {
-        "name" : "sum$4",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$5",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$6",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      }, {
-        "name" : "distinct$1",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      }, {
-        "name" : "distinct$2",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "logicalType" : {
-              "type" : "STRUCTURED_TYPE",
-              "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-              "attributes" : [ {
-                "name" : "map",
-                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
-              } ]
-            },
-            "fields" : [ {
-              "name" : "map",
-              "keyClass" : {
-                "conversionClass" : "org.apache.flink.table.data.StringData"
-              }
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[d], select=[d, 
COUNT(distinct$0 a) FILTER $f2 AS count$0, COUNT(distinct$0 a) AS count$1, 
SUM(distinct$0 a) AS sum$2, SUM(distinct$1 b) AS sum$3, AVG(b) AS (sum$4, 
count$5), COUNT(distinct$2 c) AS count$6, DISTINCT(a) AS distinct$0, 
DISTINCT(b) AS distinct$1, DISTINCT(c) AS distinct$2])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "d",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "sum$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "sum$3",
-        "fieldType" : "INT"
-      }, {
-        "name" : "sum$4",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$5",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$6",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      }, {
-        "name" : "distinct$1",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      }, {
-        "name" : "distinct$2",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "logicalType" : {
-              "type" : "STRUCTURED_TYPE",
-              "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-              "attributes" : [ {
-                "name" : "map",
-                "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>"
-              } ]
-            },
-            "fields" : [ {
-              "name" : "map",
-              "keyClass" : {
-                "conversionClass" : "org.apache.flink.table.data.StringData"
-              }
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[d]])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a1",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : 2,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "cnt_a2",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "sum_a",
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "sum_b",
-      "internalName" : "$SUM$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "avg_b",
-      "internalName" : "$AVG$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT NOT NULL"
-    }, {
-      "name" : "cnt_d",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false, false, false ],
-    "localAggInputRowType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT 
NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT NOT NULL, `cnt_a2` BIGINT 
NOT NULL, `sum_a` BIGINT, `sum_b` INT NOT NULL, `avg_b` INT NOT NULL, `cnt_d` 
BIGINT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[d], select=[d, 
COUNT(distinct$0 count$0) AS cnt_a1, COUNT(distinct$0 count$1) AS cnt_a2, 
SUM(distinct$0 sum$2) AS sum_a, SUM(distinct$1 sum$3) AS sum_b, AVG((sum$4, 
count$5)) AS avg_b, COUNT(distinct$2 count$6) AS cnt_d])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 5,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "DOUBLE"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 6,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
-    "description" : "Calc(select=[d, CAST(cnt_a1 AS BIGINT) AS cnt_a1, 
CAST(cnt_a2 AS BIGINT) AS cnt_a2, sum_a, CAST(sum_b AS INTEGER) AS sum_b, 
CAST(avg_b AS DOUBLE) AS avg_b, CAST(cnt_d AS BIGINT) AS cnt_c])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a1",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_b",
-              "dataType" : "INT"
-            }, {
-              "name" : "avg_b",
-              "dataType" : "DOUBLE"
-            }, {
-              "name" : "cnt_c",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
deleted file mode 100644
index dc3ed1e0849..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
+++ /dev/null
@@ -1,288 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ],
-        "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$>$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, 
`c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[b, a, (b > 1) AS $f2, c])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, 
`c` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "max_b",
-      "internalName" : "$MAX$1",
-      "argList" : [ 0 ],
-      "filterArg" : 2,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "min_c",
-      "internalName" : "$MIN$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `cnt_a` BIGINT NOT NULL, `max_b` 
INT, `min_c` VARCHAR(2147483647)>",
-    "description" : "GroupAggregate(groupBy=[b], select=[b, COUNT(a) AS cnt_a, 
MAX(b) FILTER $f2 AS max_b, MIN(c) AS min_c])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "INT"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
-    "description" : "Calc(select=[CAST(b AS BIGINT) AS b, CAST(cnt_a AS 
BIGINT) AS cnt_a, CAST(max_b AS BIGINT) AS max_b, min_c])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "max_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "min_c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, cnt_a, max_b, min_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
deleted file mode 100644
index 1c39ad82ffd..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
+++ /dev/null
@@ -1,367 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ],
-        "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` 
VARCHAR(2147483647)>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$>$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, 
`c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[b, a, (b > 1) AS $f2, c])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "max_b",
-      "internalName" : "$MAX$1",
-      "argList" : [ 0 ],
-      "filterArg" : 2,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "min_c",
-      "internalName" : "$MIN$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `count$0` BIGINT, `max$1` INT, 
`min$2` VARCHAR(2147483647)>",
-    "description" : "LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS 
count$0, MAX(b) FILTER $f2 AS max$1, MIN(c) AS min$2])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `count$0` BIGINT, `max$1` INT, 
`min$2` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "cnt_a",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "max_b",
-      "internalName" : "$MAX$1",
-      "argList" : [ 0 ],
-      "filterArg" : 2,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "min_c",
-      "internalName" : "$MIN$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false ],
-    "localAggInputRowType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN 
NOT NULL, `c` VARCHAR(2147483647)>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `cnt_a` BIGINT NOT NULL, `max_b` 
INT, `min_c` VARCHAR(2147483647)>",
-    "description" : "GlobalGroupAggregate(groupBy=[b], select=[b, 
COUNT(count$0) AS cnt_a, MAX(max$1) AS max_b, MIN(min$2) AS min_c])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "INT"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
-    "description" : "Calc(select=[CAST(b AS BIGINT) AS b, CAST(cnt_a AS 
BIGINT) AS cnt_a, CAST(max_b AS BIGINT) AS max_b, min_c])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "max_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "min_c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, cnt_a, max_b, min_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
deleted file mode 100644
index aebfcd2a9b5..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
+++ /dev/null
@@ -1,314 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "POSTFIX",
-      "internalName" : "$IS TRUE$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "syntax" : "BINARY",
-        "internalName" : "$>$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 0,
-          "type" : "BIGINT"
-        }, {
-          "kind" : "LITERAL",
-          "value" : 1,
-          "type" : "INT NOT NULL"
-        } ],
-        "type" : "BOOLEAN"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`$f3` BOOLEAN NOT NULL>",
-    "description" : "Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "SINGLETON"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`$f3` BOOLEAN NOT NULL>",
-    "description" : "Exchange(distribution=[single])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ ],
-    "aggCalls" : [ {
-      "name" : "avg_a",
-      "internalName" : "$AVG$1",
-      "argList" : [ 0 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "cnt",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "min_b",
-      "internalName" : "$MIN$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "max_c",
-      "internalName" : "$MAX$1",
-      "argList" : [ 2 ],
-      "filterArg" : 3,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` BIGINT, `cnt` BIGINT NOT NULL, `min_b` INT, 
`max_c` VARCHAR(2147483647)>",
-    "description" : "GroupAggregate(select=[AVG(a) AS avg_a, COUNT(*) AS cnt, 
MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "BIGINT"
-      } ],
-      "type" : "DOUBLE"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "INT"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` 
BIGINT, `max_c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[CAST(avg_a AS DOUBLE) AS avg_a, CAST(cnt AS 
BIGINT) AS cnt, CAST(cnt AS BIGINT) AS cnt_b, CAST(min_b AS BIGINT) AS min_b, 
max_c])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "avg_a",
-              "dataType" : "DOUBLE"
-            }, {
-              "name" : "cnt",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "min_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "max_c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` 
BIGINT, `max_c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[avg_a, cnt, cnt_b, min_b, max_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
deleted file mode 100644
index 3be27cfec5c..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
+++ /dev/null
@@ -1,402 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "POSTFIX",
-      "internalName" : "$IS TRUE$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "syntax" : "BINARY",
-        "internalName" : "$>$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 0,
-          "type" : "BIGINT"
-        }, {
-          "kind" : "LITERAL",
-          "value" : 1,
-          "type" : "INT NOT NULL"
-        } ],
-        "type" : "BOOLEAN"
-      } ],
-      "type" : "BOOLEAN NOT NULL"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`$f3` BOOLEAN NOT NULL>",
-    "description" : "Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ ],
-    "aggCalls" : [ {
-      "name" : "avg_a",
-      "internalName" : "$AVG$1",
-      "argList" : [ 0 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "cnt",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "min_b",
-      "internalName" : "$MIN$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "max_c",
-      "internalName" : "$MAX$1",
-      "argList" : [ 2 ],
-      "filterArg" : 3,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`sum$0` BIGINT, `count$1` BIGINT, `count1$2` BIGINT, 
`min$3` INT, `max$4` VARCHAR(2147483647)>",
-    "description" : "LocalGroupAggregate(select=[AVG(a) AS (sum$0, count$1), 
COUNT(*) AS count1$2, MIN(b) AS min$3, MAX(c) FILTER $f3 AS max$4])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "SINGLETON"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`sum$0` BIGINT, `count$1` BIGINT, `count1$2` BIGINT, 
`min$3` INT, `max$4` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[single])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ ],
-    "aggCalls" : [ {
-      "name" : "avg_a",
-      "internalName" : "$AVG$1",
-      "argList" : [ 0 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "cnt",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "min_b",
-      "internalName" : "$MIN$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "INT"
-    }, {
-      "name" : "max_c",
-      "internalName" : "$MAX$1",
-      "argList" : [ 2 ],
-      "filterArg" : 3,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false ],
-    "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647), `$f3` BOOLEAN NOT NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` BIGINT, `cnt` BIGINT NOT NULL, `min_b` INT, 
`max_c` VARCHAR(2147483647)>",
-    "description" : "GlobalGroupAggregate(select=[AVG((sum$0, count$1)) AS 
avg_a, COUNT(count1$2) AS cnt, MIN(min$3) AS min_b, MAX(max$4) AS max_c])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "BIGINT"
-      } ],
-      "type" : "DOUBLE"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "BIGINT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "INT"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` 
BIGINT, `max_c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[CAST(avg_a AS DOUBLE) AS avg_a, CAST(cnt AS 
BIGINT) AS cnt, CAST(cnt AS BIGINT) AS cnt_b, CAST(min_b AS BIGINT) AS min_b, 
max_c])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "avg_a",
-              "dataType" : "DOUBLE"
-            }, {
-              "name" : "cnt",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "min_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "max_c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` 
BIGINT, `max_c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[avg_a, cnt, cnt_b, min_b, max_c])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
deleted file mode 100644
index 0777239ed59..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
+++ /dev/null
@@ -1,281 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "LITERAL",
-      "value" : 10,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "LITERAL",
-      "value" : 5,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT 
NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[b, 10 AS $f1, 5 AS $f2, d, a, c])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT 
NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "false",
-      "table.exec.mini-batch.size" : "-1"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "a1",
-      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
-      "argList" : [ 0, 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "a2",
-      "systemName" : "my_sum2",
-      "argList" : [ 2, 0 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "a3",
-      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
-      "class" : 
"org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
-      "argList" : [ 3, 4 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "c1",
-      "systemName" : "my_count",
-      "argList" : [ 5 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a1` BIGINT, `a2` BIGINT, `a3` 
BIGINT, `c1` BIGINT>",
-    "description" : "GroupAggregate(groupBy=[b], select=[b, my_sum1(b, $f1) AS 
a1, my_sum2($f2, b) AS a2, my_avg(d, a) AS a3, my_count(c) AS c1])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "BIGINT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
-    "description" : "Calc(select=[CAST(b AS BIGINT) AS b, a1, a2, a3, c1])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a1",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "c1",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, a1, a2, a3, c1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
deleted file mode 100644
index bdb25ed0498..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
+++ /dev/null
@@ -1,304 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 10000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` BIGINT>",
-    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "LITERAL",
-      "value" : 10,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "LITERAL",
-      "value" : 5,
-      "type" : "INT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT 
NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[b, 10 AS $f1, 5 AS $f2, d, a, c])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT 
NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "a1",
-      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
-      "argList" : [ 0, 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "a2",
-      "systemName" : "my_sum2",
-      "argList" : [ 2, 0 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "a3",
-      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
-      "class" : 
"org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
-      "argList" : [ 3, 4 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : "c1",
-      "systemName" : "my_count",
-      "argList" : [ 5 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false, false, false, false ],
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "groupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` INT NOT NULL, `a1` BIGINT, `a2` BIGINT, `a3` 
BIGINT, `c1` BIGINT>",
-    "description" : "GroupAggregate(groupBy=[b], select=[b, my_sum1(b, $f1) AS 
a1, my_sum2($f2, b) AS a2, my_avg(d, a) AS a3, my_count(c) AS c1])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "SPECIAL",
-      "internalName" : "$CAST$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 3,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "BIGINT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
-    "description" : "Calc(select=[CAST(b AS BIGINT) AS b, a1, a2, a3, c1])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a1",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a2",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a3",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "c1",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "sink-insert-only" : "false",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, a1, a2, a3, c1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}

Reply via email to