This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cd3554  [FLINK-23370][table-planner] Propagate Boundedness of ScanRuntimeProviders to Transformation
7cd3554 is described below

commit 7cd3554899fa97e3f27202f7ad42cb06cde38198
Author: Timo Walther <[email protected]>
AuthorDate: Wed Jul 14 08:43:53 2021 +0200

    [FLINK-23370][table-planner] Propagate Boundedness of ScanRuntimeProviders to Transformation
    
    This closes #16492.
---
 .../nodes/exec/batch/BatchExecTableSourceScan.java |   7 +-
 .../exec/common/CommonExecTableSourceScan.java     |  52 ++++++++++-
 .../exec/stream/StreamExecTableSourceScan.java     |   4 +-
 .../plan/nodes/exec/TransformationsTest.java       | 102 +++++++++++++++++++++
 4 files changed, 155 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
index d34947f..4f9cd19 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java
@@ -47,13 +47,14 @@ public class BatchExecTableSourceScan extends CommonExecTableSourceScan
             StreamExecutionEnvironment env,
             InputFormat<RowData, ?> inputFormat,
             InternalTypeInfo<RowData> outputTypeInfo,
-            String name) {
+            String operatorName) {
         // env.createInput will use ContinuousFileReaderOperator, but it do 
not support multiple
         // paths. If read partitioned source, after partition pruning, we need 
let InputFormat
         // to read multiple partitions which are multiple paths.
         // We can use InputFormatSourceFunction directly to support 
InputFormat.
-        InputFormatSourceFunction<RowData> func =
+        final InputFormatSourceFunction<RowData> function =
                 new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
-        return env.addSource(func, name, outputTypeInfo).getTransformation();
+        return createSourceFunctionTransformation(
+                env, function, true, operatorName, outputTypeInfo);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 2911373..f289bbe 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -20,10 +20,15 @@ package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -78,11 +83,16 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
         ScanTableSource.ScanRuntimeProvider provider =
                 
tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         if (provider instanceof SourceFunctionProvider) {
-            SourceFunction<RowData> sourceFunction =
-                    ((SourceFunctionProvider) provider).createSourceFunction();
-            return env.addSource(sourceFunction, operatorName, 
outputTypeInfo).getTransformation();
+            final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
+            final SourceFunction<RowData> function = 
sourceFunctionProvider.createSourceFunction();
+            return createSourceFunctionTransformation(
+                    env,
+                    function,
+                    sourceFunctionProvider.isBounded(),
+                    operatorName,
+                    outputTypeInfo);
         } else if (provider instanceof InputFormatProvider) {
-            InputFormat<RowData, ?> inputFormat =
+            final InputFormat<RowData, ?> inputFormat =
                     ((InputFormatProvider) provider).createInputFormat();
             return createInputFormatTransformation(env, inputFormat, 
outputTypeInfo, operatorName);
         } else if (provider instanceof SourceProvider) {
@@ -108,6 +118,38 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
     }
 
     /**
+     * Adapted from {@link 
StreamExecutionEnvironment#addSource(SourceFunction, String,
+     * TypeInformation)} but with a custom {@link Boundedness} derived from {@code isBounded}.
+     */
+    protected Transformation<RowData> createSourceFunctionTransformation(
+            StreamExecutionEnvironment env,
+            SourceFunction<RowData> function,
+            boolean isBounded, // true -> BOUNDED, false -> CONTINUOUS_UNBOUNDED
+            String operatorName,
+            TypeInformation<RowData> outputTypeInfo) {
+
+        env.clean(function);
+
+        final int parallelism;
+        if (function instanceof ParallelSourceFunction) {
+            parallelism = env.getParallelism();
+        } else {
+            parallelism = 1; // a non-parallel source function runs as a single instance
+        }
+
+        final Boundedness boundedness;
+        if (isBounded) {
+            boundedness = Boundedness.BOUNDED;
+        } else {
+            boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        }
+
+        final StreamSource<RowData, ?> sourceOperator = new 
StreamSource<>(function);
+        return new LegacySourceTransformation<>(
+                operatorName, sourceOperator, outputTypeInfo, parallelism, 
boundedness);
+    }
+
+    /**
      * Creates a {@link Transformation} based on the given {@link 
InputFormat}. The implementation
      * is different for streaming mode and batch mode.
      */
@@ -115,5 +157,5 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
             StreamExecutionEnvironment env,
             InputFormat<RowData, ?> inputFormat,
             InternalTypeInfo<RowData> outputTypeInfo,
-            String name);
+            String operatorName);
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
index 995b021..5176302 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -60,9 +60,9 @@ public class StreamExecTableSourceScan extends CommonExecTableSourceScan
             StreamExecutionEnvironment env,
             InputFormat<RowData, ?> inputFormat,
             InternalTypeInfo<RowData> outputTypeInfo,
-            String name) {
+            String operatorName) {
         // It's better to use StreamExecutionEnvironment.createInput()
         // rather than addLegacySource() for streaming, because it take care 
of checkpoint.
-        return env.createInput(inputFormat, 
outputTypeInfo).name(name).getTransformation();
+        return env.createInput(inputFormat, 
outputTypeInfo).name(operatorName).getTransformation();
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
new file mode 100644
index 0000000..54bd51f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.streaming.api.transformations.WithBoundedness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Various tests to check {@link Transformation}s that have been generated 
from {@link ExecNode}s.
+ */
+public class TransformationsTest extends TableTestBase {
+
+    @Test
+    public void testLegacyBatchSource() {
+        final JavaStreamTableTestUtil util = javaStreamTestUtil();
+        final StreamTableEnvironment env = util.tableEnv();
+
+        final Table table =
+                env.from(
+                        TableDescriptor.forConnector("values")
+                                .option("bounded", "true") // expect BOUNDED
+                                .schema(dummySchema())
+                                .build());
+
+        final LegacySourceTransformation<?> sourceTransform =
+                toLegacySourceTransformation(env, table);
+
+        assertBoundedness(Boundedness.BOUNDED, sourceTransform);
+    }
+
+    @Test
+    public void testLegacyStreamSource() {
+        final JavaStreamTableTestUtil util = javaStreamTestUtil();
+        final StreamTableEnvironment env = util.tableEnv();
+
+        final Table table =
+                env.from(
+                        TableDescriptor.forConnector("values")
+                                .option("bounded", "false") // expect CONTINUOUS_UNBOUNDED
+                                .schema(dummySchema())
+                                .build());
+
+        final LegacySourceTransformation<?> sourceTransform =
+                toLegacySourceTransformation(env, table);
+
+        assertBoundedness(Boundedness.CONTINUOUS_UNBOUNDED, sourceTransform);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helper methods
+    // 
--------------------------------------------------------------------------------------------
+
+    private static LegacySourceTransformation<?> toLegacySourceTransformation(
+            StreamTableEnvironment env, Table table) {
+        final Transformation<?> transform = 
env.toChangelogStream(table).getTransformation();
+        assertFalse(transform.getInputs().isEmpty());
+
+        final Transformation<?> sourceTransform = transform.getInputs().get(0);
+        assertTrue(sourceTransform instanceof LegacySourceTransformation);
+        return (LegacySourceTransformation<?>) sourceTransform;
+    }
+
+    private static void assertBoundedness(Boundedness boundedness, 
Transformation<?> transform) {
+        assertTrue(transform instanceof WithBoundedness);
+        assertEquals(boundedness, ((WithBoundedness) 
transform).getBoundedness());
+    }
+
+    private static Schema dummySchema() {
+        return Schema.newBuilder().column("i", DataTypes.INT()).build();
+    }
+}

Reply via email to