This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1be9ad7a73f797be7ac86563e1a0072912d61391
Author: Weijie Guo <[email protected]>
AuthorDate: Wed May 31 14:38:08 2023 +0800

    Supports supplier source and consumer sink.
---
 .../processfunction/api/ExecutionEnvironment.java  |  5 ++-
 .../flink/processfunction/examples/SimpleMap.java  |  2 +-
 .../flink/processfunction/DataStreamImpl.java      | 36 +++++++++++++++++++-
 .../processfunction/ExecutionEnvironmentImpl.java  | 39 ++++++++++++++++++++--
 .../ConsumerSinkFunction.java}                     | 17 +++++++---
 .../connector/SupplierSourceFunction.java}         | 34 +++++++++++++------
 6 files changed, 112 insertions(+), 21 deletions(-)

diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
index 8f4d39427ce..0c8e726e222 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
@@ -31,6 +31,9 @@ public abstract class ExecutionEnvironment {
 
     public abstract void execute() throws Exception;
 
-    /** TODO: Temporal method. Will revisit source functions later. */
+    /**
+     * TODO: 1. Temporal method. Will revisit source functions later. 2. 
Refactor and move the type
+     * information related code to core-api module.
+     */
     public abstract <OUT> DataStream<OUT> 
tmpFromSupplierSource(SupplierFunction<OUT> supplier);
 }
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index 172b62c2853..523815b8aec 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -20,7 +20,7 @@ package org.apache.flink.processfunction.examples;
 
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
 
-/** Usage: Must be executed with flink-process-function jar in classpath. */
+/** Usage: Must be executed with flink-process-function and flink-dist jar in 
classpath. */
 public class SimpleMap {
     public static void main(String[] args) throws Exception {
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
index 72cb6d276f5..b926c7e1d60 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
@@ -18,12 +18,46 @@
 
 package org.apache.flink.processfunction;
 
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.processfunction.connector.ConsumerSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ConsumerFunction;
 
 public class DataStreamImpl<T> implements DataStream<T> {
+    private final ExecutionEnvironmentImpl environment;
+    private final Transformation<T> transformation;
+
+    public DataStreamImpl(ExecutionEnvironmentImpl environment, 
Transformation<T> transformation) {
+        this.environment =
+                Preconditions.checkNotNull(environment, "Execution Environment 
must not be null.");
+        this.transformation =
+                Preconditions.checkNotNull(
+                        transformation, "Stream Transformation must not be 
null.");
+    }
+
     @Override
     public void tmpToConsumerSink(ConsumerFunction<T> consumer) {
-        // TODO: keep calling `consumer.accept()` at runtime
+        // read the output type of the input Transform to coax out errors 
about MissingTypeInfo
+        transformation.getOutputType();
+
+        ConsumerSinkFunction<T> sinkFunction = new 
ConsumerSinkFunction<>(consumer);
+
+        // TODO Supports clean closure
+        StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction);
+
+        PhysicalTransformation<T> sinkTransformation =
+                new LegacySinkTransformation<>(
+                        transformation,
+                        "Consumer Sink",
+                        sinkOperator,
+                        // TODO Supports configure parallelism
+                        1,
+                        true);
+
+        environment.addOperator(sinkTransformation);
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
index be0401df86c..16dca9d2b4e 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
@@ -19,7 +19,10 @@
 package org.apache.flink.processfunction;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.deployment.executors.LocalExecutorFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
@@ -28,11 +31,15 @@ import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.processfunction.api.DataStream;
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import org.apache.flink.processfunction.connector.SupplierSourceFunction;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierFunction;
 
 import java.util.ArrayList;
@@ -64,8 +71,36 @@ public class ExecutionEnvironmentImpl extends 
ExecutionEnvironment {
 
     @Override
     public <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> 
supplier) {
-        // TODO: keep calling `supplier.get()` at runtime
-        return new DataStreamImpl<>();
+        final String sourceName = "Supplier Source";
+        // TODO Supports clean closure
+        final SupplierSourceFunction<OUT> sourceFunction = new 
SupplierSourceFunction<>(supplier);
+        final TypeInformation<OUT> resolvedTypeInfo =
+                TypeExtractor.getUnaryOperatorReturnType(
+                        supplier,
+                        SupplierFunction.class,
+                        -1,
+                        0,
+                        TypeExtractor.NO_INDEX,
+                        null,
+                        null,
+                        false);
+
+        final StreamSource<OUT, ?> sourceOperator = new 
StreamSource<>(sourceFunction);
+        return new DataStreamImpl<>(
+                this,
+                new LegacySourceTransformation<>(
+                        sourceName,
+                        sourceOperator,
+                        resolvedTypeInfo,
+                        // TODO Supports configure parallelism
+                        1,
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        true));
+    }
+
+    public void addOperator(Transformation<?> transformation) {
+        Preconditions.checkNotNull(transformation, "transformation must not be 
null.");
+        this.transformations.add(transformation);
     }
 
     // -----------------------------------------------
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java
similarity index 64%
copy from 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
copy to 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java
index 72cb6d276f5..996e5ec0e25 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java
@@ -16,14 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction;
+package org.apache.flink.processfunction.connector;
 
-import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.util.function.ConsumerFunction;
 
-public class DataStreamImpl<T> implements DataStream<T> {
+/** Legacy implementation for ConsumerSink. */
+public class ConsumerSinkFunction<IN> implements SinkFunction<IN> {
+    private final ConsumerFunction<IN> dataConsumer;
+
+    public ConsumerSinkFunction(ConsumerFunction<IN> dataConsumer) {
+        this.dataConsumer = dataConsumer;
+    }
+
     @Override
-    public void tmpToConsumerSink(ConsumerFunction<T> consumer) {
-        // TODO: keep calling `consumer.accept()` at runtime
+    public void invoke(IN value, Context context) {
+        dataConsumer.accept(value);
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java
similarity index 52%
copy from 
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
copy to 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java
index 8f4d39427ce..044dc4c6c1d 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java
@@ -16,21 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction.api;
+package org.apache.flink.processfunction.connector;
 
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.function.SupplierFunction;
 
-public abstract class ExecutionEnvironment {
-    public static ExecutionEnvironment getExecutionEnvironment()
-            throws ReflectiveOperationException {
-        return (ExecutionEnvironment)
-                
Class.forName("org.apache.flink.processfunction.ExecutionEnvironmentImpl")
-                        .getMethod("newInstance")
-                        .invoke(null);
+/** Legacy implementation for SupplierSource. */
+public class SupplierSourceFunction<OUT> implements SourceFunction<OUT> {
+    private volatile boolean isRunning = true;
+
+    private final SupplierFunction<OUT> dataSupplier;
+
+    public SupplierSourceFunction(SupplierFunction<OUT> dataSupplier) {
+        this.dataSupplier = dataSupplier;
     }
 
-    public abstract void execute() throws Exception;
+    @Override
+    public void run(SourceContext<OUT> ctx) throws Exception {
+        while (isRunning) {
+            OUT data = dataSupplier.get();
+            ctx.collect(data);
+            //noinspection BusyWait
+            Thread.sleep(1000L);
+        }
+    }
 
-    /** TODO: Temporal method. Will revisit source functions later. */
-    public abstract <OUT> DataStream<OUT> 
tmpFromSupplierSource(SupplierFunction<OUT> supplier);
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
 }

Reply via email to