This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e8d39a60578f1e1ddfdc5214aac33d3fe1497a4
Author: Xintong Song <[email protected]>
AuthorDate: Tue May 30 20:03:51 2023 +0800

    Introduce DataStream, as well as supplier-source & consumer-sink for testing purpose.
---
 .../org/apache/flink/processfunction/api/DataStream.java    | 12 +++++-------
 .../flink/processfunction/api/ExecutionEnvironment.java     |  7 ++++++-
 .../apache/flink/processfunction/examples/SimpleMap.java    |  4 +++-
 .../{ExecutionEnvironmentImpl.java => DataStreamImpl.java}  | 12 +++++-------
 .../flink/processfunction/ExecutionEnvironmentImpl.java     | 13 +++++++++++--
 5 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
similarity index 69%
copy from flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
copy to flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
index 531b87e214e..775b95faee3 100644
--- a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
@@ -16,13 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction.examples;
+package org.apache.flink.processfunction.api;
 
-import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import java.util.function.Consumer;
 
-/** Usage: Must be executed with flink-process-function jar in classpath. */
-public class SimpleMap {
-    public static void main(String[] args) throws Exception {
-        ExecutionEnvironment.getExecutionEnvironment().foo();
-    }
+public interface DataStream<T> {
+    /** TODO: Temporal method. Will revisit sink functions later. */
+    void tmpToConsumerSink(Consumer<T> consumer);
 }
diff --git a/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
index a13d60d9385..6335c9e33b8 100644
--- a/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
+++ b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.processfunction.api;
 
+import java.util.function.Supplier;
+
 public abstract class ExecutionEnvironment {
     public static ExecutionEnvironment getExecutionEnvironment()
             throws ReflectiveOperationException {
@@ -27,5 +29,8 @@ public abstract class ExecutionEnvironment {
                         .invoke(null);
     }
 
-    public abstract void foo();
+    public abstract void execute();
+
+    /** TODO: Temporal method. Will revisit source functions later. */
+    public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier);
 }
diff --git a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index 531b87e214e..a9f7212ba88 100644
--- a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ b/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -23,6 +23,8 @@ import org.apache.flink.processfunction.api.ExecutionEnvironment;
 /** Usage: Must be executed with flink-process-function jar in classpath. */
 public class SimpleMap {
     public static void main(String[] args) throws Exception {
-        ExecutionEnvironment.getExecutionEnvironment().foo();
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.tmpFromSupplierSource(System::currentTimeMillis).tmpToConsumerSink(System.out::println);
+        env.execute();
     }
 }
diff --git a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
similarity index 72%
copy from flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
copy to flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
index 8ea8f33a093..83c679b00e5 100644
--- a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
+++ b/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
@@ -18,15 +18,13 @@
 
 package org.apache.flink.processfunction;
 
-import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import org.apache.flink.processfunction.api.DataStream;
 
-public class ExecutionEnvironmentImpl extends ExecutionEnvironment {
-    public static ExecutionEnvironmentImpl newInstance() {
-        return new ExecutionEnvironmentImpl();
-    }
+import java.util.function.Consumer;
 
+public class DataStreamImpl<T> implements DataStream<T> {
     @Override
-    public void foo() {
-        System.out.println(getClass().getSimpleName());
+    public void tmpToConsumerSink(Consumer<T> consumer) {
+        // TODO: keep calling `consumer.accept()` at runtime
     }
 }
diff --git a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
index 8ea8f33a093..ee366d47b9d 100644
--- a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
+++ b/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
@@ -18,15 +18,24 @@
 
 package org.apache.flink.processfunction;
 
+import org.apache.flink.processfunction.api.DataStream;
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
 
+import java.util.function.Supplier;
+
 public class ExecutionEnvironmentImpl extends ExecutionEnvironment {
     public static ExecutionEnvironmentImpl newInstance() {
         return new ExecutionEnvironmentImpl();
     }
 
     @Override
-    public void foo() {
-        System.out.println(getClass().getSimpleName());
+    public void execute() {
+        // TODO: submit job for execution
+    }
+
+    @Override
+    public <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier) {
+        // TODO: keep calling `supplier.get()` at runtime
+        return new DataStreamImpl<>();
     }
 }

Reply via email to