This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0050e426bc0 [FLINK-35799][table] Add CompiledPlan annotations to
BatchExecCalc
0050e426bc0 is described below
commit 0050e426bc0640858c539d43032a4226eea479e5
Author: James Hughes <[email protected]>
AuthorDate: Mon Jul 15 10:41:15 2024 -0400
[FLINK-35799][table] Add CompiledPlan annotations to BatchExecCalc
---
.../flink/table/test/program/SinkTestStep.java | 2 +-
.../planner/plan/ExecNodeGraphInternalPlan.java | 6 +-
.../plan/nodes/exec/batch/BatchExecCalc.java | 34 +++
.../plan/nodes/exec/batch/BatchExecSink.java | 40 +++
.../nodes/exec/batch/BatchExecTableSourceScan.java | 30 +++
.../plan/nodes/exec/common/CommonExecSink.java | 3 +-
.../planner/plan/utils/ExecNodeMetadataUtil.java | 12 +-
.../table/planner/delegation/BatchPlanner.scala | 15 +-
.../table/planner/delegation/PlannerBase.scala | 59 ++++-
.../table/planner/delegation/StreamPlanner.scala | 53 ----
.../apache/flink/table/api/CompiledPlanITCase.java | 23 +-
.../CalcBatchRestoreTest.java} | 13 +-
.../exec/{stream => common}/CalcTestPrograms.java | 16 +-
.../plan/nodes/exec/stream/CalcRestoreTest.java | 1 +
.../planner/plan/nodes/exec/stream/MiscTests.java | 2 +-
...toreTestBase.java => BatchRestoreTestBase.java} | 190 +++------------
.../plan/nodes/exec/testutils/RestoreTestBase.java | 2 +-
.../plan/calc-filter-pushdown.json | 89 +++++++
.../calc-filter/plan/calc-filter.json | 138 +++++++++++
.../plan/calc-project-pushdown.json | 132 ++++++++++
.../calc-sarg/plan/calc-sarg.json | 129 ++++++++++
.../calc-simple/plan/calc-simple.json | 110 +++++++++
.../calc-udf-complex/plan/calc-udf-complex.json | 269 +++++++++++++++++++++
.../calc-udf-simple/plan/calc-udf-simple.json | 108 +++++++++
24 files changed, 1212 insertions(+), 264 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
index 5d57e6095b5..f708adaeab3 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/SinkTestStep.java
@@ -110,7 +110,7 @@ public final class SinkTestStep extends TableTestStep {
: TestKind.SINK_WITH_RESTORE_DATA;
}
- public boolean getTestChangelogData() {
+ public boolean shouldTestChangelogData() {
return testChangelogData;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
index b921535b8b8..59e587b3f27 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/ExecNodeGraphInternalPlan.java
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
import java.io.File;
import java.io.IOException;
@@ -102,10 +102,10 @@ public class ExecNodeGraphInternalPlan implements
InternalPlan {
@Override
public List<String> getSinkIdentifiers() {
return this.execNodeGraph.getRootNodes().stream()
- .filter(execNode -> execNode instanceof StreamExecSink)
+ .filter(execNode -> execNode instanceof CommonExecSink)
.map(
execNode ->
- ((StreamExecSink) execNode)
+ ((CommonExecSink) execNode)
.getTableSinkSpec()
.getContextResolvedTable()
.getIdentifier())
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
index 46ec2f0faef..ab9dbca87f1 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
@@ -28,12 +29,16 @@ import
org.apache.flink.table.planner.plan.fusion.spec.CalcFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexNode;
import javax.annotation.Nullable;
@@ -43,6 +48,12 @@ import java.util.List;
import java.util.Optional;
/** Batch {@link ExecNode} for Calc. */
+@ExecNodeMetadata(
+ name = "batch-exec-calc",
+ version = 1,
+ producedTransformations = CommonExecCalc.CALC_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecCalc extends CommonExecCalc implements
BatchExecNode<RowData> {
public BatchExecCalc(
@@ -65,6 +76,29 @@ public class BatchExecCalc extends CommonExecCalc implements
BatchExecNode<RowDa
description);
}
+ @JsonCreator
+ public BatchExecCalc(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+ @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ projection,
+ condition,
+ TableStreamOperator.class,
+ false, // retainHeader
+ inputProperties,
+ outputType,
+ description);
+ }
+
public boolean supportFusionCodegen() {
return true;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index 9a28b1ccf7f..c9a6e8c3c63 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.Column;
@@ -33,12 +34,16 @@ import
org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -46,6 +51,20 @@ import java.util.List;
/**
* Batch {@link ExecNode} to write data into an external sink defined by a
{@link DynamicTableSink}.
*/
+@ExecNodeMetadata(
+ name = "batch-exec-sink",
+ version = 1,
+ consumedOptions = {
+ "table.exec.sink.not-null-enforcer",
+ "table.exec.sink.type-length-enforcer",
+ },
+ producedTransformations = {
+ CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
+ CommonExecSink.PARTITIONER_TRANSFORMATION,
+ CommonExecSink.SINK_TRANSFORMATION
+ },
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecSink extends CommonExecSink implements
BatchExecNode<Object> {
public BatchExecSink(
ReadableConfig tableConfig,
@@ -65,6 +84,27 @@ public class BatchExecSink extends CommonExecSink implements
BatchExecNode<Objec
description);
}
+ @JsonCreator
+ public BatchExecSink(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK) DynamicTableSinkSpec
tableSinkSpec,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ tableSinkSpec,
+ ChangelogMode.insertOnly(),
+ true, // isBounded
+ inputProperties,
+ outputType,
+ description);
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Transformation<Object> translateToPlanInternal(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index e982477f7ab..f11433ff62c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
@@ -29,6 +30,7 @@ import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
@@ -37,6 +39,9 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.Collections;
import java.util.UUID;
@@ -44,6 +49,12 @@ import java.util.UUID;
* Batch {@link ExecNode} to read data from an external source defined by a
bounded {@link
* ScanTableSource}.
*/
+@ExecNodeMetadata(
+ name = "batch-exec-table-source-scan",
+ version = 1,
+ producedTransformations =
CommonExecTableSourceScan.SOURCE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecTableSourceScan extends CommonExecTableSourceScan
implements BatchExecNode<RowData> {
@@ -86,6 +97,25 @@ public class BatchExecTableSourceScan extends
CommonExecTableSourceScan
this.tableConfig = tableConfig;
}
+ @JsonCreator
+ public BatchExecTableSourceScan(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig tableConfig,
+ @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec
tableSourceSpec,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ tableConfig,
+ tableSourceSpec,
+ Collections.emptyList(),
+ outputType,
+ description);
+ this.tableConfig = tableConfig;
+ }
+
public String getDynamicFilteringDataListenerID() {
return dynamicFilteringDataListenerID;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index e6179dbaa58..e58de91048e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -66,7 +66,6 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import
org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -568,7 +567,7 @@ public abstract class CommonExecSink extends
ExecNodeBase<Object>
private ProviderContext createProviderContext(ExecNodeConfig config) {
return name -> {
- if (this instanceof StreamExecNode && config.shouldSetUid()) {
+ if (config.shouldSetUid()) {
return Optional.of(createTransformationUid(name, config));
}
return Optional.empty();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 595cded6c08..99b4db795a2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -28,6 +28,9 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
+import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
@@ -148,6 +151,10 @@ public final class ExecNodeMetadataUtil {
add(StreamExecPythonGroupAggregate.class);
add(StreamExecPythonGroupWindowAggregate.class);
add(StreamExecPythonOverAggregate.class);
+ // Batch execution mode
+ add(BatchExecSink.class);
+ add(BatchExecTableSourceScan.class);
+ add(BatchExecCalc.class);
}
};
@@ -198,8 +205,9 @@ public final class ExecNodeMetadataUtil {
}
public static <T extends ExecNode<?>> boolean isUnsupported(Class<T>
execNode) {
- return !StreamExecNode.class.isAssignableFrom(execNode)
- || UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
+ boolean streamOrKnownExecNode =
+ StreamExecNode.class.isAssignableFrom(execNode) ||
execNodes().contains(execNode);
+ return !streamOrKnownExecNode ||
UNSUPPORTED_JSON_SERDE_CLASSES.contains(execNode);
}
public static void addTestNode(Class<? extends ExecNode<?>> execNodeClass)
{
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index bb4c1b75a28..120c5f70b6f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.dag.Transformation
import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.table.api.{ExplainDetail, ExplainFormat,
PlanReference, TableConfig, TableException}
+import org.apache.flink.table.api._
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -163,19 +163,6 @@ class BatchPlanner(
classLoader)
}
- override def loadPlan(planReference: PlanReference): InternalPlan = {
- throw new UnsupportedOperationException(
- "The compiled plan feature is not supported in batch mode.")
- }
-
- override def compilePlan(modifyOperations: util.List[ModifyOperation]):
InternalPlan =
- throw new UnsupportedOperationException(
- "The compiled plan feature is not supported in batch mode.")
-
- override def translatePlan(plan: InternalPlan): util.List[Transformation[_]]
=
- throw new UnsupportedOperationException(
- "The compiled plan feature is not supported in batch mode.")
-
override def explainPlan(plan: InternalPlan, extraDetails: ExplainDetail*):
String =
throw new UnsupportedOperationException(
"The compiled plan feature is not supported in batch mode.")
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 45788e6278e..55ef54a5bcd 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -19,14 +19,16 @@ package org.apache.flink.table.planner.delegation
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.dag.Transformation
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.graph.StreamGraph
import org.apache.flink.table.api._
+import org.apache.flink.table.api.PlanReference.{ContentPlanReference,
FilePlanReference, ResourcePlanReference}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
-import org.apache.flink.table.delegation.{Executor, Parser, ParserFactory,
Planner}
+import org.apache.flink.table.delegation._
import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil,
TableFactoryUtil}
import org.apache.flink.table.module.{Module, ModuleManager}
import org.apache.flink.table.operations._
@@ -38,10 +40,11 @@ import
org.apache.flink.table.planner.connectors.DynamicSinkUtils
import
org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.operations.PlannerQueryOperation
+import org.apache.flink.table.planner.plan.ExecNodeGraphInternalPlan
import org.apache.flink.table.planner.plan.nodes.calcite.LogicalLegacySink
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNodeGraph,
ExecNodeGraphGenerator}
import
org.apache.flink.table.planner.plan.nodes.exec.processor.{ExecNodeGraphProcessor,
ProcessorContext}
-import org.apache.flink.table.planner.plan.nodes.exec.serde.SerdeContext
+import org.apache.flink.table.planner.plan.nodes.exec.serde.{JsonSerdeUtil,
SerdeContext}
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.planner.plan.optimize.Optimizer
import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -60,6 +63,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalTableModify
+import java.io.{File, IOException}
import java.lang.{Long => JLong}
import java.util
import java.util.{Collections, TimeZone}
@@ -182,6 +186,57 @@ abstract class PlannerBase(
transformations
}
+ override def loadPlan(planReference: PlanReference): InternalPlan = {
+ val ctx = createSerdeContext
+ val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx)
+ val execNodeGraph = planReference match {
+ case filePlanReference: FilePlanReference =>
+ objectReader.readValue(filePlanReference.getFile,
classOf[ExecNodeGraph])
+ case contentPlanReference: ContentPlanReference =>
+ objectReader.readValue(contentPlanReference.getContent,
classOf[ExecNodeGraph])
+ case resourcePlanReference: ResourcePlanReference =>
+ val url = resourcePlanReference.getClassLoader
+ .getResource(resourcePlanReference.getResourcePath)
+ if (url == null) {
+ throw new IOException("Cannot load the plan reference from
classpath: " + planReference)
+ }
+ objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
+ case _ =>
+ throw new IllegalStateException(
+ "Unknown PlanReference. This is a bug, please contact the
developers")
+ }
+ compileExecNodeGraphToInternalPlan(ctx, execNodeGraph)
+ }
+
+ override def compilePlan(modifyOperations: util.List[ModifyOperation]):
InternalPlan = {
+ beforeTranslation()
+ val relNodes = modifyOperations.map(translateToRel)
+ val optimizedRelNodes = optimize(relNodes)
+ val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled =
true)
+ afterTranslation()
+ compileExecNodeGraphToInternalPlan(createSerdeContext, execGraph)
+ }
+
+ override def translatePlan(plan: InternalPlan): util.List[Transformation[_]]
= {
+ beforeTranslation()
+ val execGraph =
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
+ val transformations = translateToPlan(execGraph)
+ afterTranslation()
+ transformations
+ }
+
+ private def compileExecNodeGraphToInternalPlan(
+ ctx: SerdeContext,
+ execNodeGraph: ExecNodeGraph) = {
+ new ExecNodeGraphInternalPlan(
+ () =>
+ JsonSerdeUtil
+ .createObjectWriter(ctx)
+ .withDefaultPrettyPrinter()
+ .writeValueAsString(execNodeGraph),
+ execNodeGraph)
+ }
+
/** Converts a relational tree of [[ModifyOperation]] into a Calcite
relational expression. */
@VisibleForTesting
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode
= {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index fb32326f117..f553c62761e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -169,59 +169,6 @@ class StreamPlanner(
classLoader)
}
- override def loadPlan(planReference: PlanReference): InternalPlan = {
- val ctx = createSerdeContext
- val objectReader: ObjectReader = JsonSerdeUtil.createObjectReader(ctx)
- val execNodeGraph = planReference match {
- case filePlanReference: FilePlanReference =>
- objectReader.readValue(filePlanReference.getFile,
classOf[ExecNodeGraph])
- case contentPlanReference: ContentPlanReference =>
- objectReader.readValue(contentPlanReference.getContent,
classOf[ExecNodeGraph])
- case resourcePlanReference: ResourcePlanReference =>
- val url = resourcePlanReference.getClassLoader
- .getResource(resourcePlanReference.getResourcePath)
- if (url == null) {
- throw new IOException("Cannot load the plan reference from
classpath: " + planReference)
- }
- objectReader.readValue(new File(url.toURI), classOf[ExecNodeGraph])
- case _ =>
- throw new IllegalStateException(
- "Unknown PlanReference. This is a bug, please contact the
developers")
- }
-
- new ExecNodeGraphInternalPlan(
- // ensures that the JSON output is always normalized
- () =>
- JsonSerdeUtil
- .createObjectWriter(ctx)
- .withDefaultPrettyPrinter()
- .writeValueAsString(execNodeGraph),
- execNodeGraph)
- }
-
- override def compilePlan(modifyOperations: util.List[ModifyOperation]):
InternalPlan = {
- beforeTranslation()
- val relNodes = modifyOperations.map(translateToRel)
- val optimizedRelNodes = optimize(relNodes)
- val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled =
true)
- afterTranslation()
-
- val compiledJson = JsonSerdeUtil
- .createObjectWriter(createSerdeContext)
- .withDefaultPrettyPrinter()
- .writeValueAsString(execGraph)
-
- new ExecNodeGraphInternalPlan(() => compiledJson, execGraph)
- }
-
- override def translatePlan(plan: InternalPlan): util.List[Transformation[_]]
= {
- beforeTranslation()
- val execGraph =
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
- val transformations = translateToPlan(execGraph)
- afterTranslation()
- transformations
- }
-
override def explainPlan(plan: InternalPlan, extraDetails: ExplainDetail*):
String = {
beforeTranslation()
val execGraph =
plan.asInstanceOf[ExecNodeGraphInternalPlan].getExecNodeGraph
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index 591f0a52e4a..514a57b9c5d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -404,28 +404,13 @@ class CompiledPlanITCase extends JsonPlanTestBase {
}
@Test
- void testBatchMode() {
+ void testCompileAndExecutePlanBatchMode() throws Exception {
tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
- String srcTableDdl =
- "CREATE TABLE src (\n"
- + " a bigint\n"
- + ") with (\n"
- + " 'connector' = 'values',\n"
- + " 'bounded' = 'true')";
- tableEnv.executeSql(srcTableDdl);
-
- String sinkTableDdl =
- "CREATE TABLE sink (\n"
- + " a bigint\n"
- + ") with (\n"
- + " 'connector' = 'values',\n"
- + " 'table-sink-class' = 'DEFAULT')";
- tableEnv.executeSql(sinkTableDdl);
+ File sinkPath = createSourceSinkTables();
- assertThatThrownBy(() -> tableEnv.compilePlanSql("INSERT INTO sink
SELECT * FROM src"))
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessage("The compiled plan feature is not supported in
batch mode.");
+ tableEnv.compilePlanSql("INSERT INTO sink SELECT * FROM
src").execute().await();
+ assertResult(DATA, sinkPath);
}
private File createSourceSinkTables() throws IOException {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
similarity index 80%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
index 49585d85ccb..095b4d411ca 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CalcBatchRestoreTest.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
-import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
import java.util.Arrays;
import java.util.List;
-/** Restore tests for {@link StreamExecCalc}. */
-public class CalcRestoreTest extends RestoreTestBase {
+/** Restore tests for {@link BatchExecCalc}. */
+public class CalcBatchRestoreTest extends BatchRestoreTestBase {
- public CalcRestoreTest() {
- super(StreamExecCalc.class);
+ public CalcBatchRestoreTest() {
+ super(BatchExecCalc.class);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
similarity index 96%
rename from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
rename to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
index e6606c279e9..0ccbaccc7a0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1;
@@ -36,7 +36,7 @@ import java.time.LocalDateTime;
*/
public class CalcTestPrograms {
- static final TableTestProgram SIMPLE_CALC =
+ public static final TableTestProgram SIMPLE_CALC =
TableTestProgram.of("calc-simple", "validates basic calc node")
.setupTableSource(
SourceTestStep.newBuilder("t")
@@ -53,7 +53,7 @@ public class CalcTestPrograms {
.runSql("INSERT INTO sink_t SELECT a + 1, b FROM t")
.build();
- static final TableTestProgram CALC_PROJECT_PUSHDOWN =
+ public static final TableTestProgram CALC_PROJECT_PUSHDOWN =
TableTestProgram.of(
"calc-project-pushdown", "validates calc node with
project pushdown")
.setupTableSource(
@@ -73,7 +73,7 @@ public class CalcTestPrograms {
"INSERT INTO sink_t SELECT a, CAST(a AS VARCHAR)
FROM source_t WHERE a > CAST(1 AS BIGINT)")
.build();
- static final TableTestProgram CALC_FILTER =
+ public static final TableTestProgram CALC_FILTER =
TableTestProgram.of("calc-filter", "validates calc node with
filter")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
@@ -90,7 +90,7 @@ public class CalcTestPrograms {
.runSql("INSERT INTO sink_t SELECT * FROM source_t WHERE b
> 0")
.build();
- static final TableTestProgram CALC_FILTER_PUSHDOWN =
+ public static final TableTestProgram CALC_FILTER_PUSHDOWN =
TableTestProgram.of("calc-filter-pushdown", "validates calc node
with filter pushdown")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
@@ -109,7 +109,7 @@ public class CalcTestPrograms {
"INSERT INTO sink_t SELECT a, b FROM source_t
WHERE a > CAST(420 AS BIGINT)")
.build();
- static final TableTestProgram CALC_SARG =
+ public static final TableTestProgram CALC_SARG =
TableTestProgram.of("calc-sarg", "validates calc node with Sarg")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
@@ -128,7 +128,7 @@ public class CalcTestPrograms {
"INSERT INTO sink_t SELECT a FROM source_t WHERE a
= 1 or a = 2 or a is null")
.build();
- static final TableTestProgram CALC_UDF_SIMPLE =
+ public static final TableTestProgram CALC_UDF_SIMPLE =
TableTestProgram.of("calc-udf-simple", "validates calc node with
simple UDF")
.setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
.setupTableSource(
@@ -146,7 +146,7 @@ public class CalcTestPrograms {
.runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM
source_t")
.build();
- static final TableTestProgram CALC_UDF_COMPLEX =
+ public static final TableTestProgram CALC_UDF_COMPLEX =
TableTestProgram.of("calc-udf-complex", "validates calc node with
complex UDFs")
.setupTemporaryCatalogFunction("udf1", JavaFunc0.class)
.setupTemporaryCatalogFunction("udf2", JavaFunc1.class)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
index 49585d85ccb..80b5fde5e53 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CalcRestoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
index 9d894718711..ce918a59b41 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscTests.java
@@ -107,7 +107,7 @@ class MiscTests implements TableTestProgramRunner {
}
private static List<String> getExpectedResults(SinkTestStep sinkTestStep,
String tableName) {
- if (sinkTestStep.getTestChangelogData()) {
+ if (sinkTestStep.shouldTestChangelogData()) {
return TestValuesTableFactory.getRawResultsAsStrings(tableName);
} else {
return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
similarity index 53%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
index b409296390f..df5930035ac 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchRestoreTestBase.java
@@ -18,22 +18,15 @@
package org.apache.flink.table.planner.plan.nodes.exec.testutils;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.RestoreMode;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
@@ -45,86 +38,54 @@ import
org.apache.flink.table.test.program.TestStep.TestKind;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
-import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import java.net.URI;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Base class for implementing restore tests for {@link ExecNode}.You can
generate json compiled
- * plan and a savepoint for the latest node version by running {@link
- * RestoreTestBase#generateTestSetupFiles(TableTestProgram)} which is disabled
by default.
+ * Base class for implementing compiled plan tests for {@link BatchExecNode}.
You can generate json
+ * compiled plan for the latest node version by running {@link
+ * BatchRestoreTestBase#generateCompiledPlans(TableTestProgram)}. This method
does not recreate the
+ * compiled plan if it already exists for the given version of the operator.
*
* <p><b>Note:</b> The test base uses {@link
TableConfigOptions.CatalogPlanCompilation#SCHEMA}
- * because it needs to adjust source and sink properties before and after the
restore. Therefore,
- * the test base can not be used for testing storing table options in the
compiled plan.
+ * because it needs to adjust source and sink properties. Therefore, the test
base can not be used
+ * for testing storing table options in the compiled plan.
*/
@ExtendWith(MiniClusterExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestMethodOrder(OrderAnnotation.class)
-public abstract class RestoreTestBase implements TableTestProgramRunner {
+public abstract class BatchRestoreTestBase implements TableTestProgramRunner {
private final Class<? extends ExecNode<?>> execNodeUnderTest;
private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest;
- private final AfterRestoreSource afterRestoreSource;
- protected RestoreTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
- this(execNodeUnderTest, new ArrayList<>(), AfterRestoreSource.FINITE);
+ protected BatchRestoreTestBase(Class<? extends ExecNode<?>>
execNodeUnderTest) {
+ this(execNodeUnderTest, new ArrayList<>());
}
- protected RestoreTestBase(
+ protected BatchRestoreTestBase(
Class<? extends ExecNode<?>> execNodeUnderTest,
List<Class<? extends ExecNode<?>>> childExecNodesUnderTest) {
- this(execNodeUnderTest, childExecNodesUnderTest,
AfterRestoreSource.FINITE);
- }
-
- protected RestoreTestBase(
- Class<? extends ExecNode<?>> execNodeUnderTest, AfterRestoreSource
state) {
- this(execNodeUnderTest, new ArrayList<>(), state);
- }
-
- protected RestoreTestBase(
- Class<? extends ExecNode<?>> execNodeUnderTest,
- List<Class<? extends ExecNode<?>>> childExecNodesUnderTest,
- AfterRestoreSource state) {
this.execNodeUnderTest = execNodeUnderTest;
this.childExecNodesUnderTest = childExecNodesUnderTest;
- this.afterRestoreSource = state;
- }
-
- /**
- * AfterRestoreSource defines the source behavior while running {@link
- * RestoreTestBase#testRestore}.
- */
- protected enum AfterRestoreSource {
- FINITE,
- INFINITE,
- NO_RESTORE
}
// Used for testing Restore Test Completeness
@@ -142,7 +103,6 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
return EnumSet.of(
TestKind.CONFIG,
TestKind.FUNCTION,
- TestKind.TEMPORAL_FUNCTION,
TestKind.SOURCE_WITH_RESTORE_DATA,
TestKind.SOURCE_WITH_DATA,
TestKind.SINK_WITH_RESTORE_DATA,
@@ -159,8 +119,6 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
TestValuesTableFactory.clearAllData();
}
- private @TempDir Path tmpDir;
-
private List<ExecNodeMetadata> getAllMetadata() {
return
ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest);
}
@@ -176,41 +134,17 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
supportedPrograms().stream().map(p ->
Arguments.of(p, metadata)));
}
- private void registerSinkObserver(
- final List<CompletableFuture<?>> futures,
- final SinkTestStep sinkTestStep,
- final boolean ignoreAfter) {
- final CompletableFuture<Object> future = new CompletableFuture<>();
- futures.add(future);
- final String tableName = sinkTestStep.name;
- TestValuesTableFactory.registerLocalRawResultsObserver(
- tableName,
- (integer, strings) -> {
- List<String> results =
- new
ArrayList<>(sinkTestStep.getExpectedBeforeRestoreAsStrings());
- if (!ignoreAfter) {
-
results.addAll(sinkTestStep.getExpectedAfterRestoreAsStrings());
- }
- List<String> expectedResults =
getExpectedResults(sinkTestStep, tableName);
- final boolean shouldComplete =
- CollectionUtils.isEqualCollection(expectedResults,
results);
- if (shouldComplete) {
- future.complete(null);
- }
- });
- }
-
- /**
- * Execute this test to generate test files. Remember to be using the
correct branch when
- * generating the test files.
- */
- @Disabled
+ /** Generates compiled plans for a given TableTestProgram. */
@ParameterizedTest
@MethodSource("supportedPrograms")
@Order(0)
- public void generateTestSetupFiles(TableTestProgram program) throws
Exception {
- final EnvironmentSettings settings =
EnvironmentSettings.inStreamingMode();
- settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND,
"rocksdb");
+ public void generateCompiledPlans(TableTestProgram program) {
+ Path path = getPlanPath(program, getLatestMetadata());
+ if (path.toFile().exists()) {
+ return;
+ }
+
+ final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
final TableEnvironment tEnv = TableEnvironment.create(settings);
program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
tEnv.getConfig()
@@ -223,14 +157,13 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("data-id", id);
- options.put("terminating", "false");
+ options.put("bounded", "true");
+ options.put("terminating", "true");
options.put("runtime-source", "NewSource");
sourceTestStep.apply(tEnv, options);
}
- final List<CompletableFuture<?>> futures = new ArrayList<>();
for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
- registerSinkObserver(futures, sinkTestStep, true);
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("sink-insert-only", "false");
@@ -238,7 +171,6 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
}
program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
- program.getSetupTemporalFunctionTestSteps().forEach(s ->
s.apply(tEnv));
final CompiledPlan compiledPlan;
if (program.runSteps.get(0).getKind() == TestKind.STATEMENT_SET) {
@@ -249,39 +181,15 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
}
- compiledPlan.writeToFile(getPlanPath(program, getLatestMetadata()));
-
- final TableResult tableResult = compiledPlan.execute();
- CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).get();
- final JobClient jobClient = tableResult.getJobClient().get();
- final String savepoint =
- jobClient
- .stopWithSavepoint(false, tmpDir.toString(),
SavepointFormatType.DEFAULT)
- .get();
- CommonTestUtils.waitForJobStatus(jobClient,
Collections.singletonList(JobStatus.FINISHED));
- final Path savepointPath = Paths.get(new URI(savepoint));
- final Path savepointDirPath = getSavepointPath(program,
getLatestMetadata());
- Files.createDirectories(savepointDirPath);
- Files.move(savepointPath, savepointDirPath,
StandardCopyOption.ATOMIC_MOVE);
+ compiledPlan.writeToFile(path);
}
@ParameterizedTest
@MethodSource("createSpecs")
@Order(1)
- void testRestore(TableTestProgram program, ExecNodeMetadata metadata)
throws Exception {
- final EnvironmentSettings settings =
EnvironmentSettings.inStreamingMode();
- final SavepointRestoreSettings restoreSettings;
- if (afterRestoreSource == AfterRestoreSource.NO_RESTORE) {
- restoreSettings = SavepointRestoreSettings.none();
- } else {
- restoreSettings =
- SavepointRestoreSettings.forPath(
- getSavepointPath(program, metadata).toString(),
- false,
- RestoreMode.NO_CLAIM);
- }
- SavepointRestoreSettings.toConfiguration(restoreSettings,
settings.getConfiguration());
- settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND,
"rocksdb");
+ void loadAndRunCompiledPlan(TableTestProgram program, ExecNodeMetadata
metadata)
+ throws Exception {
+ final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.getConfig()
.set(
@@ -291,27 +199,21 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
for (SourceTestStep sourceTestStep :
program.getSetupSourceTestSteps()) {
- final Collection<Row> data =
- afterRestoreSource == AfterRestoreSource.NO_RESTORE
- ? sourceTestStep.dataBeforeRestore
- : sourceTestStep.dataAfterRestore;
+
+ List<Row> data = new ArrayList<>();
+ data.addAll(sourceTestStep.dataBeforeRestore);
+ data.addAll(sourceTestStep.dataAfterRestore);
final String id = TestValuesTableFactory.registerData(data);
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("data-id", id);
options.put("runtime-source", "NewSource");
- if (afterRestoreSource == AfterRestoreSource.INFINITE) {
- options.put("terminating", "false");
- }
+ options.put("terminating", "true");
+ options.put("bounded", "true");
sourceTestStep.apply(tEnv, options);
}
- final List<CompletableFuture<?>> futures = new ArrayList<>();
-
for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
- if (afterRestoreSource == AfterRestoreSource.INFINITE) {
- registerSinkObserver(futures, sinkTestStep, false);
- }
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("disable-lookup", "true");
@@ -320,28 +222,16 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
}
program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
- program.getSetupTemporalFunctionTestSteps().forEach(s ->
s.apply(tEnv));
final CompiledPlan compiledPlan =
tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program,
metadata)));
- if (afterRestoreSource == AfterRestoreSource.INFINITE) {
- final TableResult tableResult = compiledPlan.execute();
- CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).get();
- tableResult.getJobClient().get().cancel().get();
- } else {
- compiledPlan.execute().await();
- for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
- List<String> expectedResults =
getExpectedResults(sinkTestStep, sinkTestStep.name);
- assertThat(expectedResults)
- .containsExactlyInAnyOrder(
- Stream.concat(
-
sinkTestStep.getExpectedBeforeRestoreAsStrings()
- .stream(),
-
sinkTestStep.getExpectedAfterRestoreAsStrings()
- .stream())
- .toArray(String[]::new));
- }
+ compiledPlan.execute().await();
+ for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+ List<String> expectedResults = getExpectedResults(sinkTestStep,
sinkTestStep.name);
+ assertThat(expectedResults)
+ .containsExactlyInAnyOrder(
+ sinkTestStep.getExpectedAsStrings().toArray(new
String[0]));
}
}
@@ -350,10 +240,6 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
getTestResourceDirectory(program, metadata) + "/plan/" +
program.id + ".json");
}
- private Path getSavepointPath(TableTestProgram program, ExecNodeMetadata
metadata) {
- return Paths.get(getTestResourceDirectory(program, metadata) +
"/savepoint/");
- }
-
private String getTestResourceDirectory(TableTestProgram program,
ExecNodeMetadata metadata) {
return String.format(
"%s/src/test/resources/restore-tests/%s_%d/%s",
@@ -361,7 +247,7 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
}
private static List<String> getExpectedResults(SinkTestStep sinkTestStep,
String tableName) {
- if (sinkTestStep.getTestChangelogData()) {
+ if (sinkTestStep.shouldTestChangelogData()) {
return TestValuesTableFactory.getRawResultsAsStrings(tableName);
} else {
return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index b409296390f..106516b11b8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -361,7 +361,7 @@ public abstract class RestoreTestBase implements
TableTestProgramRunner {
}
private static List<String> getExpectedResults(SinkTestStep sinkTestStep,
String tableName) {
- if (sinkTestStep.getTestChangelogData()) {
+ if (sinkTestStep.shouldTestChangelogData()) {
return TestValuesTableFactory.getRawResultsAsStrings(tableName);
} else {
return TestValuesTableFactory.getResultsAsStrings(tableName);
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
new file mode 100644
index 00000000000..6130f3a119d
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter-pushdown/plan/calc-filter-pushdown.json
@@ -0,0 +1,89 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 14,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 420,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[>(a, 420:BIGINT)]]], fields=[a, b])",
+ "dynamicFilteringDataListenerID" : "adb172e8-15d9-46f8-a27d-3373a9ff478f",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 15,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, b])"
+ } ],
+ "edges" : [ {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
new file mode 100644
index 00000000000..0b9ee103a02
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-filter/plan/calc-filter.json
@@ -0,0 +1,138 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 11,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "d",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+ "dynamicFilteringDataListenerID" : "bde6e1ba-d0c8-4d78-9eed-b3aea8bbbcb1",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 12,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "DOUBLE"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, b, c, d], where=[(b > 0)])"
+ }, {
+ "id" : 13,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT"
+ }, {
+ "name" : "c",
+ "dataType" : "DOUBLE"
+ }, {
+ "name" : "d",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT, `c` DOUBLE, `d`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, b, c, d])"
+ } ],
+ "edges" : [ {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
new file mode 100644
index 00000000000..315a799eaa4
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-project-pushdown/plan/calc-project-pushdown.json
@@ -0,0 +1,132 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 16,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ]
+ }, {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ] ],
+ "producedType" : "ROW<`a` BIGINT> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` BIGINT> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[>(a, 1:BIGINT)], project=[a],
metadata=[]]], fields=[a])",
+ "dynamicFilteringDataListenerID" : "681c0bb8-b2bb-438f-9cfd-92c61aeba248",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 17,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS
EXPR$1])"
+ }, {
+ "id" : 18,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a1",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `EXPR$1` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 16,
+ "target" : 17,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 17,
+ "target" : 18,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
new file mode 100644
index 00000000000..3d74515cb53
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-sarg/plan/calc-sarg.json
@@ -0,0 +1,129 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 19,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a])",
+ "dynamicFilteringDataListenerID" : "8e80a89c-3b29-4638-af93-77420a9bc628",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 20,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "INTERNAL",
+ "internalName" : "$SEARCH$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "LITERAL",
+ "sarg" : {
+ "ranges" : [ {
+ "lower" : {
+ "value" : 1,
+ "boundType" : "CLOSED"
+ },
+ "upper" : {
+ "value" : 1,
+ "boundType" : "CLOSED"
+ }
+ }, {
+ "lower" : {
+ "value" : 2,
+ "boundType" : "CLOSED"
+ },
+ "upper" : {
+ "value" : 2,
+ "boundType" : "CLOSED"
+ }
+ } ],
+ "nullAs" : "TRUE"
+ },
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Calc(select=[a], where=[SEARCH(a, Sarg[1, 2; NULL AS
TRUE])])"
+ }, {
+ "id" : 21,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a])"
+ } ],
+ "edges" : [ {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
new file mode 100644
index 00000000000..e1e9b0a4ce4
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-simple/plan/calc-simple.json
@@ -0,0 +1,110 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 8,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` DOUBLE>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t]], fields=[a, b])",
+ "dynamicFilteringDataListenerID" : "7d70c7cb-2ee4-460e-9a9f-2385a1fd8526",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 9,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$+$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "DOUBLE"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT, `b` DOUBLE>",
+ "description" : "Calc(select=[(a + 1) AS EXPR$0, b])"
+ }, {
+ "id" : 10,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "DOUBLE"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` BIGINT, `b` DOUBLE>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[EXPR$0, b])"
+ } ],
+ "edges" : [ {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
new file mode 100644
index 00000000000..46983e54a54
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-complex/plan/calc-udf-complex.json
@@ -0,0 +1,269 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 25,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+ "dynamicFilteringDataListenerID" : "89c634cd-b623-4893-b177-131aa84da4ba",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 26,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf3",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf4",
+ "operands" : [ {
+ "kind" : "CALL",
+ "internalName" : "$SUBSTRING$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 5,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1000,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$<$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 100,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 10,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b,
udf2(b, b, d) AS b1, udf3(c, b) AS c1, udf4(SUBSTRING(c, 1, 5)) AS c2, udf5(d,
1000) AS d1], where=[(((udf1(a) > 0) OR ((a * b) < 100)) AND (b > 10))])"
+ }, {
+ "id" : 27,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c2",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d1",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, a1, b, b1, c1, c2, d1])"
+ } ],
+ "edges" : [ {
+ "source" : 25,
+ "target" : 26,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 26,
+ "target" : 27,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
new file mode 100644
index 00000000000..b1478457069
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-calc_1/calc-udf-simple/plan/calc-udf-simple.json
@@ -0,0 +1,108 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 22,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a])",
+ "dynamicFilteringDataListenerID" : "188277cb-2bd5-4084-85e8-60165c0fff02",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 23,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+ "description" : "Calc(select=[a, udf1(CAST(a AS BIGINT)) AS EXPR$1])"
+ }, {
+ "id" : 24,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "a1",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 23,
+ "target" : 24,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file