This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fac3ac786674f9b6ce5716902e74b1533ccb1c0a Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Tue Nov 7 17:05:20 2023 -0800 [FLINK-33480] Remove GroupAggJsonPlanTest & GroupAggJsonPlanITCase --- .../exec/stream/GroupAggregateJsonPlanTest.java | 182 ------ .../jsonplan/GroupAggregateJsonPlanITCase.java | 205 ------- ...tDistinctAggCalls[isMiniBatchEnabled=false].out | 345 ------------ ...stDistinctAggCalls[isMiniBatchEnabled=true].out | 607 --------------------- ...gCallsWithGroupBy[isMiniBatchEnabled=false].out | 288 ---------- ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out | 367 ------------- ...AggWithoutGroupBy[isMiniBatchEnabled=false].out | 314 ----------- ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out | 402 -------------- ...erDefinedAggCalls[isMiniBatchEnabled=false].out | 281 ---------- ...serDefinedAggCalls[isMiniBatchEnabled=true].out | 304 ----------- 10 files changed, 3295 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java deleted file mode 100644 index e655a3527ea..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.CountDistinct; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - -/** Test json serialization/deserialization for group aggregate. */ -@ExtendWith(ParameterizedTestExtension.class) -class GroupAggregateJsonPlanTest extends TableTestBase { - - @Parameter private boolean isMiniBatchEnabled; - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @Parameters(name = "isMiniBatchEnabled={0}") - private static List<Boolean> testData() { - return Arrays.asList(true, false); - } - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - if (isMiniBatchEnabled) { - tEnv.getConfig() - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .set( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(10)) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L); - } else { - tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false); - } - - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int not null,\n" - + " c varchar,\n" - + " d bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - } - - @TestTemplate - void testSimpleAggCallsWithGroupBy() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b bigint,\n" - + " cnt_a bigint,\n" - + " max_b bigint,\n" - + " min_c varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select b, " - + "count(a) as cnt_a, " - + "max(b) filter (where b > 1) as max_b, " - + "min(c) as min_c " - + "from MyTable group by b"); - } - - @TestTemplate - void testSimpleAggWithoutGroupBy() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " avg_a double,\n" - + " cnt bigint,\n" - + " cnt_b bigint,\n" - + " min_b bigint,\n" - + " max_c varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select " - + "avg(a) as avg_a, " - + "count(*) as cnt, " - + "count(b) as cnt_b, " - + "min(b) as min_b, " - + "max(c) filter (where a > 1) as max_c " - + "from MyTable"); - } - - @TestTemplate - void testDistinctAggCalls() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " d bigint,\n" - + " cnt_a1 bigint,\n" - + " cnt_a2 bigint,\n" - + " sum_a bigint,\n" - + " sum_b int,\n" - + " avg_b double,\n" - + " cnt_c bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select d, " - + "count(distinct a) filter (where b > 10) as cnt_a1, " - + "count(distinct a) as cnt_a2, " - + "sum(distinct a) as sum_a, " - + "sum(distinct b) as sum_b, " - + "avg(b) as avg_b, " - + "count(distinct c) as cnt_d " - + "from MyTable group by d"); - } - - @TestTemplate - void testUserDefinedAggCalls() { - tEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction()); - tEnv.createFunction("my_avg", WeightedAvg.class); - tEnv.createTemporarySystemFunction("my_sum2", VarSum2AggFunction.class); - tEnv.createTemporarySystemFunction("my_count", new CountDistinct()); - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " b bigint,\n" - + " a1 bigint,\n" - + " a2 bigint,\n" - + " a3 bigint,\n" - + " c1 bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "insert into MySink select " - + "b, " - + "my_sum1(b, 10) as a1, " - + "my_sum2(5, b) as a2, " - + "my_avg(d, a) as a3, " - + "my_count(c) as c1 " - + "from MyTable group by b"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java deleted file mode 100644 index 9b2f9b01f7a..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/GroupAggregateJsonPlanITCase.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum1AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.VarSum2AggFunction; -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg; -import org.apache.flink.table.planner.runtime.utils.TestData; -import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - -/** Test for group aggregate json plan. */ -@ExtendWith(ParameterizedTestExtension.class) -class GroupAggregateJsonPlanITCase extends JsonPlanTestBase { - - @Parameter private boolean isMiniBatchEnabled; - - @Parameters(name = "isMiniBatchEnabled={0}") - private static List<Boolean> testData() { - return Arrays.asList(true, false); - } - - @BeforeEach - @Override - protected void setup() throws Exception { - super.setup(); - if (isMiniBatchEnabled) { - tableEnv.getConfig() - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .set( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(10)) - .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L); - } else { - tableEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, false); - } - } - - @TestTemplate - void testSimpleAggCallsWithGroupBy() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.smallData3()), - "a int", - "b bigint", - "c varchar"); - createTestNonInsertOnlyValuesSinkTable( - "MySink", - "b bigint", - "cnt bigint", - "avg_a double", - "min_c varchar", - "primary key (b) not enforced"); - compileSqlAndExecutePlan( - "insert into MySink select b, " - + "count(*) as cnt, " - + "avg(a) filter (where a > 1) as avg_a, " - + "min(c) as min_c " - + "from MyTable group by b") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result); - } - - @TestTemplate - void testDistinctAggCalls() throws Exception { - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data2()), - "a int", - "b bigint", - "c int", - "d varchar", - "e bigint"); - createTestNonInsertOnlyValuesSinkTable( - "MySink", - "e bigint", - "cnt_a1 bigint", - "cnt_a2 bigint", - "sum_a bigint", - "sum_b bigint", - "avg_b double", - "cnt_d bigint", - "primary key (e) not enforced"); - compileSqlAndExecutePlan( - "insert into MySink select e, " - + "count(distinct a) filter (where b > 10) as cnt_a1, " - + "count(distinct a) as cnt_a2, " - + "sum(distinct a) as sum_a, " - + "sum(distinct b) as sum_b, " - + "avg(b) as avg_b, " - + "count(distinct d) as concat_d " - + "from MyTable group by e") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[1, 1, 4, 12, 32, 6.0, 5]", - "+I[2, 1, 4, 14, 57, 8.0, 7]", - "+I[3, 1, 2, 8, 31, 10.0, 3]"), - result); - } - - @TestTemplate - void testUserDefinedAggCallsWithoutMerge() throws Exception { - tableEnv.createTemporaryFunction("my_sum1", new VarSum1AggFunction()); - tableEnv.createFunction("my_avg", WeightedAvg.class); - tableEnv.createTemporarySystemFunction("my_sum2", VarSum2AggFunction.class); - - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data2()), - "a int", - "b bigint", - "c int", - "d varchar", - "e bigint"); - createTestNonInsertOnlyValuesSinkTable( - "MySink", - "d bigint", - "s1 bigint", - "s2 bigint", - "s3 bigint", - "primary key (d) not enforced"); - - compileSqlAndExecutePlan( - "insert into MySink select " - + "e, " - + "my_sum1(c, 10) as s1, " - + "my_sum2(5, c) as s2, " - + "my_avg(e, a) as s3 " - + "from MyTable group by e") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 58, 0, 3]"), result); - } - - @TestTemplate - void testUserDefinedAggCallsWithMerge() throws Exception { - tableEnv.createFunction("my_avg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class); - tableEnv.createTemporarySystemFunction( - "my_concat_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class); - - createTestValuesSourceTable( - "MyTable", - JavaScalaConversionUtil.toJava(TestData.data2()), - "a int", - "b bigint", - "c int", - "d varchar", - "e bigint"); - createTestNonInsertOnlyValuesSinkTable( - "MySink", "d bigint", "s1 bigint", "c1 varchar", "primary key (d) not enforced"); - - compileSqlAndExecutePlan( - "insert into MySink select " - + "e, " - + "my_avg(e, a) as s1, " - + "my_concat_agg(d) as c1 " - + "from MyTable group by e") - .await(); - - List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink"); - assertResult( - Arrays.asList( - "+I[1, 1, Hallo Welt wie|Hallo|GHI|EFG|DEF]", - "+I[2, 2, Hallo Welt wie gehts?|Hallo Welt|ABC|FGH|CDE|JKL|KLM]", - "+I[3, 3, HIJ|IJK|BCD]"), - result); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out deleted file mode 100644 index 72e7811b337..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out +++ /dev/null @@ -1,345 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 10, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[d, a, (b > 10) AS $f2, b, c])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[hash[d]])" - }, { - "id" : 4, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a1", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : 2, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "cnt_a2", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "sum_a", - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "sum_b", - "internalName" : "$SUM$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "avg_b", - "internalName" : "$AVG$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "cnt_d", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 4 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false, false, false, false, false, false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT NOT NULL, `cnt_a2` BIGINT NOT NULL, `sum_a` BIGINT, `sum_b` INT NOT NULL, `avg_b` INT NOT NULL, `cnt_d` BIGINT NOT NULL>", - "description" : "GroupAggregate(groupBy=[d], select=[d, COUNT(DISTINCT a) FILTER $f2 AS cnt_a1, COUNT(DISTINCT a) AS cnt_a2, SUM(DISTINCT a) AS sum_a, SUM(DISTINCT b) AS sum_b, AVG(b) AS avg_b, COUNT(DISTINCT c) AS cnt_d])" - }, { - "id" : 5, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "INT NOT NULL" - } ], - "type" : "INT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 5, - "type" : "INT NOT NULL" - } ], - "type" : "DOUBLE" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 6, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", - "description" : "Calc(select=[d, CAST(cnt_a1 AS BIGINT) AS cnt_a1, CAST(cnt_a2 AS BIGINT) AS cnt_a2, sum_a, CAST(sum_b AS INTEGER) AS sum_b, CAST(avg_b AS DOUBLE) AS avg_b, CAST(cnt_d AS BIGINT) AS cnt_c])" - }, { - "id" : 6, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "d", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a1", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a2", - "dataType" : "BIGINT" - }, { - "name" : "sum_a", - "dataType" : "BIGINT" - }, { - "name" : "sum_b", - "dataType" : "INT" - }, { - "name" : "avg_b", - "dataType" : "DOUBLE" - }, { - "name" : "cnt_c", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out deleted file mode 100644 index 7b78f56e5ed..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out +++ /dev/null @@ -1,607 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 10, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[d, a, (b > 10) AS $f2, b, c])" - }, { - "id" : 4, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a1", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : 2, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "cnt_a2", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "sum_a", - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "sum_b", - "internalName" : "$SUM$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "avg_b", - "internalName" : "$AVG$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "cnt_d", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 4 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false, false, false, false, false, false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "d", - "fieldType" : "BIGINT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "sum$2", - "fieldType" : "BIGINT" - }, { - "name" : "sum$3", - "fieldType" : "INT" - }, { - "name" : "sum$4", - "fieldType" : "BIGINT" - }, { - "name" : "count$5", - "fieldType" : "BIGINT" - }, { - "name" : "count$6", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" - } ] - } - } - }, { - "name" : "distinct$1", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" - } ] - } - } - }, { - "name" : "distinct$2", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "logicalType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" - } ] - }, - "fields" : [ { - "name" : "map", - "keyClass" : { - "conversionClass" : "org.apache.flink.table.data.StringData" - } - } ] - } - } - } ] - }, - "description" : "LocalGroupAggregate(groupBy=[d], select=[d, COUNT(distinct$0 a) FILTER $f2 AS count$0, COUNT(distinct$0 a) AS count$1, SUM(distinct$0 a) AS sum$2, SUM(distinct$1 b) AS sum$3, AVG(b) AS (sum$4, count$5), COUNT(distinct$2 c) AS count$6, DISTINCT(a) AS distinct$0, DISTINCT(b) AS distinct$1, DISTINCT(c) AS distinct$2])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "d", - "fieldType" : "BIGINT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "sum$2", - "fieldType" : "BIGINT" - }, { - "name" : "sum$3", - "fieldType" : "INT" - }, { - "name" : "sum$4", - "fieldType" : "BIGINT" - }, { - "name" : "count$5", - "fieldType" : "BIGINT" - }, { - "name" : "count$6", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" - } ] - } - } - }, { - "name" : "distinct$1", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT NOT NULL, BIGINT NOT NULL>" - } ] - } - } - }, { - "name" : "distinct$2", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "logicalType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<VARCHAR(2147483647), BIGINT NOT NULL>" - } ] - }, - "fields" : [ { - "name" : "map", - "keyClass" : { - "conversionClass" : "org.apache.flink.table.data.StringData" - } - } ] - } - } - } ] - }, - "description" : "Exchange(distribution=[hash[d]])" - }, { - "id" : 6, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a1", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : 2, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "cnt_a2", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "sum_a", - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "sum_b", - "internalName" : "$SUM$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "avg_b", - "internalName" : "$AVG$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT NOT NULL" - }, { - "name" : "cnt_d", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 4 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false, false, false, false, false, false ], - "localAggInputRowType" : "ROW<`d` BIGINT, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT NOT NULL, `cnt_a2` BIGINT NOT NULL, `sum_a` BIGINT, `sum_b` INT NOT NULL, `avg_b` INT NOT NULL, `cnt_d` BIGINT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[d], select=[d, COUNT(distinct$0 count$0) AS cnt_a1, COUNT(distinct$0 count$1) AS cnt_a2, SUM(distinct$0 sum$2) AS sum_a, SUM(distinct$1 sum$3) AS sum_b, AVG((sum$4, count$5)) AS avg_b, COUNT(distinct$2 count$6) AS cnt_d])" - }, { - "id" : 7, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "INT NOT NULL" - } ], - "type" : "INT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 5, - "type" : "INT NOT NULL" - } ], - "type" : "DOUBLE" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 6, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", - "description" : "Calc(select=[d, CAST(cnt_a1 AS BIGINT) AS cnt_a1, CAST(cnt_a2 AS BIGINT) AS cnt_a2, sum_a, CAST(sum_b AS INTEGER) AS sum_b, CAST(avg_b AS DOUBLE) AS avg_b, CAST(cnt_d AS BIGINT) AS cnt_c])" - }, { - "id" : 8, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "d", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a1", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a2", - "dataType" : "BIGINT" - }, { - "name" : "sum_a", - "dataType" : "BIGINT" - }, { - "name" : "sum_b", - "dataType" : "INT" - }, { - "name" : "avg_b", - "dataType" : "DOUBLE" - }, { - "name" : "cnt_c", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out deleted file mode 100644 index dc3ed1e0849..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out +++ /dev/null @@ -1,288 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ], - "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[b, a, (b > 1) AS $f2, c])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 4, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "max_b", - "internalName" : "$MAX$1", - "argList" : [ 0 ], - "filterArg" : 2, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "min_c", - "internalName" : "$MIN$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `cnt_a` BIGINT NOT NULL, `max_b` INT, `min_c` VARCHAR(2147483647)>", - "description" : "GroupAggregate(groupBy=[b], select=[b, COUNT(a) AS cnt_a, MAX(b) FILTER $f2 AS max_b, MIN(c) AS min_c])" - }, { - "id" : 5, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", - "description" : "Calc(select=[CAST(b AS BIGINT) AS b, CAST(cnt_a AS BIGINT) AS cnt_a, CAST(max_b AS BIGINT) AS max_b, min_c])" - }, { - "id" : 6, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a", - "dataType" : "BIGINT" - }, { - "name" : "max_b", - "dataType" : "BIGINT" - }, { - "name" : "min_c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, cnt_a, max_b, min_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out deleted file mode 100644 index 1c39ad82ffd..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out +++ /dev/null @@ -1,367 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 1 ], [ 0 ], [ 2 ] ], - "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, a, c], metadata=[]]], fields=[b, a, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[b, a, (b > 1) AS $f2, c])" - }, { - "id" : 4, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "max_b", - "internalName" : "$MAX$1", - "argList" : [ 0 ], - "filterArg" : 2, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "min_c", - "internalName" : "$MIN$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `count$0` BIGINT, `max$1` INT, `min$2` VARCHAR(2147483647)>", - "description" : "LocalGroupAggregate(groupBy=[b], select=[b, COUNT(a) AS count$0, MAX(b) FILTER $f2 AS max$1, MIN(c) AS min$2])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `count$0` BIGINT, `max$1` INT, `min$2` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 6, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "cnt_a", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "max_b", - "internalName" : "$MAX$1", - "argList" : [ 0 ], - "filterArg" : 2, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "min_c", - "internalName" : "$MIN$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false ], - "localAggInputRowType" : "ROW<`b` INT NOT NULL, `a` BIGINT, `$f2` BOOLEAN NOT NULL, `c` VARCHAR(2147483647)>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `cnt_a` BIGINT NOT NULL, `max_b` INT, `min_c` VARCHAR(2147483647)>", - "description" : "GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS cnt_a, MAX(max$1) AS max_b, MIN(min$2) AS min_c])" - }, { - "id" : 7, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", - "description" : "Calc(select=[CAST(b AS BIGINT) AS b, CAST(cnt_a AS BIGINT) AS cnt_a, CAST(max_b AS BIGINT) AS max_b, min_c])" - }, { - "id" : 8, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "cnt_a", - "dataType" : "BIGINT" - }, { - "name" : "max_b", - "dataType" : "BIGINT" - }, { - "name" : "min_c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, cnt_a, max_b, min_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out deleted file mode 100644 index aebfcd2a9b5..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out +++ /dev/null @@ -1,314 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "CALL", - "syntax" : "POSTFIX", - "internalName" : "$IS TRUE$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "LITERAL", - "value" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN" - } ], - "type" : "BOOLEAN NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `$f3` BOOLEAN NOT NULL>", - "description" : "Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "SINGLETON" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `$f3` BOOLEAN NOT NULL>", - "description" : "Exchange(distribution=[single])" - }, { - "id" : 4, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ ], - "aggCalls" : [ { - "name" : "avg_a", - "internalName" : "$AVG$1", - "argList" : [ 0 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "cnt", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "min_b", - "internalName" : "$MIN$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "max_c", - "internalName" : "$MAX$1", - "argList" : [ 2 ], - "filterArg" : 3, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false, false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` BIGINT, `cnt` BIGINT NOT NULL, `min_b` INT, `max_c` VARCHAR(2147483647)>", - "description" : "GroupAggregate(select=[AVG(a) AS avg_a, COUNT(*) AS cnt, MIN(b) AS min_b, MAX(c) FILTER $f3 AS max_c])" - }, { - "id" : 5, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - } ], - "type" : "DOUBLE" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` BIGINT, `max_c` VARCHAR(2147483647)>", - "description" : "Calc(select=[CAST(avg_a AS DOUBLE) AS avg_a, CAST(cnt AS BIGINT) AS cnt, CAST(cnt AS BIGINT) AS cnt_b, CAST(min_b AS BIGINT) AS min_b, max_c])" - }, { - "id" : 6, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "avg_a", - "dataType" : "DOUBLE" - }, { - "name" : "cnt", - "dataType" : "BIGINT" - }, { - "name" : "cnt_b", - "dataType" : "BIGINT" - }, { - "name" : "min_b", - "dataType" : "BIGINT" - }, { - "name" : "max_c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` BIGINT, `max_c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[avg_a, cnt, cnt_b, min_b, max_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out deleted file mode 100644 index 3be27cfec5c..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out +++ /dev/null @@ -1,402 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ], [ 2 ] ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "CALL", - "syntax" : "POSTFIX", - "internalName" : "$IS TRUE$1", - "operands" : [ { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$>$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "LITERAL", - "value" : 1, - "type" : "INT NOT NULL" - } ], - "type" : "BOOLEAN" - } ], - "type" : "BOOLEAN NOT NULL" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `$f3` BOOLEAN NOT NULL>", - "description" : "Calc(select=[a, b, c, (a > 1) IS TRUE AS $f3])" - }, { - "id" : 4, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ ], - "aggCalls" : [ { - "name" : "avg_a", - "internalName" : "$AVG$1", - "argList" : [ 0 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "cnt", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "min_b", - "internalName" : "$MIN$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "max_c", - "internalName" : "$MAX$1", - "argList" : [ 2 ], - "filterArg" : 3, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false, false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`sum$0` BIGINT, `count$1` BIGINT, `count1$2` BIGINT, `min$3` INT, `max$4` VARCHAR(2147483647)>", - "description" : "LocalGroupAggregate(select=[AVG(a) AS (sum$0, count$1), COUNT(*) AS count1$2, MIN(b) AS min$3, MAX(c) FILTER $f3 AS max$4])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "SINGLETON" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`sum$0` BIGINT, `count$1` BIGINT, `count1$2` BIGINT, `min$3` INT, `max$4` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[single])" - }, { - "id" : 6, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ ], - "aggCalls" : [ { - "name" : "avg_a", - "internalName" : "$AVG$1", - "argList" : [ 0 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "cnt", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "min_b", - "internalName" : "$MIN$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "INT" - }, { - "name" : "max_c", - "internalName" : "$MAX$1", - "argList" : [ 2 ], - "filterArg" : 3, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "VARCHAR(2147483647)" - } ], - "aggCallNeedRetractions" : [ false, false, false, false ], - "localAggInputRowType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `$f3` BOOLEAN NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` BIGINT, `cnt` BIGINT NOT NULL, `min_b` INT, `max_c` VARCHAR(2147483647)>", - "description" : "GlobalGroupAggregate(select=[AVG((sum$0, count$1)) AS avg_a, COUNT(count1$2) AS cnt, MIN(min$3) AS min_b, MAX(max$4) AS max_c])" - }, { - "id" : 7, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - } ], - "type" : "DOUBLE" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "INT" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` BIGINT, `max_c` VARCHAR(2147483647)>", - "description" : "Calc(select=[CAST(avg_a AS DOUBLE) AS avg_a, CAST(cnt AS BIGINT) AS cnt, CAST(cnt AS BIGINT) AS cnt_b, CAST(min_b AS BIGINT) AS min_b, max_c])" - }, { - "id" : 8, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "avg_a", - "dataType" : "DOUBLE" - }, { - "name" : "cnt", - "dataType" : "BIGINT" - }, { - "name" : "cnt_b", - "dataType" : "BIGINT" - }, { - "name" : "min_b", - "dataType" : "BIGINT" - }, { - "name" : "max_c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`avg_a` DOUBLE, `cnt` BIGINT, `cnt_b` BIGINT, `min_b` BIGINT, `max_c` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[avg_a, cnt, cnt_b, min_b, max_c])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out deleted file mode 100644 index 0777239ed59..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out +++ /dev/null @@ -1,281 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 10, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 5, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[b, 10 AS $f1, 5 AS $f2, d, a, c])" - }, { - "id" : 3, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 4, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "false", - "table.exec.mini-batch.size" : "-1" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "a1", - "catalogName" : "`default_catalog`.`default_database`.`my_sum1`", - "argList" : [ 0, 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "a2", - "systemName" : "my_sum2", - "argList" : [ 2, 0 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "a3", - "catalogName" : "`default_catalog`.`default_database`.`my_avg`", - "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg", - "argList" : [ 3, 4 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "c1", - "systemName" : "my_count", - "argList" : [ 5 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false, false, false, false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "GroupAggregate(groupBy=[b], select=[b, my_sum1(b, $f1) AS a1, my_sum2($f2, b) AS a2, my_avg(d, a) AS a3, my_count(c) AS c1])" - }, { - "id" : 5, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "BIGINT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "Calc(select=[CAST(b AS BIGINT) AS b, a1, a2, a3, c1])" - }, { - "id" : 6, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "a1", - "dataType" : "BIGINT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - }, { - "name" : "c1", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, a1, a2, a3, c1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out deleted file mode 100644 index bdb25ed0498..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out +++ /dev/null @@ -1,304 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 10000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` BIGINT>", - "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 10, - "type" : "INT NOT NULL" - }, { - "kind" : "LITERAL", - "value" : 5, - "type" : "INT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "Calc(select=[b, 10 AS $f1, 5 AS $f2, d, a, c])" - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `$f1` INT NOT NULL, `$f2` INT NOT NULL, `d` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 5, - "type" : "stream-exec-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "a1", - "catalogName" : "`default_catalog`.`default_database`.`my_sum1`", - "argList" : [ 0, 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "a2", - "systemName" : "my_sum2", - "argList" : [ 2, 0 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "a3", - "catalogName" : "`default_catalog`.`default_database`.`my_avg`", - "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg", - "argList" : [ 3, 4 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : "c1", - "systemName" : "my_count", - "argList" : [ 5 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false, false, false, false ], - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "groupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` INT NOT NULL, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "GroupAggregate(groupBy=[b], select=[b, my_sum1(b, $f1) AS a1, my_sum2($f2, b) AS a2, my_avg(d, a) AS a3, my_count(c) AS c1])" - }, { - "id" : 6, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$CAST$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT NOT NULL" - } ], - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "BIGINT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "Calc(select=[CAST(b AS BIGINT) AS b, a1, a2, a3, c1])" - }, { - "id" : 7, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "a1", - "dataType" : "BIGINT" - }, { - "name" : "a2", - "dataType" : "BIGINT" - }, { - "name" : "a3", - "dataType" : "BIGINT" - }, { - "name" : "c1", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "sink-insert-only" : "false", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, `c1` BIGINT>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[b, a1, a2, a3, c1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}