This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8e89abde72835c703ffb15f6ea716f8badd3591
Author: Xintong Song <[email protected]>
AuthorDate: Sun Jun 4 14:17:38 2023 +0800

    Change ProcessFunction to support arbitrary number of outputs.
---
 .../java/org/apache/flink/processfunction/api/ProcessFunction.java | 3 ++-
 .../java/org/apache/flink/processfunction/examples/SimpleMap.java  | 7 ++++---
 .../apache/flink/processfunction/examples/SimpleStatefulMap.java   | 6 +++---
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
index 796305fadb2..5448910b641 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
@@ -20,10 +20,11 @@ package org.apache.flink.processfunction.api;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.Consumer;
 
 @FunctionalInterface
 public interface ProcessFunction<IN, OUT> {
-    OUT processRecord(IN record, RuntimeContext ctx);
+    void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx);
 
     // Explicitly declares states upfront. See FLIP-22.
     default Map<String, StateDescriptor> usesStates() { // stateId -> 
stateDescriptor
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index 5bcfe8592a5..207befab2f5 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -29,9 +29,10 @@ public class SimpleMap {
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
         env.tmpFromSupplierSource(System::currentTimeMillis)
                 .process(
-                        (tsLong, ctx) ->
-                                new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")
-                                        .format(new Date(tsLong)))
+                        (tsLong, output, ctx) ->
+                                output.accept(
+                                        new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss.SSS")
+                                                .format(new Date(tsLong))))
                 // Don't use Lambda reference as PrintStream is not 
serializable.
                 .tmpToConsumerSink((tsStr) -> System.out.println(tsStr));
         env.execute();
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
index df759e8eb45..5f849dbf8e4 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
@@ -26,6 +26,7 @@ import org.apache.flink.processfunction.api.StateDescriptor;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /** Usage: Must be executed with flink-process-function and flink-dist jar in 
classpath. */
 public class SimpleStatefulMap {
@@ -43,13 +44,12 @@ public class SimpleStatefulMap {
         static final String STATE_ID = "lastTimestamp";
 
         @Override
-        public Long processRecord(Long record, RuntimeContext ctx) {
+        public void processRecord(Long record, Consumer<Long> output, 
RuntimeContext ctx) {
             State state = ctx.getState(STATE_ID);
             // TODO:
             //  long diff = record - state.getValue();
             //  state.setValue(record)
-            //  return diff;
-            return record;
+            //  output.accept(diff);
         }
 
         @Override

Reply via email to