This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9616f46377db055a3e7a1b8323313f10e4f7b68e Author: Weijie Guo <[email protected]> AuthorDate: Wed May 31 12:12:23 2023 +0800 Supports submit job to minicluster environment. --- .../processfunction/api/ExecutionEnvironment.java | 2 +- .../flink-process-function-examples/pom.xml | 2 +- .../flink-process-function/pom.xml | 14 ++- .../processfunction/ExecutionEnvironmentImpl.java | 110 ++++++++++++++++++++- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java index 6335c9e33b8..36d7180d2c9 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java @@ -29,7 +29,7 @@ public abstract class ExecutionEnvironment { .invoke(null); } - public abstract void execute(); + public abstract void execute() throws Exception; /** TODO: Temporal method. Will revisit source functions later. */ public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier); diff --git a/flink-process-function-parent/flink-process-function-examples/pom.xml b/flink-process-function-parent/flink-process-function-examples/pom.xml index a37a7ad21d3..c223300a5b0 100644 --- a/flink-process-function-parent/flink-process-function-examples/pom.xml +++ b/flink-process-function-parent/flink-process-function-examples/pom.xml @@ -41,7 +41,7 @@ <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-process-function-api</artifactId> - <version>1.18-SNAPSHOT</version> + <version>${project.version}</version> </dependency> </dependencies> </project> diff --git a/flink-process-function-parent/flink-process-function/pom.xml b/flink-process-function-parent/flink-process-function/pom.xml index 92609ba3938..f36526d1fe4 100644 --- a/flink-process-function-parent/flink-process-function/pom.xml +++ b/flink-process-function-parent/flink-process-function/pom.xml @@ -41,7 +41,19 @@ <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-process-function-api</artifactId> - <version>1.18-SNAPSHOT</version> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> </dependency> </dependencies> </project> diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java index ee366d47b9d..c5554f23fe4 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java @@ -18,19 +18,48 @@ package org.apache.flink.processfunction; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.client.deployment.executors.LocalExecutorFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.PipelineExecutor; +import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.processfunction.api.DataStream; import org.apache.flink.processfunction.api.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC; +import static org.apache.flink.util.Preconditions.checkNotNull; + public class ExecutionEnvironmentImpl extends ExecutionEnvironment { + private final List<Transformation<?>> transformations = new ArrayList<>(); + + private final ExecutionConfig config = new ExecutionConfig(); + + private final Configuration configuration = new Configuration(); + public static ExecutionEnvironmentImpl newInstance() { return new ExecutionEnvironmentImpl(); } @Override - public void execute() { - // TODO: submit job for execution + public void execute() throws Exception { + StreamGraph streamGraph = getStreamGraph(); + streamGraph.setJobName("Process Function Api Test Job"); + execute(streamGraph); + // TODO Supports cache for DataStream. } @Override @@ -38,4 +67,81 @@ public class ExecutionEnvironmentImpl extends ExecutionEnvironment { // TODO: keep calling `supplier.get()` at runtime return new DataStreamImpl<>(); } + + // ----------------------------------------------- + // Internal Methods + // ----------------------------------------------- + + private void execute(StreamGraph streamGraph) throws Exception { + final JobClient jobClient = executeAsync(streamGraph); + + try { + if (configuration.getBoolean(DeploymentOptions.ATTACHED)) { + jobClient.getJobExecutionResult().get(); + } + // TODO Supports handle JobExecutionResult, including execution time and accumulator. + + // TODO Supports job listeners. + } catch (Throwable t) { + // get() on the JobExecutionResult Future will throw an ExecutionException. This + // behaviour was largely not there in Flink versions before the PipelineExecutor + // refactoring so we should strip that exception. + Throwable strippedException = ExceptionUtils.stripExecutionException(t); + ExceptionUtils.rethrowException(strippedException); + } + } + + private JobClient executeAsync(StreamGraph streamGraph) throws Exception { + checkNotNull(streamGraph, "StreamGraph cannot be null."); + final PipelineExecutor executor = getPipelineExecutor(); + + CompletableFuture<JobClient> jobClientFuture = + executor.execute(streamGraph, configuration, getClass().getClassLoader()); + + try { + // TODO Supports job listeners. + // TODO Supports DataStream collect. + return jobClientFuture.get(); + } catch (ExecutionException executionException) { + final Throwable strippedException = + ExceptionUtils.stripExecutionException(executionException); + throw new FlinkException( + String.format("Failed to execute job '%s'.", streamGraph.getJobName()), + strippedException); + } + } + + /** Get {@link StreamGraph} and clear all transformations. */ + private StreamGraph getStreamGraph() { + final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate(); + transformations.clear(); + return streamGraph; + } + + private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) { + if (transformations.size() <= 0) { + throw new IllegalStateException( + "No operators defined in streaming topology. Cannot execute."); + } + + // We copy the transformation so that newly added transformations cannot intervene with the + // stream graph generation. + return new StreamGraphGenerator( + new ArrayList<>(transformations), + config, + new CheckpointConfig(), + configuration) + // TODO Re-Consider should we expose the logic of controlling chains to users. + .setChaining(true) + .setTimeCharacteristic(DEFAULT_TIME_CHARACTERISTIC); + } + + private PipelineExecutor getPipelineExecutor() { + // TODO Get executor factory via SPI. + PipelineExecutorFactory executorFactory = new LocalExecutorFactory(); + // TODO Local executor only expect attached mode, remove this after other executor + // supported. + configuration.set(DeploymentOptions.ATTACHED, true); + return executorFactory.getExecutor(configuration); + } }
