This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8be47a72cab6ce5304920e65e51a521ee5c96103 Author: Xintong Song <[email protected]> AuthorDate: Sun Jun 4 11:47:36 2023 +0800 Introduce ProcessFunction interface. --- .../java/org/apache/flink/processfunction/api/DataStream.java | 3 +++ .../api/{DataStream.java => ProcessFunction.java} | 8 +++----- .../org/apache/flink/processfunction/examples/SimpleMap.java | 9 ++++++++- .../java/org/apache/flink/processfunction/DataStreamImpl.java | 7 +++++++ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java index 70a0a5aa040..47646b95bfe 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java @@ -21,6 +21,9 @@ package org.apache.flink.processfunction.api; import org.apache.flink.util.function.ConsumerFunction; public interface DataStream<T> { + + <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> processFunction); + /** TODO: Temporal method. Will revisit sink functions later. */ void tmpToConsumerSink(ConsumerFunction<T> consumer); } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java similarity index 79% copy from flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java copy to flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java index 70a0a5aa040..3058727c4df 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java @@ -18,9 +18,7 @@ package org.apache.flink.processfunction.api; -import org.apache.flink.util.function.ConsumerFunction; - -public interface DataStream<T> { - /** TODO: Temporal method. Will revisit sink functions later. */ - void tmpToConsumerSink(ConsumerFunction<T> consumer); +@FunctionalInterface +public interface ProcessFunction<IN, OUT> { + OUT processRecord(IN record); } diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index 523815b8aec..35f0a94ddf1 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -20,13 +20,20 @@ package org.apache.flink.processfunction.examples; import org.apache.flink.processfunction.api.ExecutionEnvironment; +import java.text.SimpleDateFormat; +import java.util.Date; + /** Usage: Must be executed with flink-process-function and flink-dist jar in classpath. */ public class SimpleMap { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.tmpFromSupplierSource(System::currentTimeMillis) + .process( + tsLong -> + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS") + .format(new Date(tsLong))) // Don't use Lambda reference as PrintStream is not serializable. - .tmpToConsumerSink((data) -> System.out.println(data)); + .tmpToConsumerSink((tsStr) -> System.out.println(tsStr)); env.execute(); } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java index b926c7e1d60..98b05356b95 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.processfunction; import org.apache.flink.api.dag.Transformation; import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.processfunction.api.ProcessFunction; import org.apache.flink.processfunction.connector.ConsumerSinkFunction; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; @@ -39,6 +40,12 @@ public class DataStreamImpl<T> implements DataStream<T> { transformation, "Stream Transformation must not be null."); } + @Override + public <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> processFunction) { + // TODO: Add implementation that calls processFunction.processRecord() in runtime + return null; + } + @Override public void tmpToConsumerSink(ConsumerFunction<T> consumer) { // read the output type of the input Transform to coax out errors about MissingTypeInfo
