This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit df71d07188e745553b8174297ec7989f05cebf7a Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Thu Jan 4 20:05:38 2024 -0800 [FLINK-34000] Implement restore tests for IncrementalGroupAgg node --- .../IncrementalGroupAggregateRestoreTest.java | 40 ++ .../IncrementalGroupAggregateTestPrograms.java | 119 +++++ .../plan/incremental-group-aggregate-complex.json | 573 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 20817 bytes .../plan/incremental-group-aggregate-simple.json | 373 ++++++++++++++ .../savepoint/_metadata | Bin 0 -> 14768 bytes 6 files changed, 1105 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java new file mode 100644 index 00000000000..250f50a38c7 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests (compiled plan + savepoint) for {@link StreamExecIncrementalGroupAggregate}. */ +public class IncrementalGroupAggregateRestoreTest extends RestoreTestBase { + + public IncrementalGroupAggregateRestoreTest() { + super(StreamExecIncrementalGroupAggregate.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_SIMPLE, + IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_COMPLEX); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java new file mode 100644 index 00000000000..a1ca086d258 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.time.Duration; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecIncrementalGroupAggregate}. */ +public class IncrementalGroupAggregateTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world") + }; + + static final Row[] AFTER_DATA = { + Row.of(3, 2L, "foo"), Row.of(4, 4L, "bar"), Row.of(5, 2L, "foo bar") + }; + + static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"}; + + static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_SIMPLE = + TableTestProgram.of( + "incremental-group-aggregate-simple", + "validates incremental group aggregation") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(1)) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .setupConfig( +
OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("b BIGINT", "a BIGINT") + .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]") + .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]") + .build()) + .runSql( + "INSERT INTO sink_t\n" + + " SELECT\n" + + " b,\n" + + " COUNT(DISTINCT a) AS a\n" + + " FROM source_t\n" + + " GROUP BY b") + .build(); + + static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_COMPLEX = + TableTestProgram.of( + "incremental-group-aggregate-complex", + "validates incremental group aggregation with multiple aggregations") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(1)) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L) + .setupConfig( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "b BIGINT", + "sum_b BIGINT", + "cnt_distinct_b BIGINT", + "cnt_1 BIGINT") + .consumedBeforeRestore("+I[1, 5, 2, 3]") + .consumedAfterRestore( + "+I[2, 2, 1, 1]", "-U[1, 5, 2, 3]", "+U[1, 9, 3, 4]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT\n" + + " b,\n" + + " SUM(b1),\n" + + " COUNT(DISTINCT b1),\n" + + " COUNT(1)\n" + + " FROM\n" + + " (\n" + + " SELECT\n" + + " a,\n" + + " COUNT(b) AS b,\n" + + " MAX(b) AS b1\n" + + " FROM source_t GROUP BY a\n" + + " )\n" + + " GROUP BY b") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json new file mode 100644 index 00000000000..e65797b211f --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json @@ -0,0 +1,573 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 10, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` INT, `b` BIGINT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "id" : 11, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 1000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT>", + "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])" + }, { + "id" : 12, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" 
+ }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "b", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "b1", + "internalName" : "$MAX$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT" + } ], + "aggCallNeedRetractions" : [ false, false ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>", + "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0, MAX(b) AS max$1])" + }, { + "id" : 13, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 14, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : "b", + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : "b1", + "internalName" : "$MAX$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT" + } ], + "aggCallNeedRetractions" : [ false, false ], + "localAggInputRowType" : "ROW<`a` INT, `b` BIGINT>", + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + 
"ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `b1` BIGINT>", + "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b, MAX(max$1) AS b1])" + }, { + "id" : 15, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "internalName" : "$MOD$1", + "operands" : [ { + "kind" : "CALL", + "internalName" : "$HASH_CODE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "BIGINT" + } ], + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 1024, + "type" : "INT NOT NULL" + } ], + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>", + "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])" + }, { + "id" : 16, + "type" : "stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0, 2 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : 
false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ true, true, true ], + "needRetraction" : true, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "sum$0", + "fieldType" : "BIGINT" + }, { + "name" : "count$1", + "fieldType" : "BIGINT" + }, { + "name" : "count$2", + "fieldType" : "BIGINT" + }, { + "name" : "count1$3", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])" + }, { + "id" : 17, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT NOT NULL" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "sum$0", + "fieldType" : "BIGINT" + }, { + "name" : "count$1", + "fieldType" : "BIGINT" + }, { + "name" : "count$2", + "fieldType" : "BIGINT" + }, { + "name" : "count1$3", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + 
"externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "Exchange(distribution=[hash[b, $f2]])" + }, { + "id" : 18, + "type" : "stream-exec-incremental-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "partialAggGrouping" : [ 0, 1 ], + "finalAggGrouping" : [ 0 ], + "partialOriginalAggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "partialAggCallNeedRetractions" : [ true, true, true ], + "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>", + "partialAggNeedRetraction" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "incrementalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", + "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])" + }, 
{ + "id" : 19, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>", + "description" : "Exchange(distribution=[hash[b]])" + }, { + "id" : 20, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$SUM$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT" + }, { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 3 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + }, { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 4 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ true, true, true ], + "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT, `$f2_0` BIGINT, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>", + "generateUpdateBefore" : true, + "needRetraction" : true, + "indexOfCountStar" : 2, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])" + }, { + 
"id" : 21, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "sum_b", + "dataType" : "BIGINT" + }, { + "name" : "cnt_distinct_b", + "dataType" : "BIGINT" + }, { + "name" : "cnt_1", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1, $f2, $f3])" + } ], + "edges" : [ { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 16, + "target" : 17, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : 
"PIPELINED" + }, { + "source" : 17, + "target" : 18, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 18, + "target" : 19, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 19, + "target" : 20, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 20, + "target" : 21, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata new file mode 100644 index 00000000000..e92f8f6214c Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json new file mode 100644 index 00000000000..e995a2b138a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json @@ -0,0 +1,373 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + 
"schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 1 ], [ 0 ] ], + "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL" + } ] + }, + "outputType" : "ROW<`b` BIGINT, `a` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 1000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `a` INT>", + "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])" + }, { + "id" : 3, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + }, { + "kind" : "CALL", + "internalName" : "$MOD$1", + "operands" : [ { + "kind" : "CALL", + "internalName" : "$HASH_CODE$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "INT" + } ], + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 1024, + "type" : "INT NOT NULL" + } ], + "type" : "INT" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>", + "description" : "Calc(select=[b, a, MOD(HASH_CODE(a), 1024) AS $f2])" + }, { + "id" : 4, + "type" : 
"stream-exec-local-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0, 2 ], + "aggCalls" : [ { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "needRetraction" : false, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : "org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])" + }, { + "id" : 5, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "b", + "fieldType" : "BIGINT" + }, { + "name" : "$f2", + "fieldType" : "INT" + }, { + "name" : "count$0", + "fieldType" : "BIGINT" + }, { + "name" : "distinct$0", + "fieldType" : { + "type" : "RAW", + "class" : "org.apache.flink.table.api.dataview.MapView", + "externalDataType" : { + "type" : "STRUCTURED_TYPE", + "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView", + "attributes" : [ { + "name" : "map", + "attributeType" : "MAP<INT, BIGINT NOT NULL>" + } ] + } + } + } ] + }, + "description" : "Exchange(distribution=[hash[b, $f2]])" + }, { + "id" : 6, + "type" : "stream-exec-incremental-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "partialAggGrouping" : [ 0, 1 ], + "finalAggGrouping" : [ 0 ], + "partialOriginalAggCalls" : [ { + "name" : null, + "syntax" : "FUNCTION_STAR", + "internalName" : "$COUNT$1", + "argList" : [ 1 ], + "filterArg" : -1, + "distinct" : true, + "approximate" : false, + "ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "partialAggCallNeedRetractions" : [ false ], + "partialLocalAggInputRowType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>", + "partialAggNeedRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "incrementalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>", + "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, COUNT(distinct$0 count$0) AS count$0])" + }, { + "id" : 7, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>", + "description" : "Exchange(distribution=[hash[b]])" + }, { + "id" : 8, + "type" : "stream-exec-global-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "5" + }, + "grouping" : [ 0 ], + "aggCalls" : [ { + "name" : null, + "internalName" : "$$SUM0$1", + "argList" : [ 2 ], + "filterArg" : -1, + "distinct" : false, + "approximate" : false, + 
"ignoreNulls" : false, + "type" : "BIGINT NOT NULL" + } ], + "aggCallNeedRetractions" : [ false ], + "localAggInputRowType" : "ROW<`b` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT NULL>", + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "globalGroupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>", + "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, $SUM0(count$0) AS $f1])" + }, { + "id" : 9, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "a", + "dataType" : "BIGINT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : 
"FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata new file mode 100644 index 00000000000..b0e73b1a395 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata differ