This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 28bd60f0f846bd9ba30cee5a3ccfd9eba1d90819
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Jun 6 11:34:59 2023 +0800

    Implements process operator to handle user process function.
---
 .../flink/processfunction/api/ProcessFunction.java |  4 +-
 .../flink/processfunction/examples/SimpleMap.java  | 14 +++-
 .../flink/processfunction/DataStreamImpl.java      | 55 +++++++++++++++-
 .../processfunction/operators/ProcessOperator.java | 76 ++++++++++++++++++++++
 4 files changed, 144 insertions(+), 5 deletions(-)

diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
index 5448910b641..55919cd8307 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.processfunction.api;
 
+import org.apache.flink.api.common.functions.Function;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.function.Consumer;
 
 @FunctionalInterface
-public interface ProcessFunction<IN, OUT> {
+public interface ProcessFunction<IN, OUT> extends Function {
     void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx);
 
     // Explicitly declares states upfront. See FLIP-22.
diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index 207befab2f5..78b06e881c0 100644
--- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -19,9 +19,12 @@
 package org.apache.flink.processfunction.examples;
 
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import org.apache.flink.processfunction.api.ProcessFunction;
+import org.apache.flink.processfunction.api.RuntimeContext;
 
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.function.Consumer;
 
 /** Usage: Must be executed with flink-process-function and flink-dist jar in classpath. */
 public class SimpleMap {
@@ -29,10 +32,17 @@ public class SimpleMap {
         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
         env.tmpFromSupplierSource(System::currentTimeMillis)
                 .process(
-                        (tsLong, output, ctx) ->
+                        // Lambda expression can not work here. Due to generic erasure, we cannot
+                        // infer the type of Consumer<T> from it.
+                        new ProcessFunction<Long, String>() {
+                            @Override
+                            public void processRecord(
+                                    Long record, Consumer<String> output, RuntimeContext ctx) {
                                 output.accept(
                                         new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS")
-                                                .format(new Date(tsLong))))
+                                                .format(new Date(record)));
+                            }
+                        })
                 // Don't use Lambda reference as PrintStream is not serializable.
                 .tmpToConsumerSink((tsStr) -> System.out.println(tsStr));
         env.execute();
diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
index 98b05356b95..0370a018db9 100644
--- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
@@ -18,12 +18,19 @@
 
 package org.apache.flink.processfunction;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.processfunction.api.DataStream;
 import org.apache.flink.processfunction.api.ProcessFunction;
 import org.apache.flink.processfunction.connector.ConsumerSinkFunction;
+import org.apache.flink.processfunction.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ConsumerFunction;
@@ -42,8 +49,19 @@ public class DataStreamImpl<T> implements DataStream<T> {
 
     @Override
     public <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> processFunction) {
-        // TODO: Add implementation that calls processFunction.processRecord() in runtime
-        return null;
+        TypeInformation<OUT> outType =
+                TypeExtractor.getUnaryOperatorReturnType(
+                        processFunction,
+                        ProcessFunction.class,
+                        0,
+                        1,
+                        new int[] {1, 0},
+                        getType(),
+                        Utils.getCallLocationName(),
+                        true);
+        ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction);
+
+        return transform("Process", outType, operator);
     }
 
     @Override
@@ -67,4 +85,37 @@ public class DataStreamImpl<T> implements DataStream<T> {
 
         environment.addOperator(sinkTransformation);
     }
+
+    /**
+     * Gets the type of the stream.
+     *
+     * @return The type of the DataStream.
+     */
+    private TypeInformation<T> getType() {
+        return transformation.getOutputType();
+    }
+
+    private <R> DataStream<R> transform(
+            String operatorName,
+            TypeInformation<R> outputTypeInfo,
+            OneInputStreamOperator<T, R> operator) {
+        // read the output type of the input Transform to coax out errors about MissingTypeInfo
+        transformation.getOutputType();
+
+        OneInputTransformation<T, R> resultTransform =
+                new OneInputTransformation<>(
+                        this.transformation,
+                        operatorName,
+                        SimpleUdfStreamOperatorFactory.of(operator),
+                        outputTypeInfo,
+                        // TODO Supports set parallelism.
+                        1,
+                        true);
+
+        DataStream<R> returnStream = new DataStreamImpl<>(environment, resultTransform);
+
+        environment.addOperator(resultTransform);
+
+        return returnStream;
+    }
 }
diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
new file mode 100644
index 00000000000..70700ede5eb
--- /dev/null
+++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.processfunction.operators;
+
+import org.apache.flink.processfunction.api.ProcessFunction;
+import org.apache.flink.processfunction.api.RuntimeContext;
+import org.apache.flink.processfunction.api.State;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.function.Consumer;
+
+/** Operator for {@link org.apache.flink.processfunction.api.ProcessFunction}. */
+public class ProcessOperator<IN, OUT>
+        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT> {
+
+    private transient RuntimeContext context;
+
+    private transient OutputCollector outputCollector;
+
+    public ProcessOperator(ProcessFunction<IN, OUT> userFunction) {
+        super(userFunction);
+
+        chainingStrategy = ChainingStrategy.ALWAYS;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        context = new ContextImpl();
+        outputCollector = new OutputCollector();
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) {
+        userFunction.processRecord(element.getValue(), outputCollector, context);
+    }
+
+    private class OutputCollector implements Consumer<OUT> {
+
+        private final StreamRecord<OUT> reuse = new StreamRecord<>(null);
+
+        @Override
+        public void accept(OUT outputRecord) {
+            output.collect(reuse.replace(outputRecord));
+        }
+    }
+
+    private static class ContextImpl implements RuntimeContext {
+
+        @Override
+        public State getState(String stateId) {
+            // TODO return state.
+            return null;
+        }
+    }
+}

Reply via email to