This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 268144c30fa [FLINK-36003][table-planner] Add CompiledPlan annotations 
to BatchExecExpand and BatchExecSortAggregate
268144c30fa is described below

commit 268144c30fab7704e713ccfcda0c3e6be3857660
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 14 01:37:52 2024 -0400

    [FLINK-36003][table-planner] Add CompiledPlan annotations to 
BatchExecExpand and BatchExecSortAggregate
---
 .../flink/table/test/program/SinkTestStep.java     |  50 +-
 .../plan/nodes/exec/batch/BatchExecExpand.java     |  31 ++
 .../nodes/exec/batch/BatchExecSortAggregate.java   |  59 ++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   4 +
 .../ExpandBatchRestoreTest.java}                   |  13 +-
 .../{stream => common}/ExpandTestPrograms.java     |  13 +-
 .../plan/nodes/exec/stream/ExpandRestoreTest.java  |   1 +
 .../nodes/exec/testutils/BatchRestoreTestBase.java |   9 +-
 .../batch-exec-expand_1/expand/plan/expand.json    | 534 +++++++++++++++++++++
 9 files changed, 692 insertions(+), 22 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
index f708adaeab3..819a632fa5f 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -37,6 +37,10 @@ public final class SinkTestStep extends TableTestStep {
     public final @Nullable List<Row> expectedAfterRestore;
     public final @Nullable List<String> expectedBeforeRestoreStrings;
     public final @Nullable List<String> expectedAfterRestoreStrings;
+    // These are added for situations where we need to specify the output in 
batch.
+    // In many cases, the "expectBeforeRestore*" variables are sufficient.
+    public final @Nullable List<Row> expectedMaterializedRows;
+    public final @Nullable List<String> expectedMaterializedStrings;
     public final boolean testChangelogData;
 
     SinkTestStep(
@@ -49,20 +53,28 @@ public final class SinkTestStep extends TableTestStep {
             @Nullable List<Row> expectedAfterRestore,
             @Nullable List<String> expectedBeforeRestoreStrings,
             @Nullable List<String> expectedAfterRestoreStrings,
+            @Nullable List<Row> expectedMaterializedRows,
+            @Nullable List<String> expectedMaterializedStrings,
             boolean testChangelogData) {
         super(name, schemaComponents, distribution, partitionKeys, options);
-        if (expectedBeforeRestore != null && expectedAfterRestoreStrings != 
null) {
+        boolean hasRowsSet =
+                expectedBeforeRestore != null
+                        || expectedAfterRestore != null
+                        || expectedMaterializedRows != null;
+        boolean hasStringsSet =
+                expectedBeforeRestoreStrings != null
+                        || expectedAfterRestoreStrings != null
+                        || expectedMaterializedStrings != null;
+        if (hasRowsSet && hasStringsSet) {
             throw new IllegalArgumentException(
-                    "You can not mix Row/String representation in before/after 
restore data.");
-        }
-        if (expectedBeforeRestoreStrings != null && expectedAfterRestore != 
null) {
-            throw new IllegalArgumentException(
-                    "You can not mix Row/String representation in before/after 
restore data.");
+                    "You can not mix Row/String representations in restore 
data.");
         }
         this.expectedBeforeRestore = expectedBeforeRestore;
         this.expectedAfterRestore = expectedAfterRestore;
         this.expectedBeforeRestoreStrings = expectedBeforeRestoreStrings;
         this.expectedAfterRestoreStrings = expectedAfterRestoreStrings;
+        this.expectedMaterializedRows = expectedMaterializedRows;
+        this.expectedMaterializedStrings = expectedMaterializedStrings;
         this.testChangelogData = testChangelogData;
     }
 
@@ -101,6 +113,18 @@ public final class SinkTestStep extends TableTestStep {
         return data;
     }
 
+    public List<String> getExpectedMaterializedResultsAsStrings() {
+        if (expectedMaterializedStrings != null) {
+            return expectedMaterializedStrings;
+        }
+        if (expectedMaterializedRows != null) {
+            return expectedMaterializedRows.stream()
+                    .map(Row::toString)
+                    .collect(Collectors.toList());
+        }
+        return getExpectedAsStrings();
+    }
+
     @Override
     public TestKind getKind() {
         return expectedBeforeRestore == null && expectedBeforeRestoreStrings 
== null
@@ -119,6 +143,8 @@ public final class SinkTestStep extends TableTestStep {
 
         private List<Row> expectedBeforeRestore;
         private List<Row> expectedAfterRestore;
+        private List<Row> expectedMaterializedBeforeRows;
+        private List<String> expectedMaterializedBeforeStrings;
 
         private List<String> expectedBeforeRestoreStrings;
         private List<String> expectedAfterRestoreStrings;
@@ -157,6 +183,16 @@ public final class SinkTestStep extends TableTestStep {
             return this;
         }
 
+        public Builder expectedMaterializedRows(Row... expectedRows) {
+            this.expectedMaterializedBeforeRows = Arrays.asList(expectedRows);
+            return this;
+        }
+
+        public Builder expectedMaterializedStrings(String... expectedRows) {
+            this.expectedMaterializedBeforeStrings = 
Arrays.asList(expectedRows);
+            return this;
+        }
+
         public Builder testChangelogData() {
             this.testChangelogData = true;
             return this;
@@ -178,6 +214,8 @@ public final class SinkTestStep extends TableTestStep {
                     expectedAfterRestore,
                     expectedBeforeRestoreStrings,
                     expectedAfterRestoreStrings,
+                    expectedMaterializedBeforeRows,
+                    expectedMaterializedBeforeStrings,
                     testChangelogData);
         }
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
index c91e21c1da0..cff12bdfab8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
@@ -18,20 +18,31 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExpand;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexNode;
 
 import java.util.Collections;
 import java.util.List;
 
 /** Batch {@link ExecNode} that can expand one row to multiple rows based on 
given projects. */
+@ExecNodeMetadata(
+        name = "batch-exec-expand",
+        version = 1,
+        producedTransformations = CommonExecExpand.EXPAND_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecExpand extends CommonExecExpand implements 
BatchExecNode<RowData> {
 
     public BatchExecExpand(
@@ -50,4 +61,24 @@ public class BatchExecExpand extends CommonExecExpand 
implements BatchExecNode<R
                 outputType,
                 description);
     }
+
+    @JsonCreator
+    public BatchExecExpand(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_PROJECTS) List<List<RexNode>> projects,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                persistedConfig,
+                projects,
+                false, // retainHeader
+                inputProperties,
+                outputType,
+                description);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
index ca715d7db89..031f5a5b892 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -31,6 +32,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -42,20 +44,50 @@ import 
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rel.core.AggregateCall;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 /** Batch {@link ExecNode} for (global) sort-based aggregate operator. */
+@ExecNodeMetadata(
+        name = "batch-exec-sort-aggregate",
+        version = 1,
+        producedTransformations = 
BatchExecSortAggregate.SORT_AGGREGATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecSortAggregate extends ExecNodeBase<RowData>
         implements InputSortedExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
+    public static final String SORT_AGGREGATE_TRANSFORMATION = 
"sort-aggregate";
+
+    public static final String FIELD_NAME_GROUPING = "grouping";
+    public static final String FIELD_NAME_AUX_GROUPING = "auxGrouping";
+    public static final String FIELD_NAME_AGG_CALLS = "aggCalls";
+    public static final String FIELD_NAME_AGG_INPUT_ROW_TYPE = 
"aggInputRowType";
+    public static final String FIELD_NAME_IS_MERGE = "isMerge";
+    public static final String FIELD_NAME_IS_FINAL = "isFinal";
+
+    @JsonProperty(FIELD_NAME_GROUPING)
     private final int[] grouping;
+
+    @JsonProperty(FIELD_NAME_AUX_GROUPING)
     private final int[] auxGrouping;
+
+    @JsonProperty(FIELD_NAME_AGG_CALLS)
     private final AggregateCall[] aggCalls;
+
+    @JsonProperty(FIELD_NAME_AGG_INPUT_ROW_TYPE)
     private final RowType aggInputRowType;
+
+    @JsonProperty(FIELD_NAME_IS_MERGE)
     private final boolean isMerge;
+
+    @JsonProperty(FIELD_NAME_IS_FINAL)
     private final boolean isFinal;
 
     public BatchExecSortAggregate(
@@ -84,6 +116,30 @@ public class BatchExecSortAggregate extends 
ExecNodeBase<RowData>
         this.isFinal = isFinal;
     }
 
+    @JsonCreator
+    public BatchExecSortAggregate(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+            @JsonProperty(FIELD_NAME_AUX_GROUPING) int[] auxGrouping,
+            @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+            @JsonProperty(FIELD_NAME_AGG_INPUT_ROW_TYPE) RowType 
aggInputRowType,
+            @JsonProperty(FIELD_NAME_IS_MERGE) boolean isMerge,
+            @JsonProperty(FIELD_NAME_IS_FINAL) boolean isFinal,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+
+        super(id, context, persistedConfig, inputProperties, outputType, 
description);
+        this.grouping = grouping;
+        this.auxGrouping = auxGrouping;
+        this.aggCalls = aggCalls;
+        this.aggInputRowType = aggInputRowType;
+        this.isMerge = isMerge;
+        this.isFinal = isFinal;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<RowData> translateToPlanInternal(
@@ -133,8 +189,7 @@ public class BatchExecSortAggregate extends 
ExecNodeBase<RowData>
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(SORT_AGGREGATE_TRANSFORMATION, 
config),
                 new CodeGenOperatorFactory<>(generatedOperator),
                 InternalTypeInfo.of(outputRowType),
                 inputTransform.getParallelism(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 6582e767bc3..1fc15ce9850 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -31,12 +31,14 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExpand;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate;
 import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
@@ -173,6 +175,8 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecLimit.class);
                     add(BatchExecUnion.class);
                     add(BatchExecHashAggregate.class);
+                    add(BatchExecExpand.class);
+                    add(BatchExecSortAggregate.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
similarity index 75%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
index b26edde9c40..fec2b19cd5b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.ExpandTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Collections;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecExpand}. */
-public class ExpandRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecExpand}. */
+public class ExpandBatchRestoreTest extends BatchRestoreTestBase {
 
-    public ExpandRestoreTest() {
-        super(StreamExecExpand.class);
+    public ExpandBatchRestoreTest() {
+        super(BatchExecExpand.class);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
similarity index 88%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
index 752449a913c..cbfeebe0b8f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.table.api.config.AggregatePhaseStrategy;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -26,10 +26,13 @@ import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
-/** {@link TableTestProgram} definitions for testing {@link StreamExecExpand}. 
*/
+/**
+ * {@link TableTestProgram} definitions for testing {@link BatchExecExpand} 
and {@link
+ * StreamExecExpand}.
+ */
 public class ExpandTestPrograms {
 
-    static final TableTestProgram EXPAND =
+    public static final TableTestProgram EXPAND =
             TableTestProgram.of("expand", "validates expand node")
                     .setupConfig(
                             
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
@@ -63,6 +66,10 @@ public class ExpandTestPrograms {
                                     .consumedAfterRestore(
                                             Row.of(5, 1L, null),
                                             Row.ofKind(RowKind.UPDATE_AFTER, 
5, 1L, "Hello there"))
+                                    .expectedMaterializedRows(
+                                            Row.of(1, 1L, "Hi"),
+                                            Row.of(2, 2L, "Hello"),
+                                            Row.of(5, 1L, "Hello there"))
                                     .build())
                     .runSql(
                             "insert into MySink select a, "
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
index b26edde9c40..1f6f80259de 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.ExpandTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
index df5930035ac..3dbcad99db5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
@@ -228,10 +228,9 @@ public abstract class BatchRestoreTestBase implements 
TableTestProgramRunner {
 
         compiledPlan.execute().await();
         for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
-            List<String> expectedResults = getExpectedResults(sinkTestStep, 
sinkTestStep.name);
-            assertThat(expectedResults)
-                    .containsExactlyInAnyOrder(
-                            sinkTestStep.getExpectedAsStrings().toArray(new 
String[0]));
+            List<String> actualResults = getActualResults(sinkTestStep, 
sinkTestStep.name);
+            List<String> expectResults = 
sinkTestStep.getExpectedMaterializedResultsAsStrings();
+            
assertThat(actualResults).containsExactlyInAnyOrderElementsOf(expectResults);
         }
     }
 
@@ -246,7 +245,7 @@ public abstract class BatchRestoreTestBase implements 
TableTestProgramRunner {
                 System.getProperty("user.dir"), metadata.name(), 
metadata.version(), program.id);
     }
 
-    private static List<String> getExpectedResults(SinkTestStep sinkTestStep, 
String tableName) {
+    private static List<String> getActualResults(SinkTestStep sinkTestStep, 
String tableName) {
         if (sinkTestStep.shouldTestChangelogData()) {
             return TestValuesTableFactory.getRawResultsAsStrings(tableName);
         } else {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
new file mode 100644
index 00000000000..5a3e5a23e25
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
@@ -0,0 +1,534 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "6ada245c-a285-4b9b-bf21-77e9e44b0cd2",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-expand_1",
+    "projects" : [ [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 0,
+      "type" : "BIGINT NOT NULL"
+    } ], [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 1,
+      "type" : "BIGINT NOT NULL"
+    } ] ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e` 
BIGINT NOT NULL>",
+    "description" : "Expand(projects=[{a, b, c, 0 AS $e}, {a, null AS b, c, 1 
AS $e}])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1, 3 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e` 
BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[a, b, $e]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 1,
+        "isAscending" : true,
+        "nullIsLast" : false
+      }, {
+        "index" : 3,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e` 
BIGINT NOT NULL>",
+    "description" : "Sort(orderBy=[a ASC, b ASC, $e ASC])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "KEEP_INPUT_AS_IS",
+        "inputDistribution" : {
+          "type" : "HASH",
+          "keys" : [ 0, 1, 3 ]
+        },
+        "isStrict" : true
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e` 
BIGINT NOT NULL>",
+    "description" : "Exchange(distribution=[forward])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-sort-aggregate_1",
+    "grouping" : [ 0, 1, 3 ],
+    "auxGrouping" : [ ],
+    "aggCalls" : [ {
+      "name" : "c",
+      "internalName" : "$FIRST_VALUE$1",
+      "argList" : [ 2 ],
+      "filterArg" : -1,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "aggInputRowType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), 
`$e` BIGINT NOT NULL>",
+    "isMerge" : false,
+    "isFinal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1, 3 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `$e` BIGINT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "SortAggregate(isMerge=[false], groupBy=[a, b, $e], 
select=[a, b, $e, FIRST_VALUE(c) AS c])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$=$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CASE$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$=$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 0,
+          "type" : "BIGINT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "BIGINT NOT NULL"
+        } ],
+        "type" : "BIGINT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 0,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$=$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$CASE$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$=$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "BIGINT NOT NULL"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "BIGINT NOT NULL"
+          } ],
+          "type" : "BOOLEAN NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 0,
+          "type" : "BIGINT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 1,
+          "type" : "BIGINT NOT NULL"
+        } ],
+        "type" : "BIGINT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0` 
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+    "description" : "Calc(select=[a, b, c, (CASE(($e = 0), 0, 1) = 0) AS $g_0, 
(CASE(($e = 0), 0, 1) = 1) AS $g_1])"
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0` 
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[hash[a]])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sort_1",
+    "configuration" : {
+      "table.exec.resource.sort.memory" : "128 mb",
+      "table.exec.sort.async-merge-enabled" : "true",
+      "table.exec.sort.max-num-file-handles" : "128",
+      "table.exec.spill-compression.block-size" : "64 kb",
+      "table.exec.spill-compression.enabled" : "true"
+    },
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0` 
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+    "description" : "Sort(orderBy=[a ASC])"
+  }, {
+    "id" : 13,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "KEEP_INPUT_AS_IS",
+        "inputDistribution" : {
+          "type" : "HASH",
+          "keys" : [ 0 ]
+        },
+        "isStrict" : true
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0` 
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+    "description" : "Exchange(distribution=[forward])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-sort-aggregate_1",
+    "grouping" : [ 0 ],
+    "auxGrouping" : [ ],
+    "aggCalls" : [ {
+      "name" : "b",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : 3,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "name" : "c",
+      "internalName" : "$MIN$1",
+      "argList" : [ 2 ],
+      "filterArg" : 4,
+      "distinct" : false,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "aggInputRowType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), 
`$g_0` BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+    "isMerge" : false,
+    "isFinal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "SortAggregate(isMerge=[false], groupBy=[a], select=[a, 
COUNT(b) FILTER $g_0 AS b, MIN(c) FILTER $g_1 AS c])"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT NOT NULL"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[CAST(a AS BIGINT) AS b, CAST(b AS BIGINT) AS 
a, c])"
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_b",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "b" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, a, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file


Reply via email to