This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 28bd60f0f846bd9ba30cee5a3ccfd9eba1d90819 Author: Weijie Guo <[email protected]> AuthorDate: Tue Jun 6 11:34:59 2023 +0800 Implements process operator to handle user process function. --- .../flink/processfunction/api/ProcessFunction.java | 4 +- .../flink/processfunction/examples/SimpleMap.java | 14 +++- .../flink/processfunction/DataStreamImpl.java | 55 +++++++++++++++- .../processfunction/operators/ProcessOperator.java | 76 ++++++++++++++++++++++ 4 files changed, 144 insertions(+), 5 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java index 5448910b641..55919cd8307 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java @@ -18,12 +18,14 @@ package org.apache.flink.processfunction.api; +import org.apache.flink.api.common.functions.Function; + import java.util.Collections; import java.util.Map; import java.util.function.Consumer; @FunctionalInterface -public interface ProcessFunction<IN, OUT> { +public interface ProcessFunction<IN, OUT> extends Function { void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx); // Explicitly declares states upfront. See FLIP-22. 
diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index 207befab2f5..78b06e881c0 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -19,9 +19,12 @@ package org.apache.flink.processfunction.examples; import org.apache.flink.processfunction.api.ExecutionEnvironment; +import org.apache.flink.processfunction.api.ProcessFunction; +import org.apache.flink.processfunction.api.RuntimeContext; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.function.Consumer; /** Usage: Must be executed with flink-process-function and flink-dist jar in classpath. */ public class SimpleMap { @@ -29,10 +32,17 @@ public class SimpleMap { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.tmpFromSupplierSource(System::currentTimeMillis) .process( - (tsLong, output, ctx) -> + // Lambda expression can not work here. Due to generic erasure, we cannot + // infer the type of Consumer<T> from it. + new ProcessFunction<Long, String>() { + @Override + public void processRecord( + Long record, Consumer<String> output, RuntimeContext ctx) { output.accept( new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS") - .format(new Date(tsLong)))) + .format(new Date(record))); + } + }) // Don't use Lambda reference as PrintStream is not serializable. 
.tmpToConsumerSink((tsStr) -> System.out.println(tsStr)); env.execute(); diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java index 98b05356b95..0370a018db9 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java @@ -18,12 +18,19 @@ package org.apache.flink.processfunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.processfunction.api.DataStream; import org.apache.flink.processfunction.api.ProcessFunction; import org.apache.flink.processfunction.connector.ConsumerSinkFunction; +import org.apache.flink.processfunction.operators.ProcessOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ConsumerFunction; @@ -42,8 +49,19 @@ public class DataStreamImpl<T> implements DataStream<T> { @Override public <OUT> DataStream<OUT> process(ProcessFunction<T, OUT> processFunction) { - // TODO: Add implementation that calls processFunction.processRecord() in runtime - return null; + TypeInformation<OUT> outType = + 
TypeExtractor.getUnaryOperatorReturnType( + processFunction, + ProcessFunction.class, + 0, + 1, + new int[] {1, 0}, + getType(), + Utils.getCallLocationName(), + true); + ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction); + + return transform("Process", outType, operator); } @Override @@ -67,4 +85,37 @@ public class DataStreamImpl<T> implements DataStream<T> { environment.addOperator(sinkTransformation); } + + /** + * Gets the type of the stream. + * + * @return The type of the DataStream. + */ + private TypeInformation<T> getType() { + return transformation.getOutputType(); + } + + private <R> DataStream<R> transform( + String operatorName, + TypeInformation<R> outputTypeInfo, + OneInputStreamOperator<T, R> operator) { + // read the output type of the input Transform to coax out errors about MissingTypeInfo + transformation.getOutputType(); + + OneInputTransformation<T, R> resultTransform = + new OneInputTransformation<>( + this.transformation, + operatorName, + SimpleUdfStreamOperatorFactory.of(operator), + outputTypeInfo, + // TODO Supports set parallelism. + 1, + true); + + DataStream<R> returnStream = new DataStreamImpl<>(environment, resultTransform); + + environment.addOperator(resultTransform); + + return returnStream; + } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java new file mode 100644 index 00000000000..70700ede5eb --- /dev/null +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.processfunction.operators; + +import org.apache.flink.processfunction.api.ProcessFunction; +import org.apache.flink.processfunction.api.RuntimeContext; +import org.apache.flink.processfunction.api.State; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.function.Consumer; + +/** Operator for {@link org.apache.flink.processfunction.api.ProcessFunction}. 
*/ +public class ProcessOperator<IN, OUT> + extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT> { + + private transient RuntimeContext context; + + private transient OutputCollector outputCollector; + + public ProcessOperator(ProcessFunction<IN, OUT> userFunction) { + super(userFunction); + + chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + public void open() throws Exception { + super.open(); + context = new ContextImpl(); + outputCollector = new OutputCollector(); + } + + @Override + public void processElement(StreamRecord<IN> element) { + userFunction.processRecord(element.getValue(), outputCollector, context); + } + + private class OutputCollector implements Consumer<OUT> { + + private final StreamRecord<OUT> reuse = new StreamRecord<>(null); + + @Override + public void accept(OUT outputRecord) { + output.collect(reuse.replace(outputRecord)); + } + } + + private static class ContextImpl implements RuntimeContext { + + @Override + public State getState(String stateId) { + // TODO return state. + return null; + } + } +}
