This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1be9ad7a73f797be7ac86563e1a0072912d61391 Author: Weijie Guo <[email protected]> AuthorDate: Wed May 31 14:38:08 2023 +0800 Supports supplier source and consumer sink. --- .../processfunction/api/ExecutionEnvironment.java | 5 ++- .../flink/processfunction/examples/SimpleMap.java | 2 +- .../flink/processfunction/DataStreamImpl.java | 36 +++++++++++++++++++- .../processfunction/ExecutionEnvironmentImpl.java | 39 ++++++++++++++++++++-- .../ConsumerSinkFunction.java} | 17 +++++++--- .../connector/SupplierSourceFunction.java} | 34 +++++++++++++------ 6 files changed, 112 insertions(+), 21 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java index 8f4d39427ce..0c8e726e222 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java @@ -31,6 +31,9 @@ public abstract class ExecutionEnvironment { public abstract void execute() throws Exception; - /** TODO: Temporal method. Will revisit source functions later. */ + /** + * TODO: 1. Temporal method. Will revisit source functions later. 2. Refactor and move the type + * information related code to core-api module. + */ public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> supplier); } diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index 172b62c2853..523815b8aec 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -20,7 +20,7 @@ package org.apache.flink.processfunction.examples; import org.apache.flink.processfunction.api.ExecutionEnvironment; -/** Usage: Must be executed with flink-process-function jar in classpath. */ +/** Usage: Must be executed with flink-process-function and flink-dist jar in classpath. */ public class SimpleMap { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java index 72cb6d276f5..b926c7e1d60 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java @@ -18,12 +18,46 @@ package org.apache.flink.processfunction; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.processfunction.connector.ConsumerSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.PhysicalTransformation; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.ConsumerFunction; public class DataStreamImpl<T> implements DataStream<T> { + private final ExecutionEnvironmentImpl environment; + private final Transformation<T> transformation; + + public DataStreamImpl(ExecutionEnvironmentImpl environment, Transformation<T> transformation) { + this.environment = + Preconditions.checkNotNull(environment, "Execution Environment must not be null."); + this.transformation = + Preconditions.checkNotNull( + transformation, "Stream Transformation must not be null."); + } + @Override public void tmpToConsumerSink(ConsumerFunction<T> consumer) { - // TODO: keep calling `consumer.accept()` at runtime + // read the output type of the input Transform to coax out errors about MissingTypeInfo + transformation.getOutputType(); + + ConsumerSinkFunction<T> sinkFunction = new ConsumerSinkFunction<>(consumer); + + // TODO Supports clean closure + StreamSink<T> sinkOperator = new StreamSink<>(sinkFunction); + + PhysicalTransformation<T> sinkTransformation = + new LegacySinkTransformation<>( + transformation, + "Consumer Sink", + sinkOperator, + // TODO Supports configure parallelism + 1, + true); + + environment.addOperator(sinkTransformation); } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java index be0401df86c..16dca9d2b4e 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java @@ -19,7 +19,10 @@ package org.apache.flink.processfunction; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.deployment.executors.LocalExecutorFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -28,11 +31,15 @@ import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.processfunction.api.DataStream; import org.apache.flink.processfunction.api.ExecutionEnvironment; +import org.apache.flink.processfunction.connector.SupplierSourceFunction; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.SupplierFunction; import java.util.ArrayList; @@ -64,8 +71,36 @@ public class ExecutionEnvironmentImpl extends ExecutionEnvironment { @Override public <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> supplier) { - // TODO: keep calling `supplier.get()` at runtime - return new DataStreamImpl<>(); + final String sourceName = "Supplier Source"; + // TODO Supports clean closure + final SupplierSourceFunction<OUT> sourceFunction = new SupplierSourceFunction<>(supplier); + final TypeInformation<OUT> resolvedTypeInfo = + TypeExtractor.getUnaryOperatorReturnType( + supplier, + SupplierFunction.class, + -1, + 0, + TypeExtractor.NO_INDEX, + null, + null, + false); + + final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(sourceFunction); + return new DataStreamImpl<>( + this, + new LegacySourceTransformation<>( + sourceName, + sourceOperator, + resolvedTypeInfo, + // TODO Supports configure parallelism + 1, + Boundedness.CONTINUOUS_UNBOUNDED, + true)); + } + + public void addOperator(Transformation<?> transformation) { + Preconditions.checkNotNull(transformation, "transformation must not be null."); + this.transformations.add(transformation); } // ----------------------------------------------- diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java similarity index 64% copy from flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java copy to flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java index 72cb6d276f5..996e5ec0e25 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/ConsumerSinkFunction.java @@ -16,14 +16,21 @@ * limitations under the License. */ -package org.apache.flink.processfunction; +package org.apache.flink.processfunction.connector; -import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.function.ConsumerFunction; -public class DataStreamImpl<T> implements DataStream<T> { +/** Legacy implementation for ConsumerSink. */ +public class ConsumerSinkFunction<IN> implements SinkFunction<IN> { + private final ConsumerFunction<IN> dataConsumer; + + public ConsumerSinkFunction(ConsumerFunction<IN> dataConsumer) { + this.dataConsumer = dataConsumer; + } + @Override - public void tmpToConsumerSink(ConsumerFunction<T> consumer) { - // TODO: keep calling `consumer.accept()` at runtime + public void invoke(IN value, Context context) { + dataConsumer.accept(value); } } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java similarity index 52% copy from flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java copy to flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java index 8f4d39427ce..044dc4c6c1d 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/connector/SupplierSourceFunction.java @@ -16,21 +16,33 @@ * limitations under the License. */ -package org.apache.flink.processfunction.api; +package org.apache.flink.processfunction.connector; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.function.SupplierFunction; -public abstract class ExecutionEnvironment { - public static ExecutionEnvironment getExecutionEnvironment() - throws ReflectiveOperationException { - return (ExecutionEnvironment) - Class.forName("org.apache.flink.processfunction.ExecutionEnvironmentImpl") - .getMethod("newInstance") - .invoke(null); +/** Legacy implementation for SupplierSource. */ +public class SupplierSourceFunction<OUT> implements SourceFunction<OUT> { + private volatile boolean isRunning = true; + + private final SupplierFunction<OUT> dataSupplier; + + public SupplierSourceFunction(SupplierFunction<OUT> dataSupplier) { + this.dataSupplier = dataSupplier; } - public abstract void execute() throws Exception; + @Override + public void run(SourceContext<OUT> ctx) throws Exception { + while (isRunning) { + OUT data = dataSupplier.get(); + ctx.collect(data); + //noinspection BusyWait + Thread.sleep(1000L); + } + } - /** TODO: Temporal method. Will revisit source functions later. */ - public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> supplier); + @Override + public void cancel() { + isRunning = false; + } }
