This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e27a4cbc74beba7dff8a408dcff38d816ff70457
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Jan 9 09:52:50 2024 +0100

    Revert "[FLINK-34000] Implement restore tests for IncrementalGroupAgg node"
    
    This reverts commit df71d07188e745553b8174297ec7989f05cebf7a.
---
 .../IncrementalGroupAggregateRestoreTest.java      |  40 --
 .../IncrementalGroupAggregateTestPrograms.java     | 119 -----
 .../plan/incremental-group-aggregate-complex.json  | 573 ---------------------
 .../savepoint/_metadata                            | Bin 20817 -> 0 bytes
 .../plan/incremental-group-aggregate-simple.json   | 373 --------------
 .../savepoint/_metadata                            | Bin 14768 -> 0 bytes
 6 files changed, 1105 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java
deleted file mode 100644
index 250f50a38c7..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateRestoreTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
-import org.apache.flink.table.test.program.TableTestProgram;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** Restore tests for {@link StreamExecIncrementalGroupAggregate}. */
-public class IncrementalGroupAggregateRestoreTest extends RestoreTestBase {
-
-    public IncrementalGroupAggregateRestoreTest() {
-        super(StreamExecIncrementalGroupAggregate.class);
-    }
-
-    @Override
-    public List<TableTestProgram> programs() {
-        return Arrays.asList(
-                IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_SIMPLE,
-                IncrementalGroupAggregateTestPrograms.INCREMENTAL_GROUP_AGGREGATE_COMPLEX);
-    }
-}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java
deleted file mode 100644
index a1ca086d258..00000000000
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalGroupAggregateTestPrograms.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.test.program.SinkTestStep;
-import org.apache.flink.table.test.program.SourceTestStep;
-import org.apache.flink.table.test.program.TableTestProgram;
-import org.apache.flink.types.Row;
-
-import java.time.Duration;
-
-/** {@link TableTestProgram} definitions for testing {@link StreamExecGroupAggregate}. */
-public class IncrementalGroupAggregateTestPrograms {
-
-    static final Row[] BEFORE_DATA = {
-        Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world")
-    };
-
-    static final Row[] AFTER_DATA = {
-        Row.of(3, 2L, "foo"), Row.of(4, 4L, "bar"), Row.of(5, 2L, "foo bar")
-    };
-
-    static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"};
-
-    static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_SIMPLE =
-            TableTestProgram.of(
-                            "incremental-group-aggregate-simple",
-                            "validates incremental group aggregation")
-                    .setupTableSource(
-                            SourceTestStep.newBuilder("source_t")
-                                    .addSchema(SOURCE_SCHEMA)
-                                    .producedBeforeRestore(BEFORE_DATA)
-                                    .producedAfterRestore(AFTER_DATA)
-                                    .build())
-                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-                    .setupConfig(
-                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(1))
-                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
-                    .setupConfig(
-                            OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
-                    .setupTableSink(
-                            SinkTestStep.newBuilder("sink_t")
-                                    .addSchema("b BIGINT", "a BIGINT")
-                                    .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]")
-                                    .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]")
-                                    .build())
-                    .runSql(
-                            "INSERT INTO sink_t\n"
-                                    + "     SELECT\n"
-                                    + "         b,\n"
-                                    + "         COUNT(DISTINCT a) AS a\n"
-                                    + "     FROM source_t\n"
-                                    + "     GROUP BY b")
-                    .build();
-
-    static final TableTestProgram INCREMENTAL_GROUP_AGGREGATE_COMPLEX =
-            TableTestProgram.of(
-                            "incremental-group-aggregate-complex",
-                            "validates incremental group aggregation with multiple aggregations")
-                    .setupTableSource(
-                            SourceTestStep.newBuilder("source_t")
-                                    .addSchema(SOURCE_SCHEMA)
-                                    .producedBeforeRestore(BEFORE_DATA)
-                                    .producedAfterRestore(AFTER_DATA)
-                                    .build())
-                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
-                    .setupConfig(
-                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
-                            Duration.ofSeconds(1))
-                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
-                    .setupConfig(
-                            OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
-                    .setupTableSink(
-                            SinkTestStep.newBuilder("sink_t")
-                                    .addSchema(
-                                            "b BIGINT",
-                                            "sum_b BIGINT",
-                                            "cnt_distinct_b BIGINT",
-                                            "cnt_1 BIGINT")
-                                    .consumedBeforeRestore("+I[1, 5, 2, 3]")
-                                    .consumedAfterRestore(
-                                            "+I[2, 2, 1, 1]", "-U[1, 5, 2, 3]", "+U[1, 9, 3, 4]")
-                                    .build())
-                    .runSql(
-                            "INSERT INTO sink_t SELECT\n"
-                                    + "         b,\n"
-                                    + "         SUM(b1),\n"
-                                    + "         COUNT(DISTINCT b1),\n"
-                                    + "         COUNT(1)\n"
-                                    + "     FROM\n"
-                                    + "         (\n"
-                                    + "             SELECT\n"
-                                    + "                     a,\n"
-                                    + "                     COUNT(b) AS b,\n"
-                                    + "                     MAX(b) AS b1\n"
-                                    + "             FROM source_t GROUP BY a\n"
-                                    + "         )\n"
-                                    + "     GROUP BY b")
-                    .build();
-}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json
deleted file mode 100644
index e65797b211f..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/plan/incremental-group-aggregate-complex.json
+++ /dev/null
@@ -1,573 +0,0 @@
-{
-  "flinkVersion" : "1.19",
-  "nodes" : [ {
-    "id" : 10,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`source_t`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
-            }, {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` INT, `b` BIGINT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`a` INT, `b` BIGINT>",
-    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[a, b], metadata=[]]], fields=[a, b])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 11,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 1000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` INT, `b` BIGINT>",
-    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
-  }, {
-    "id" : 12,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "b1",
-      "internalName" : "$MAX$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>",
-    "description" : "LocalGroupAggregate(groupBy=[a], select=[a, COUNT(b) AS count$0, MAX(b) AS max$1])"
-  }, {
-    "id" : 13,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` INT, `count$0` BIGINT, `max$1` BIGINT>",
-    "description" : "Exchange(distribution=[hash[a]])"
-  }, {
-    "id" : 14,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : "b",
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : "b1",
-      "internalName" : "$MAX$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    } ],
-    "aggCallNeedRetractions" : [ false, false ],
-    "localAggInputRowType" : "ROW<`a` INT, `b` BIGINT>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `b1` BIGINT>",
-    "description" : "GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b, MAX(max$1) AS b1])"
-  }, {
-    "id" : 15,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "CALL",
-      "internalName" : "$MOD$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "internalName" : "$HASH_CODE$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 2,
-          "type" : "BIGINT"
-        } ],
-        "type" : "INT"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1024,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>",
-    "description" : "Calc(select=[b, b1, MOD(HASH_CODE(b1), 1024) AS $f2])"
-  }, {
-    "id" : 16,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0, 2 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "needRetraction" : true,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT NOT NULL"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "sum$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count1$3",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[b, $f2], partialFinalType=[PARTIAL], select=[b, $f2, SUM_RETRACT(b1) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 b1) AS count$2, COUNT_RETRACT(*) AS count1$3, DISTINCT(b1) AS distinct$0])"
-  }, {
-    "id" : 17,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT NOT NULL"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "sum$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$1",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count$2",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "count1$3",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<BIGINT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[b, $f2]])"
-  }, {
-    "id" : 18,
-    "type" : "stream-exec-incremental-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "partialAggGrouping" : [ 0, 1 ],
-    "finalAggGrouping" : [ 0 ],
-    "partialOriginalAggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "partialAggCallNeedRetractions" : [ true, true, true ],
-    "partialLocalAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `b1` BIGINT, `$f2` INT>",
-    "partialAggNeedRetraction" : true,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "incrementalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
-    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], finalAggGrouping=[b], select=[b, SUM_RETRACT((sum$0, count$1)) AS (sum$0, count$1), COUNT_RETRACT(distinct$0 count$2) AS count$2, COUNT_RETRACT(count1$3) AS count1$3])"
-  }, {
-    "id" : 19,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `sum$0` BIGINT, `count$1` BIGINT, `count$2` BIGINT, `count1$3` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 20,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$SUM$1",
-      "argList" : [ 2 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT"
-    }, {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 3 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    }, {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 4 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ true, true, true ],
-    "localAggInputRowType" : "ROW<`b` BIGINT NOT NULL, `$f2` INT, `$f2_0` BIGINT, `$f3` BIGINT NOT NULL, `$f4` BIGINT NOT NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : true,
-    "indexOfCountStar" : 2,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[b], partialFinalType=[FINAL], select=[b, SUM_RETRACT((sum$0, count$1)) AS $f1, $SUM0_RETRACT(count$2) AS $f2, $SUM0_RETRACT(count1$3) AS $f3], indexOfCountStar=[2])"
-  }, {
-    "id" : 21,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "sum_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_distinct_b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "cnt_1",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` BIGINT, `$f2` BIGINT NOT NULL, `$f3` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, $f1, $f2, $f3])"
-  } ],
-  "edges" : [ {
-    "source" : 10,
-    "target" : 11,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 11,
-    "target" : 12,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 12,
-    "target" : 13,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 13,
-    "target" : 14,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 14,
-    "target" : 15,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 15,
-    "target" : 16,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 16,
-    "target" : 17,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 17,
-    "target" : 18,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 18,
-    "target" : 19,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 19,
-    "target" : 20,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 20,
-    "target" : 21,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata
deleted file mode 100644
index e92f8f6214c..00000000000
Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-complex/savepoint/_metadata and /dev/null differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json
deleted file mode 100644
index e995a2b138a..00000000000
--- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/plan/incremental-group-aggregate-simple.json
+++ /dev/null
@@ -1,373 +0,0 @@
-{
-  "flinkVersion" : "1.19",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`source_t`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
-            }, {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
-        }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 1 ], [ 0 ] ],
-        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
-      } ]
-    },
-    "outputType" : "ROW<`b` BIGINT, `a` INT>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-mini-batch-assigner_1",
-    "miniBatchInterval" : {
-      "interval" : 1000,
-      "mode" : "ProcTime"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a` INT>",
-    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 0,
-      "type" : "BIGINT"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 1,
-      "type" : "INT"
-    }, {
-      "kind" : "CALL",
-      "internalName" : "$MOD$1",
-      "operands" : [ {
-        "kind" : "CALL",
-        "internalName" : "$HASH_CODE$1",
-        "operands" : [ {
-          "kind" : "INPUT_REF",
-          "inputIndex" : 1,
-          "type" : "INT"
-        } ],
-        "type" : "INT"
-      }, {
-        "kind" : "LITERAL",
-        "value" : 1024,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>",
-    "description" : "Calc(select=[b, a, MOD(HASH_CODE(a), 1024) AS $f2])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-local-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0, 2 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "needRetraction" : false,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "LocalGroupAggregate(groupBy=[b, $f2], 
partialFinalType=[PARTIAL], select=[b, $f2, COUNT(distinct$0 a) AS count$0, 
DISTINCT(a) AS distinct$0])"
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0, 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "b",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "$f2",
-        "fieldType" : "INT"
-      }, {
-        "name" : "count$0",
-        "fieldType" : "BIGINT"
-      }, {
-        "name" : "distinct$0",
-        "fieldType" : {
-          "type" : "RAW",
-          "class" : "org.apache.flink.table.api.dataview.MapView",
-          "externalDataType" : {
-            "type" : "STRUCTURED_TYPE",
-            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
-            "attributes" : [ {
-              "name" : "map",
-              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
-            } ]
-          }
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[b, $f2]])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-incremental-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "partialAggGrouping" : [ 0, 1 ],
-    "finalAggGrouping" : [ 0 ],
-    "partialOriginalAggCalls" : [ {
-      "name" : null,
-      "syntax" : "FUNCTION_STAR",
-      "internalName" : "$COUNT$1",
-      "argList" : [ 1 ],
-      "filterArg" : -1,
-      "distinct" : true,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "partialAggCallNeedRetractions" : [ false ],
-    "partialLocalAggInputRowType" : "ROW<`b` BIGINT, `a` INT, `$f2` INT>",
-    "partialAggNeedRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "incrementalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>",
-    "description" : "IncrementalGroupAggregate(partialAggGrouping=[b, $f2], 
finalAggGrouping=[b], select=[b, COUNT(distinct$0 count$0) AS count$0])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `count$0` BIGINT>",
-    "description" : "Exchange(distribution=[hash[b]])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-global-group-aggregate_1",
-    "configuration" : {
-      "table.exec.mini-batch.enabled" : "true",
-      "table.exec.mini-batch.size" : "5"
-    },
-    "grouping" : [ 0 ],
-    "aggCalls" : [ {
-      "name" : null,
-      "internalName" : "$$SUM0$1",
-      "argList" : [ 2 ],
-      "filterArg" : -1,
-      "distinct" : false,
-      "approximate" : false,
-      "ignoreNulls" : false,
-      "type" : "BIGINT NOT NULL"
-    } ],
-    "aggCallNeedRetractions" : [ false ],
-    "localAggInputRowType" : "ROW<`b` BIGINT, `$f2` INT, `$f2_0` BIGINT NOT 
NULL>",
-    "generateUpdateBefore" : true,
-    "needRetraction" : false,
-    "state" : [ {
-      "index" : 0,
-      "ttl" : "0 ms",
-      "name" : "globalGroupAggregateState"
-    } ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>",
-    "description" : "GlobalGroupAggregate(groupBy=[b], 
partialFinalType=[FINAL], select=[b, $SUM0(count$0) AS $f1])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "b",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ]
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
-    "inputUpsertKey" : [ 0 ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`b` BIGINT, `$f1` BIGINT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[b, $f1])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata
deleted file mode 100644
index b0e73b1a395..00000000000
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-incremental-group-aggregate_1/incremental-group-aggregate-simple/savepoint/_metadata
 and /dev/null differ


Reply via email to