This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7cd3554 [FLINK-23370][table-planner] Propagate Boundedness of
ScanRuntimeProviders to Transformation
7cd3554 is described below
commit 7cd3554899fa97e3f27202f7ad42cb06cde38198
Author: Timo Walther <[email protected]>
AuthorDate: Wed Jul 14 08:43:53 2021 +0200
[FLINK-23370][table-planner] Propagate Boundedness of ScanRuntimeProviders
to Transformation
This closes #16492.
---
.../nodes/exec/batch/BatchExecTableSourceScan.java | 7 +-
.../exec/common/CommonExecTableSourceScan.java | 52 ++++++++++-
.../exec/stream/StreamExecTableSourceScan.java | 4 +-
.../plan/nodes/exec/TransformationsTest.java | 102 +++++++++++++++++++++
4 files changed, 155 insertions(+), 10 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index d34947f..4f9cd19 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -47,13 +47,14 @@ public class BatchExecTableSourceScan extends
CommonExecTableSourceScan
StreamExecutionEnvironment env,
InputFormat<RowData, ?> inputFormat,
InternalTypeInfo<RowData> outputTypeInfo,
- String name) {
+ String operatorName) {
// env.createInput will use ContinuousFileReaderOperator, but it does
not support multiple
// paths. If reading a partitioned source, after partition pruning, we need
to let InputFormat
// read multiple partitions which are multiple paths.
// We can use InputFormatSourceFunction directly to support
InputFormat.
- InputFormatSourceFunction<RowData> func =
+ final InputFormatSourceFunction<RowData> function =
new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
- return env.addSource(func, name, outputTypeInfo).getTransformation();
+ return createSourceFunctionTransformation(
+ env, function, true, operatorName, outputTypeInfo);
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 2911373..f289bbe 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -20,10 +20,15 @@ package
org.apache.flink.table.planner.plan.nodes.exec.common;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -78,11 +83,16 @@ public abstract class CommonExecTableSourceScan extends
ExecNodeBase<RowData>
ScanTableSource.ScanRuntimeProvider provider =
tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
if (provider instanceof SourceFunctionProvider) {
- SourceFunction<RowData> sourceFunction =
- ((SourceFunctionProvider) provider).createSourceFunction();
- return env.addSource(sourceFunction, operatorName,
outputTypeInfo).getTransformation();
+ final SourceFunctionProvider sourceFunctionProvider =
(SourceFunctionProvider) provider;
+ final SourceFunction<RowData> function =
sourceFunctionProvider.createSourceFunction();
+ return createSourceFunctionTransformation(
+ env,
+ function,
+ sourceFunctionProvider.isBounded(),
+ operatorName,
+ outputTypeInfo);
} else if (provider instanceof InputFormatProvider) {
- InputFormat<RowData, ?> inputFormat =
+ final InputFormat<RowData, ?> inputFormat =
((InputFormatProvider) provider).createInputFormat();
return createInputFormatTransformation(env, inputFormat,
outputTypeInfo, operatorName);
} else if (provider instanceof SourceProvider) {
@@ -108,6 +118,38 @@ public abstract class CommonExecTableSourceScan extends
ExecNodeBase<RowData>
}
/**
+ * Adapted from {@link
StreamExecutionEnvironment#addSource(SourceFunction, String,
+ * TypeInformation)} but with custom {@link Boundedness}.
+ */
+ protected Transformation<RowData> createSourceFunctionTransformation(
+ StreamExecutionEnvironment env,
+ SourceFunction<RowData> function,
+ boolean isBounded,
+ String operatorName,
+ TypeInformation<RowData> outputTypeInfo) {
+
+ env.clean(function);
+
+ final int parallelism;
+ if (function instanceof ParallelSourceFunction) {
+ parallelism = env.getParallelism();
+ } else {
+ parallelism = 1;
+ }
+
+ final Boundedness boundedness;
+ if (isBounded) {
+ boundedness = Boundedness.BOUNDED;
+ } else {
+ boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ final StreamSource<RowData, ?> sourceOperator = new
StreamSource<>(function);
+ return new LegacySourceTransformation<>(
+ operatorName, sourceOperator, outputTypeInfo, parallelism,
boundedness);
+ }
+
+ /**
* Creates a {@link Transformation} based on the given {@link
InputFormat}. The implementation
* is different for streaming mode and batch mode.
*/
@@ -115,5 +157,5 @@ public abstract class CommonExecTableSourceScan extends
ExecNodeBase<RowData>
StreamExecutionEnvironment env,
InputFormat<RowData, ?> inputFormat,
InternalTypeInfo<RowData> outputTypeInfo,
- String name);
+ String operatorName);
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 995b021..5176302 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -60,9 +60,9 @@ public class StreamExecTableSourceScan extends
CommonExecTableSourceScan
StreamExecutionEnvironment env,
InputFormat<RowData, ?> inputFormat,
InternalTypeInfo<RowData> outputTypeInfo,
- String name) {
+ String operatorName) {
// It's better to use StreamExecutionEnvironment.createInput()
// rather than addLegacySource() for streaming, because it takes care
of checkpointing.
- return env.createInput(inputFormat,
outputTypeInfo).name(name).getTransformation();
+ return env.createInput(inputFormat,
outputTypeInfo).name(operatorName).getTransformation();
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
new file mode 100644
index 0000000..54bd51f
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.dag.Transformation;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.WithBoundedness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Various tests to check {@link Transformation}s that have been generated
from {@link ExecNode}s.
+ */
+public class TransformationsTest extends TableTestBase {
+
+ @Test
+ public void testLegacyBatchSource() {
+ final JavaStreamTableTestUtil util = javaStreamTestUtil();
+ final StreamTableEnvironment env = util.tableEnv();
+
+ final Table table =
+ env.from(
+ TableDescriptor.forConnector("values")
+ .option("bounded", "true")
+ .schema(dummySchema())
+ .build());
+
+ final LegacySourceTransformation<?> sourceTransform =
+ toLegacySourceTransformation(env, table);
+
+ assertBoundedness(Boundedness.BOUNDED, sourceTransform);
+ }
+
+ @Test
+ public void testLegacyStreamSource() {
+ final JavaStreamTableTestUtil util = javaStreamTestUtil();
+ final StreamTableEnvironment env = util.tableEnv();
+
+ final Table table =
+ env.from(
+ TableDescriptor.forConnector("values")
+ .option("bounded", "false")
+ .schema(dummySchema())
+ .build());
+
+ final LegacySourceTransformation<?> sourceTransform =
+ toLegacySourceTransformation(env, table);
+
+ assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, sourceTransform);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Helper methods
+ //
--------------------------------------------------------------------------------------------
+
+ private static LegacySourceTransformation<?> toLegacySourceTransformation(
+ StreamTableEnvironment env, Table table) {
+ final Transformation<?> transform =
env.toChangelogStream(table).getTransformation();
+ assertFalse(transform.getInputs().isEmpty());
+
+ final Transformation<?> sourceTransform = transform.getInputs().get(0);
+ assertTrue(sourceTransform instanceof LegacySourceTransformation);
+ return (LegacySourceTransformation<?>) sourceTransform;
+ }
+
+ private static void assertBoundedness(Boundedness boundedness,
Transformation<?> transform) {
+ assertTrue(transform instanceof WithBoundedness);
+ assertEquals(boundedness, ((WithBoundedness)
transform).getBoundedness());
+ }
+
+ private static Schema dummySchema() {
+ return Schema.newBuilder().column("i", DataTypes.INT()).build();
+ }
+}