This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit f8e89abde72835c703ffb15f6ea716f8badd3591 Author: Xintong Song <[email protected]> AuthorDate: Sun Jun 4 14:17:38 2023 +0800 Change ProcessFunction to support arbitrary number of outputs. --- .../java/org/apache/flink/processfunction/api/ProcessFunction.java | 3 ++- .../java/org/apache/flink/processfunction/examples/SimpleMap.java | 7 ++++--- .../apache/flink/processfunction/examples/SimpleStatefulMap.java | 6 +++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java index 796305fadb2..5448910b641 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java @@ -20,10 +20,11 @@ package org.apache.flink.processfunction.api; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; @FunctionalInterface public interface ProcessFunction<IN, OUT> { - OUT processRecord(IN record, RuntimeContext ctx); + void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx); // Explicitly declares states upfront. See FLIP-22. default Map<String, StateDescriptor> usesStates() { // stateId -> stateDescriptor diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index 5bcfe8592a5..207befab2f5 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -29,9 +29,10 @@ public class SimpleMap { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.tmpFromSupplierSource(System::currentTimeMillis) .process( - (tsLong, ctx) -> - new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS") - .format(new Date(tsLong))) + (tsLong, output, ctx) -> + output.accept( + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS") + .format(new Date(tsLong)))) // Don't use Lambda reference as PrintStream is not serializable. .tmpToConsumerSink((tsStr) -> System.out.println(tsStr)); env.execute(); diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java index df759e8eb45..5f849dbf8e4 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java @@ -26,6 +26,7 @@ import org.apache.flink.processfunction.api.StateDescriptor; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; /** Usage: Must be executed with flink-process-function and flink-dist jar in classpath. */ public class SimpleStatefulMap { @@ -43,13 +44,12 @@ public class SimpleStatefulMap { static final String STATE_ID = "lastTimestamp"; @Override - public Long processRecord(Long record, RuntimeContext ctx) { + public void processRecord(Long record, Consumer<Long> output, RuntimeContext ctx) { State state = ctx.getState(STATE_ID); // TODO: // long diff = record - state.getValue(); // state.setValue(record) - // return diff; - return record; + // output.accept(diff); } @Override
