This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9616f46377db055a3e7a1b8323313f10e4f7b68e
Author: Weijie Guo <[email protected]>
AuthorDate: Wed May 31 12:12:23 2023 +0800

    Supports submit job to minicluster environment.
---
 .../processfunction/api/ExecutionEnvironment.java  |   2 +-
 .../flink-process-function-examples/pom.xml        |   2 +-
 .../flink-process-function/pom.xml                 |  14 ++-
 .../processfunction/ExecutionEnvironmentImpl.java  | 110 ++++++++++++++++++++-
 4 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
index 6335c9e33b8..36d7180d2c9 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
@@ -29,7 +29,7 @@ public abstract class ExecutionEnvironment {
                         .invoke(null);
     }
 
-    public abstract void execute();
+    public abstract void execute() throws Exception;
 
     /** TODO: Temporal method. Will revisit source functions later. */
     public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier);
diff --git a/flink-process-function-parent/flink-process-function-examples/pom.xml b/flink-process-function-parent/flink-process-function-examples/pom.xml
index a37a7ad21d3..c223300a5b0 100644
--- a/flink-process-function-parent/flink-process-function-examples/pom.xml
+++ b/flink-process-function-parent/flink-process-function-examples/pom.xml
@@ -41,7 +41,7 @@
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-process-function-api</artifactId>
-                       <version>1.18-SNAPSHOT</version>
+                       <version>${project.version}</version>
                </dependency>
        </dependencies>
 </project>
diff --git a/flink-process-function-parent/flink-process-function/pom.xml b/flink-process-function-parent/flink-process-function/pom.xml
index 92609ba3938..f36526d1fe4 100644
--- a/flink-process-function-parent/flink-process-function/pom.xml
+++ b/flink-process-function-parent/flink-process-function/pom.xml
@@ -41,7 +41,19 @@
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-process-function-api</artifactId>
-                       <version>1.18-SNAPSHOT</version>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
                </dependency>
        </dependencies>
 </project>
diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
index ee366d47b9d..c5554f23fe4 100644
--- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
+++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
@@ -18,19 +18,48 @@
 
 package org.apache.flink.processfunction;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.client.deployment.executors.LocalExecutorFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.processfunction.api.DataStream;
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
+import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class ExecutionEnvironmentImpl extends ExecutionEnvironment {
+    private final List<Transformation<?>> transformations = new ArrayList<>();
+
+    private final ExecutionConfig config = new ExecutionConfig();
+
+    private final Configuration configuration = new Configuration();
+
     public static ExecutionEnvironmentImpl newInstance() {
         return new ExecutionEnvironmentImpl();
     }
 
     @Override
-    public void execute() {
-        // TODO: submit job for execution
+    public void execute() throws Exception {
+        StreamGraph streamGraph = getStreamGraph();
+        streamGraph.setJobName("Process Function Api Test Job");
+        execute(streamGraph);
+        // TODO Supports cache for DataStream.
     }
 
     @Override
@@ -38,4 +67,81 @@ public class ExecutionEnvironmentImpl extends ExecutionEnvironment {
         // TODO: keep calling `supplier.get()` at runtime
         return new DataStreamImpl<>();
     }
+
+    // -----------------------------------------------
+    //              Internal Methods
+    // -----------------------------------------------
+
+    private void execute(StreamGraph streamGraph) throws Exception {
+        final JobClient jobClient = executeAsync(streamGraph);
+
+        try {
+            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
+                jobClient.getJobExecutionResult().get();
+            }
+            // TODO Supports handle JobExecutionResult, including execution time and accumulator.
+
+            // TODO Supports job listeners.
+        } catch (Throwable t) {
+            // get() on the JobExecutionResult Future will throw an ExecutionException. This
+            // behaviour was largely not there in Flink versions before the PipelineExecutor
+            // refactoring so we should strip that exception.
+            Throwable strippedException = ExceptionUtils.stripExecutionException(t);
+            ExceptionUtils.rethrowException(strippedException);
+        }
+    }
+
+    private JobClient executeAsync(StreamGraph streamGraph) throws Exception {
+        checkNotNull(streamGraph, "StreamGraph cannot be null.");
+        final PipelineExecutor executor = getPipelineExecutor();
+
+        CompletableFuture<JobClient> jobClientFuture =
+                executor.execute(streamGraph, configuration, getClass().getClassLoader());
+
+        try {
+            // TODO Supports job listeners.
+            // TODO Supports DataStream collect.
+            return jobClientFuture.get();
+        } catch (ExecutionException executionException) {
+            final Throwable strippedException =
+                    ExceptionUtils.stripExecutionException(executionException);
+            throw new FlinkException(
+                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
+                    strippedException);
+        }
+    }
+
+    /** Get {@link StreamGraph} and clear all transformations. */
+    private StreamGraph getStreamGraph() {
+        final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
+        transformations.clear();
+        return streamGraph;
+    }
+
+    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
+        if (transformations.size() <= 0) {
+            throw new IllegalStateException(
+                    "No operators defined in streaming topology. Cannot execute.");
+        }
+
+        // We copy the transformation so that newly added transformations cannot intervene with the
+        // stream graph generation.
+        return new StreamGraphGenerator(
+                        new ArrayList<>(transformations),
+                        config,
+                        new CheckpointConfig(),
+                        configuration)
+                // TODO Re-Consider should we expose the logic of controlling chains to users.
+                .setChaining(true)
+                .setTimeCharacteristic(DEFAULT_TIME_CHARACTERISTIC);
+    }
+
+    private PipelineExecutor getPipelineExecutor() {
+        // TODO Get executor factory via SPI.
+        PipelineExecutorFactory executorFactory = new LocalExecutorFactory();
+        // TODO Local executor only expect attached mode, remove this after other executor
+        // supported.
+        configuration.set(DeploymentOptions.ATTACHED, true);
+        return executorFactory.getExecutor(configuration);
+    }
 }

Reply via email to