This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8be47a72cab6ce5304920e65e51a521ee5c96103
Author: Xintong Song <[email protected]>
AuthorDate: Sun Jun 4 11:47:36 2023 +0800

    Introduce ProcessFunction interface.
---
 .../java/org/apache/flink/processfunction/api/DataStream.java    | 3 +++
 .../api/{DataStream.java => ProcessFunction.java}                | 8 +++-----
 .../org/apache/flink/processfunction/examples/SimpleMap.java     | 9 ++++++++-
 .../java/org/apache/flink/processfunction/DataStreamImpl.java    | 7 +++++++
 4 files changed, 21 insertions(+), 6 deletions(-)

diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
index 70a0a5aa040..47646b95bfe 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
@@ -21,6 +21,9 @@ package org.apache.flink.processfunction.api;
 import org.apache.flink.util.function.ConsumerFunction;
 
 public interface DataStream<T> {
+
+    <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> processFunction);
+
     /** TODO: Temporal method. Will revisit sink functions later. */
     void tmpToConsumerSink(ConsumerFunction<T> consumer);
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
similarity index 79%
copy from 
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
copy to 
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
index 70a0a5aa040..3058727c4df 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
@@ -18,9 +18,7 @@
 
 package org.apache.flink.processfunction.api;
 
-import org.apache.flink.util.function.ConsumerFunction;
-
-public interface DataStream<T> {
-    /** TODO: Temporal method. Will revisit sink functions later. */
-    void tmpToConsumerSink(ConsumerFunction<T> consumer);
+@FunctionalInterface
+public interface ProcessFunction<IN, OUT> {
+    OUT processRecord(IN record);
 }
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index 523815b8aec..35f0a94ddf1 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -20,13 +20,20 @@ package org.apache.flink.processfunction.examples;
 
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
 /** Usage: Must be executed with flink-process-function and flink-dist jar in 
classpath. */
 public class SimpleMap {
     public static void main(String[] args) throws Exception {
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
         env.tmpFromSupplierSource(System::currentTimeMillis)
+                .process(
+                        tsLong ->
+                                new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")
+                                        .format(new Date(tsLong)))
                 // Don't use Lambda reference as PrintStream is not 
serializable.
-                .tmpToConsumerSink((data) -> System.out.println(data));
+                .tmpToConsumerSink((tsStr) -> System.out.println(tsStr));
         env.execute();
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
index b926c7e1d60..98b05356b95 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.processfunction;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.processfunction.api.ProcessFunction;
 import org.apache.flink.processfunction.connector.ConsumerSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
@@ -39,6 +40,12 @@ public class DataStreamImpl<T> implements DataStream<T> {
                         transformation, "Stream Transformation must not be 
null.");
     }
 
+    @Override
+    public <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> 
processFunction) {
+        // TODO: Add implementation that calls processFunction.processRecord() 
in runtime
+        return null;
+    }
+
     @Override
     public void tmpToConsumerSink(ConsumerFunction<T> consumer) {
         // read the output type of the input Transform to coax out errors 
about MissingTypeInfo

Reply via email to