This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e27a4cbc74beba7dff8a408dcff38d816ff70457 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Jan 9 09:52:50 2024 +0100 Revert "[FLINK-34000] Implement restore tests for IncrementalGroupAgg node" This reverts commit df71d07188e745553b8174297ec7989f05cebf7a. --- .../IncrementalGroupAggregateRestoreTest.java | 40 -- .../IncrementalGroupAggregateTestPrograms.java | 119 ----- .../plan/incremental-group-aggregate-complex.json | 573 --------------------- .../savepoint/_metadata | Bin 20817 -> 0 bytes .../plan/incremental-group-aggregate-simple.json | 373 -------------- .../savepoint/_metadata | Bin 14768 -> 0 bytes 6 files changed, 1105 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java deleted file mode 100644 index 250f50a38c7..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; -import org.apache.flink.table.test.program.TableTestProgram; - -import java.util.Arrays; -import java.util.List; - -/** Restore tests for {@link StreamExecIncrementalGroupAggregate}. */ -public class IncrementalGroupAggregateRestoreTest extends RestoreTestBase { - - public IncrementalGroupAggregateRestoreTest() { - super(StreamExecIncrementalGroupAggregate.class); - } - - @Override - public List<TableTestProgram> programs() { - return Arrays.asList( - IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_SIMPLE, - IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_COMPLEX); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java deleted file mode 100644 index a1ca086d258..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.config.ExecutionConfigOptions; -import org.apache.flink.table.api.config.OptimizerConfigOptions; -import org.apache.flink.table.test.program.SinkTestStep; -import org.apache.flink.table.test.program.SourceTestStep; -import org.apache.flink.table.test.program.TableTestProgram; -import org.apache.flink.types.Row; - -import java.time.Duration; - -/** {@link TableTestProgram} definitions for testing {@link StreamExecGroupAggregate}. */ -public class IncrementalGroupAggregateTestPrograms { - - static final Row[] BEFORE_DATA = { - Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world") - }; - - static final Row[] AFTER_DATA = { - Row.of(3, 2L, "foo"), Row.of(4, 4L, "bar"), Row.of(5, 2L, "foo bar") - }; - - static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"}; - - static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_SIMPLE = - TableTestProgram.of( - "incremental-group-aggregate-simple", - "validates incremental group aggregation") - .setupTableSource( - SourceTestStep.newBuilder("source_t") - .addSchema(SOURCE_SCHEMA) - .producedBeforeRestore(BEFORE_DATA) - .producedAfterRestore(AFTER_DATA) - .build()) - .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .setupConfig( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(1)) - .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) - .setupConfig( - OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) - .setupTableSink( - SinkTestStep.newBuilder("sink_t") - .addSchema("b BIGINT", "a BIGINT") - .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]") - .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]") - .build()) - .runSql( - "INSERT INTO sink_t\n" - + " SELECT\n" - + " b,\n" - + " COUNT(DISTINCT a) AS a\n" - + " FROM source_t\n" - + " GROUP BY b") - .build(); - - static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_COMPLEX = - TableTestProgram.of( - "incremental-group-aggregate-complex", - "validates incremental group aggregation with multiple aggregations") - .setupTableSource( - SourceTestStep.newBuilder("source_t") - .addSchema(SOURCE_SCHEMA) - .producedBeforeRestore(BEFORE_DATA) - .producedAfterRestore(AFTER_DATA) - .build()) - .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) - .setupConfig( - ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, - Duration.ofSeconds(1)) - .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) - .setupConfig( - OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) - .setupTableSink( - SinkTestStep.newBuilder("sink_t") - .addSchema( - "b BIGINT", - "sum_b BIGINT", - "cnt_distinct_b BIGINT", - "cnt_1 BIGINT") - .consumedBeforeRestore("+I[1, 5, 2, 3]") - .consumedAfterRestore( - "+I[2, 2, 1, 1]", "-U[1, 5, 2, 3]", "+U[1, 9, 3, 4]") - .build()) - .runSql( - "INSERT INTO sink_t SELECT\n" - + " b,\n" - + " SUM(b1),\n" - + " COUNT(DISTINCT b1),\n" - + " COUNT(1)\n" - + " FROM\n" - + " (\n" - + " SELECT\n" - + " a,\n" - + " COUNT(b) AS b,\n" - + " MAX(b) AS b1\n" - + " FROM source_t GROUP BY a\n" - + " )\n" - + " GROUP BY b") - .build(); -} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json deleted file mode 100644 index e65797b211f..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json +++ /dev/null @@ -1,573 +0,0 @@ -{ - "flinkVersion" : "1.19", - "nodes" : [ { - "id" : 10, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`source_t`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ] - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 1 ] ], - "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` INT, `b` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b], metadata=[]]], fields=[a, b])", - "inputProperties" : [ ] - }, { - "id" : 11, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 1000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` BIGINT>", - "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])" - }, { - "id" : 12, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "b", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "b1", - "internalName" : "$MAX$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false, false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>", - "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0, MAX(b) AS max$1])" - }, { - "id" : 13, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>", - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 14, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : "b", - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : "b1", - "internalName" : "$MAX$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - } ], - "aggCallNeedRetractions" : [ false, false ], - "localAggInputRowType" : "ROW<`a` INT, `b` BIGINT>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `b1` BIGINT>", - "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b, MAX(max$1) AS b1])" - }, { - "id" : 15, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT NOT NULL" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "internalName" : "$MOD$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$HASH_CODE$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : 1024, - "type" : "INT NOT NULL" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>", - "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])" - }, { - "id" : 16, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0, 2 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ true, true, true ], - "needRetraction" : true, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT NOT NULL" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "sum$0", - "fieldType" : "BIGINT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "count$2", - "fieldType" : "BIGINT" - }, { - "name" : "count1$3", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])" - }, { - "id" : 17, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT NOT NULL" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "sum$0", - "fieldType" : "BIGINT" - }, { - "name" : "count$1", - "fieldType" : "BIGINT" - }, { - "name" : "count$2", - "fieldType" : "BIGINT" - }, { - "name" : "count1$3", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "Exchange(distribution=[hash[b, $f2]])" - }, { - "id" : 18, - "type" : "stream-exec-incremental-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "partialAggGrouping" : [ 0, 1 ], - "finalAggGrouping" : [ 0 ], - "partialOriginalAggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "partialAggCallNeedRetractions" : [ true, true, true ], - "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>", - "partialAggNeedRetraction" : true, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "incrementalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", - "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])" - }, { - "id" : 19, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 20, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$SUM$1", - "argList" : [ 2 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT" - }, { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 3 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - }, { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 4 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ true, true, true ], - "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT, `$f2_0` BIGINT, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : true, - "indexOfCountStar" : 2, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])" - }, { - "id" : 21, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`sink_t`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "sum_b", - "dataType" : "BIGINT" - }, { - "name" : "cnt_distinct_b", - "dataType" : "BIGINT" - }, { - "name" : "cnt_1", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ] - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1, $f2, $f3])" - } ], - "edges" : [ { - "source" : 10, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 11, - "target" : 12, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 12, - "target" : 13, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 13, - "target" : 14, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 14, - "target" : 15, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 15, - "target" : 16, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 16, - "target" : 17, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 17, - "target" : 18, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 18, - "target" : 19, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 19, - "target" : 20, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 20, - "target" : 21, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata deleted file mode 100644 index e92f8f6214c..00000000000 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata and /dev/null differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json deleted file mode 100644 index e995a2b138a..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json +++ /dev/null @@ -1,373 +0,0 @@ -{ - "flinkVersion" : "1.19", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`source_t`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ] - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 1 ], [ 0 ] ], - "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" - } ] - }, - "outputType" : "ROW<`b` BIGINT, `a` INT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-mini-batch-assigner_1", - "miniBatchInterval" : { - "interval" : 1000, - "mode" : "ProcTime" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a` INT>", - "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "BIGINT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT" - }, { - "kind" : "CALL", - "internalName" : "$MOD$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$HASH_CODE$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "INT" - } ], - "type" : "INT" - }, { - "kind" : "LITERAL", - "value" : 1024, - "type" : "INT NOT NULL" - } ], - "type" : "INT" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>", - "description" : "Calc(select=[b, a, MOD(HASH_CODE(a), 1024) AS $f2])" - }, { - "id" : 4, - "type" : "stream-exec-local-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0, 2 ], - "aggCalls" : [ { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false ], - "needRetraction" : false, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0, 1 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "b", - "fieldType" : "BIGINT" - }, { - "name" : "$f2", - "fieldType" : "INT" - }, { - "name" : "count$0", - "fieldType" : "BIGINT" - }, { - "name" : "distinct$0", - "fieldType" : { - "type" : "RAW", - "class" : "org.apache.flink.table.api.dataview.MapView", - "externalDataType" : { - "type" : "STRUCTURED_TYPE", - "implementationClass" : "org.apache.flink.table.api.dataview.MapView", - "attributes" : [ { - "name" : "map", - "attributeType" : "MAP<INT, BIGINT NOT NULL>" - } ] - } - } - } ] - }, - "description" : "Exchange(distribution=[hash[b, $f2]])" - }, { - "id" : 6, - "type" : "stream-exec-incremental-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "partialAggGrouping" : [ 0, 1 ], - "finalAggGrouping" : [ 0 ], - "partialOriginalAggCalls" : [ { - "name" : null, - "syntax" : "FUNCTION_STAR", - "internalName" : "$COUNT$1", - "argList" : [ 1 ], - "filterArg" : -1, - "distinct" : true, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "partialAggCallNeedRetractions" : [ false ], - "partialLocalAggInputRowType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>", - "partialAggNeedRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "incrementalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>", - "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, COUNT(distinct$0 count$0) AS count$0])" - }, { - "id" : 7, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>", - "description" : "Exchange(distribution=[hash[b]])" - }, { - "id" : 8, - "type" : "stream-exec-global-group-aggregate_1", - "configuration" : { - "table.exec.mini-batch.enabled" : "true", - "table.exec.mini-batch.size" : "5" - }, - "grouping" : [ 0 ], - "aggCalls" : [ { - "name" : null, - "internalName" : "$$SUM0$1", - "argList" : [ 2 ], - "filterArg" : -1, - "distinct" : false, - "approximate" : false, - "ignoreNulls" : false, - "type" : "BIGINT NOT NULL" - } ], - "aggCallNeedRetractions" : [ false ], - "localAggInputRowType" : "ROW<`b` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>", - "generateUpdateBefore" : true, - "needRetraction" : false, - "state" : [ { - "index" : 0, - "ttl" : "0 ms", - "name" : "globalGroupAggregateState" - } ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>", - "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, $SUM0(count$0) AS $f1])" - }, { - "id" : 9, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`sink_t`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "b", - "dataType" : "BIGINT" - }, { - "name" : "a", - "dataType" : "BIGINT" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ] - } - } - }, - "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], - "inputUpsertKey" : [ 0 ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>", - "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata deleted file mode 100644 index b0e73b1a395..00000000000 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata and /dev/null differ