This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 268144c30fa [FLINK-36003][table-planner] Add CompiledPlan annotations
to BatchExecExpand and BatchExecSortAggregate
268144c30fa is described below
commit 268144c30fab7704e713ccfcda0c3e6be3857660
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 14 01:37:52 2024 -0400
[FLINK-36003][table-planner] Add CompiledPlan annotations to
BatchExecExpand and BatchExecSortAggregate
---
.../flink/table/test/program/SinkTestStep.java | 50 +-
.../plan/nodes/exec/batch/BatchExecExpand.java | 31 ++
.../nodes/exec/batch/BatchExecSortAggregate.java | 59 ++-
.../planner/plan/utils/ExecNodeMetadataUtil.java | 4 +
.../ExpandBatchRestoreTest.java} | 13 +-
.../{stream => common}/ExpandTestPrograms.java | 13 +-
.../plan/nodes/exec/stream/ExpandRestoreTest.java | 1 +
.../nodes/exec/testutils/BatchRestoreTestBase.java | 9 +-
.../batch-exec-expand_1/expand/plan/expand.json | 534 +++++++++++++++++++++
9 files changed, 692 insertions(+), 22 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
index f708adaeab3..819a632fa5f 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -37,6 +37,10 @@ public final class SinkTestStep extends TableTestStep {
public final @Nullable List<Row> expectedAfterRestore;
public final @Nullable List<String> expectedBeforeRestoreStrings;
public final @Nullable List<String> expectedAfterRestoreStrings;
+ // These are added for situations where we need to specify the output in batch.
+ // In many cases, the "expectedBeforeRestore*" variables are sufficient.
+ public final @Nullable List<Row> expectedMaterializedRows;
+ public final @Nullable List<String> expectedMaterializedStrings;
public final boolean testChangelogData;
SinkTestStep(
@@ -49,20 +53,28 @@ public final class SinkTestStep extends TableTestStep {
@Nullable List<Row> expectedAfterRestore,
@Nullable List<String> expectedBeforeRestoreStrings,
@Nullable List<String> expectedAfterRestoreStrings,
+ @Nullable List<Row> expectedMaterializedRows,
+ @Nullable List<String> expectedMaterializedStrings,
boolean testChangelogData) {
super(name, schemaComponents, distribution, partitionKeys, options);
- if (expectedBeforeRestore != null && expectedAfterRestoreStrings !=
null) {
+ boolean hasRowsSet =
+ expectedBeforeRestore != null
+ || expectedAfterRestore != null
+ || expectedMaterializedRows != null;
+ boolean hasStringsSet =
+ expectedBeforeRestoreStrings != null
+ || expectedAfterRestoreStrings != null
+ || expectedMaterializedStrings != null;
+ if (hasRowsSet && hasStringsSet) {
throw new IllegalArgumentException(
- "You can not mix Row/String representation in before/after
restore data.");
- }
- if (expectedBeforeRestoreStrings != null && expectedAfterRestore !=
null) {
- throw new IllegalArgumentException(
- "You can not mix Row/String representation in before/after
restore data.");
+ "You can not mix Row/String representations in restore
data.");
}
this.expectedBeforeRestore = expectedBeforeRestore;
this.expectedAfterRestore = expectedAfterRestore;
this.expectedBeforeRestoreStrings = expectedBeforeRestoreStrings;
this.expectedAfterRestoreStrings = expectedAfterRestoreStrings;
+ this.expectedMaterializedRows = expectedMaterializedRows;
+ this.expectedMaterializedStrings = expectedMaterializedStrings;
this.testChangelogData = testChangelogData;
}
@@ -101,6 +113,18 @@ public final class SinkTestStep extends TableTestStep {
return data;
}
+ public List<String> getExpectedMaterializedResultsAsStrings() {
+ if (expectedMaterializedStrings != null) {
+ return expectedMaterializedStrings;
+ }
+ if (expectedMaterializedRows != null) {
+ return expectedMaterializedRows.stream()
+ .map(Row::toString)
+ .collect(Collectors.toList());
+ }
+ return getExpectedAsStrings();
+ }
+
@Override
public TestKind getKind() {
return expectedBeforeRestore == null && expectedBeforeRestoreStrings
== null
@@ -119,6 +143,8 @@ public final class SinkTestStep extends TableTestStep {
private List<Row> expectedBeforeRestore;
private List<Row> expectedAfterRestore;
+ private List<Row> expectedMaterializedBeforeRows;
+ private List<String> expectedMaterializedBeforeStrings;
private List<String> expectedBeforeRestoreStrings;
private List<String> expectedAfterRestoreStrings;
@@ -157,6 +183,16 @@ public final class SinkTestStep extends TableTestStep {
return this;
}
+ public Builder expectedMaterializedRows(Row... expectedRows) {
+ this.expectedMaterializedBeforeRows = Arrays.asList(expectedRows);
+ return this;
+ }
+
+ public Builder expectedMaterializedStrings(String... expectedRows) {
+ this.expectedMaterializedBeforeStrings =
Arrays.asList(expectedRows);
+ return this;
+ }
+
public Builder testChangelogData() {
this.testChangelogData = true;
return this;
@@ -178,6 +214,8 @@ public final class SinkTestStep extends TableTestStep {
expectedAfterRestore,
expectedBeforeRestoreStrings,
expectedAfterRestoreStrings,
+ expectedMaterializedBeforeRows,
+ expectedMaterializedBeforeStrings,
testChangelogData);
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
index c91e21c1da0..cff12bdfab8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExpand.java
@@ -18,20 +18,31 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExpand;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexNode;
import java.util.Collections;
import java.util.List;
/** Batch {@link ExecNode} that can expand one row to multiple rows based on given projects. */
+@ExecNodeMetadata(
+ name = "batch-exec-expand",
+ version = 1,
+ producedTransformations = CommonExecExpand.EXPAND_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecExpand extends CommonExecExpand implements
BatchExecNode<RowData> {
public BatchExecExpand(
@@ -50,4 +61,24 @@ public class BatchExecExpand extends CommonExecExpand
implements BatchExecNode<R
outputType,
description);
}
+
+ @JsonCreator
+ public BatchExecExpand(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_PROJECTS) List<List<RexNode>> projects,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ projects,
+ false, // retainHeader
+ inputProperties,
+ outputType,
+ description);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
index ca715d7db89..031f5a5b892 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortAggregate.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -31,6 +32,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -42,20 +44,50 @@ import
org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rel.core.AggregateCall;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
/** Batch {@link ExecNode} for (global) sort-based aggregate operator. */
+@ExecNodeMetadata(
+ name = "batch-exec-sort-aggregate",
+ version = 1,
+ producedTransformations =
BatchExecSortAggregate.SORT_AGGREGATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecSortAggregate extends ExecNodeBase<RowData>
implements InputSortedExecNode<RowData>,
SingleTransformationTranslator<RowData> {
+ public static final String SORT_AGGREGATE_TRANSFORMATION =
"sort-aggregate";
+
+ public static final String FIELD_NAME_GROUPING = "grouping";
+ public static final String FIELD_NAME_AUX_GROUPING = "auxGrouping";
+ public static final String FIELD_NAME_AGG_CALLS = "aggCalls";
+ public static final String FIELD_NAME_AGG_INPUT_ROW_TYPE =
"aggInputRowType";
+ public static final String FIELD_NAME_IS_MERGE = "isMerge";
+ public static final String FIELD_NAME_IS_FINAL = "isFinal";
+
+ @JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
+
+ @JsonProperty(FIELD_NAME_AUX_GROUPING)
private final int[] auxGrouping;
+
+ @JsonProperty(FIELD_NAME_AGG_CALLS)
private final AggregateCall[] aggCalls;
+
+ @JsonProperty(FIELD_NAME_AGG_INPUT_ROW_TYPE)
private final RowType aggInputRowType;
+
+ @JsonProperty(FIELD_NAME_IS_MERGE)
private final boolean isMerge;
+
+ @JsonProperty(FIELD_NAME_IS_FINAL)
private final boolean isFinal;
public BatchExecSortAggregate(
@@ -84,6 +116,30 @@ public class BatchExecSortAggregate extends
ExecNodeBase<RowData>
this.isFinal = isFinal;
}
+ @JsonCreator
+ public BatchExecSortAggregate(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
+ @JsonProperty(FIELD_NAME_AUX_GROUPING) int[] auxGrouping,
+ @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
+ @JsonProperty(FIELD_NAME_AGG_INPUT_ROW_TYPE) RowType
aggInputRowType,
+ @JsonProperty(FIELD_NAME_IS_MERGE) boolean isMerge,
+ @JsonProperty(FIELD_NAME_IS_FINAL) boolean isFinal,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+
+ super(id, context, persistedConfig, inputProperties, outputType,
description);
+ this.grouping = grouping;
+ this.auxGrouping = auxGrouping;
+ this.aggCalls = aggCalls;
+ this.aggInputRowType = aggInputRowType;
+ this.isMerge = isMerge;
+ this.isFinal = isFinal;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
@@ -133,8 +189,7 @@ public class BatchExecSortAggregate extends
ExecNodeBase<RowData>
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(SORT_AGGREGATE_TRANSFORMATION,
config),
new CodeGenOperatorFactory<>(generatedOperator),
InternalTypeInfo.of(outputRowType),
inputTransform.getParallelism(),
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 6582e767bc3..1fc15ce9850 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -31,12 +31,14 @@ import
org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExpand;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashJoin;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJoin;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
+import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
@@ -173,6 +175,8 @@ public final class ExecNodeMetadataUtil {
add(BatchExecLimit.class);
add(BatchExecUnion.class);
add(BatchExecHashAggregate.class);
+ add(BatchExecExpand.class);
+ add(BatchExecSortAggregate.class);
}
};
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
similarity index 75%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
index b26edde9c40..fec2b19cd5b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/ExpandBatchRestoreTest.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
-import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.ExpandTestPrograms;
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
import java.util.Collections;
import java.util.List;
-/** Restore tests for {@link StreamExecExpand}. */
-public class ExpandRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecExpand}. */
+public class ExpandBatchRestoreTest extends BatchRestoreTestBase {
- public ExpandRestoreTest() {
- super(StreamExecExpand.class);
+ public ExpandBatchRestoreTest() {
+ super(BatchExecExpand.class);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
similarity index 88%
rename from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
rename to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
index 752449a913c..cbfeebe0b8f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/ExpandTestPrograms.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -26,10 +26,13 @@ import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
-/** {@link TableTestProgram} definitions for testing {@link StreamExecExpand}. */
+/**
+ * {@link TableTestProgram} definitions for testing {@link BatchExecExpand} and {@link
+ * StreamExecExpand}.
+ */
public class ExpandTestPrograms {
- static final TableTestProgram EXPAND =
+ public static final TableTestProgram EXPAND =
TableTestProgram.of("expand", "validates expand node")
.setupConfig(
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
@@ -63,6 +66,10 @@ public class ExpandTestPrograms {
.consumedAfterRestore(
Row.of(5, 1L, null),
Row.ofKind(RowKind.UPDATE_AFTER,
5, 1L, "Hello there"))
+ .expectedMaterializedRows(
+ Row.of(1, 1L, "Hi"),
+ Row.of(2, 2L, "Hello"),
+ Row.of(5, 1L, "Hello there"))
.build())
.runSql(
"insert into MySink select a, "
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
index b26edde9c40..1f6f80259de 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandRestoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.ExpandTestPrograms;
import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
index df5930035ac..3dbcad99db5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
@@ -228,10 +228,9 @@ public abstract class BatchRestoreTestBase implements
TableTestProgramRunner {
compiledPlan.execute().await();
for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
- List<String> expectedResults = getExpectedResults(sinkTestStep,
sinkTestStep.name);
- assertThat(expectedResults)
- .containsExactlyInAnyOrder(
- sinkTestStep.getExpectedAsStrings().toArray(new
String[0]));
+ List<String> actualResults = getActualResults(sinkTestStep,
sinkTestStep.name);
+ List<String> expectResults =
sinkTestStep.getExpectedMaterializedResultsAsStrings();
+
assertThat(actualResults).containsExactlyInAnyOrderElementsOf(expectResults);
}
}
@@ -246,7 +245,7 @@ public abstract class BatchRestoreTestBase implements
TableTestProgramRunner {
System.getProperty("user.dir"), metadata.name(),
metadata.version(), program.id);
}
- private static List<String> getExpectedResults(SinkTestStep sinkTestStep,
String tableName) {
+ private static List<String> getActualResults(SinkTestStep sinkTestStep,
String tableName) {
if (sinkTestStep.shouldTestChangelogData()) {
return TestValuesTableFactory.getRawResultsAsStrings(tableName);
} else {
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
new file mode 100644
index 00000000000..5a3e5a23e25
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-expand_1/expand/plan/expand.json
@@ -0,0 +1,534 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, MyTable]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "6ada245c-a285-4b9b-bf21-77e9e44b0cd2",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-expand_1",
+ "projects" : [ [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "BIGINT NOT NULL"
+ } ], [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : null,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ] ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e`
BIGINT NOT NULL>",
+ "description" : "Expand(projects=[{a, b, c, 0 AS $e}, {a, null AS b, c, 1
AS $e}])"
+ }, {
+ "id" : 3,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1, 3 ]
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e`
BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[hash[a, b, $e]])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 4,
+ "type" : "batch-exec-sort_1",
+ "configuration" : {
+ "table.exec.resource.sort.memory" : "128 mb",
+ "table.exec.sort.async-merge-enabled" : "true",
+ "table.exec.sort.max-num-file-handles" : "128",
+ "table.exec.spill-compression.block-size" : "64 kb",
+ "table.exec.spill-compression.enabled" : "true"
+ },
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ }, {
+ "index" : 1,
+ "isAscending" : true,
+ "nullIsLast" : false
+ }, {
+ "index" : 3,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e`
BIGINT NOT NULL>",
+ "description" : "Sort(orderBy=[a ASC, b ASC, $e ASC])"
+ }, {
+ "id" : 12,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "KEEP_INPUT_AS_IS",
+ "inputDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1, 3 ]
+ },
+ "isStrict" : true
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$e`
BIGINT NOT NULL>",
+ "description" : "Exchange(distribution=[forward])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 5,
+ "type" : "batch-exec-sort-aggregate_1",
+ "grouping" : [ 0, 1, 3 ],
+ "auxGrouping" : [ ],
+ "aggCalls" : [ {
+ "name" : "c",
+ "internalName" : "$FIRST_VALUE$1",
+ "argList" : [ 2 ],
+ "filterArg" : -1,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "aggInputRowType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647),
`$e` BIGINT NOT NULL>",
+ "isMerge" : false,
+ "isFinal" : true,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0, 1, 3 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `$e` BIGINT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "SortAggregate(isMerge=[false], groupBy=[a, b, $e],
select=[a, b, $e, FIRST_VALUE(c) AS c])"
+ }, {
+ "id" : 6,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CASE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CASE$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$=$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0`
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+ "description" : "Calc(select=[a, b, c, (CASE(($e = 0), 0, 1) = 0) AS $g_0,
(CASE(($e = 0), 0, 1) = 1) AS $g_1])"
+ }, {
+ "id" : 7,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0`
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[hash[a]])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 8,
+ "type" : "batch-exec-sort_1",
+ "configuration" : {
+ "table.exec.resource.sort.memory" : "128 mb",
+ "table.exec.sort.async-merge-enabled" : "true",
+ "table.exec.sort.max-num-file-handles" : "128",
+ "table.exec.spill-compression.block-size" : "64 kb",
+ "table.exec.spill-compression.enabled" : "true"
+ },
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0`
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+ "description" : "Sort(orderBy=[a ASC])"
+ }, {
+ "id" : 13,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "KEEP_INPUT_AS_IS",
+ "inputDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "isStrict" : true
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `$g_0`
BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+ "description" : "Exchange(distribution=[forward])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 9,
+ "type" : "batch-exec-sort-aggregate_1",
+ "grouping" : [ 0 ],
+ "auxGrouping" : [ ],
+ "aggCalls" : [ {
+ "name" : "b",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ 1 ],
+ "filterArg" : 3,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "name" : "c",
+ "internalName" : "$MIN$1",
+ "argList" : [ 2 ],
+ "filterArg" : 4,
+ "distinct" : false,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "aggInputRowType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647),
`$g_0` BOOLEAN NOT NULL, `$g_1` BOOLEAN NOT NULL>",
+ "isMerge" : false,
+ "isFinal" : true,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` BIGINT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "SortAggregate(isMerge=[false], groupBy=[a], select=[a,
COUNT(b) FILTER $g_0 AS b, MIN(c) FILTER $g_1 AS c])"
+ }, {
+ "id" : 10,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[CAST(a AS BIGINT) AS b, CAST(b AS BIGINT) AS
a, c])"
+ }, {
+ "id" : 11,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT NOT NULL"
+ }, {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ],
+ "primaryKey" : {
+ "name" : "PK_b",
+ "type" : "PRIMARY_KEY",
+ "columns" : [ "b" ]
+ }
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` BIGINT, `c` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink],
fields=[b, a, c])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 13,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file