This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dcfd90  [FLINK-17744] Make (Stream)ContextEnvironment#execute call 
JobListener#onJobExecuted
7dcfd90 is described below

commit 7dcfd90dd42648133bb7c1216740f47fffe89c13
Author: Echo Lee <[email protected]>
AuthorDate: Tue May 26 16:44:13 2020 +0800

    [FLINK-17744] Make (Stream)ContextEnvironment#execute call 
JobListener#onJobExecuted
    
    This closes #12339.
---
 .../flink/client/program/ContextEnvironment.java   | 29 ++++++++++++++++++++--
 .../client/program/StreamContextEnvironment.java   | 26 ++++++++++++++++++-
 .../flink/api/java/ExecutionEnvironment.java       |  7 ++++++
 3 files changed, 59 insertions(+), 3 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index ea62b80..f7e515b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -26,16 +26,21 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -64,7 +69,26 @@ public class ContextEnvironment extends ExecutionEnvironment 
{
 
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
-               JobClient jobClient = executeAsync(jobName);
+               final JobClient jobClient = executeAsync(jobName);
+               final List<JobListener> jobListeners = getJobListeners();
+
+               try {
+                       final JobExecutionResult  jobExecutionResult = 
getJobExecutionResult(jobClient);
+                       jobListeners.forEach(jobListener ->
+                                       
jobListener.onJobExecuted(jobExecutionResult, null));
+                       return jobExecutionResult;
+               } catch (Throwable t) {
+                       jobListeners.forEach(jobListener ->
+                                       jobListener.onJobExecuted(null, 
ExceptionUtils.stripExecutionException(t)));
+                       ExceptionUtils.rethrowException(t);
+
+                       // never reached, only make javac happy
+                       return null;
+               }
+       }
+
+       private JobExecutionResult getJobExecutionResult(final JobClient 
jobClient) throws Exception {
+               checkNotNull(jobClient);
 
                JobExecutionResult jobExecutionResult;
                if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
@@ -81,7 +105,8 @@ public class ContextEnvironment extends ExecutionEnvironment 
{
                                                
ContextEnvironment.class.getSimpleName(),
                                                LOG);
                                jobExecutionResultFuture.whenComplete((ignored, 
throwable) ->
-                                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
ContextEnvironment.class.getSimpleName(), LOG));
+                                               
ShutdownHookUtil.removeShutdownHook(
+                                                       shutdownHook, 
ContextEnvironment.class.getSimpleName(), LOG));
                        }
 
                        jobExecutionResult = jobExecutionResultFuture.get();
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 697c9a0..ec81d83 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -24,19 +24,24 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where 
the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should 
be used when
@@ -68,7 +73,26 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
 
        @Override
        public JobExecutionResult execute(StreamGraph streamGraph) throws 
Exception {
-               JobClient jobClient = executeAsync(streamGraph);
+               final JobClient jobClient = executeAsync(streamGraph);
+               final List<JobListener> jobListeners = getJobListeners();
+
+               try {
+                       final JobExecutionResult  jobExecutionResult = 
getJobExecutionResult(jobClient);
+                       jobListeners.forEach(jobListener ->
+                                       
jobListener.onJobExecuted(jobExecutionResult, null));
+                       return jobExecutionResult;
+               } catch (Throwable t) {
+                       jobListeners.forEach(jobListener ->
+                                       jobListener.onJobExecuted(null, 
ExceptionUtils.stripExecutionException(t)));
+                       ExceptionUtils.rethrowException(t);
+
+                       // never reached, only make javac happy
+                       return null;
+               }
+       }
+
+       private JobExecutionResult getJobExecutionResult(final JobClient 
jobClient) throws Exception {
+               checkNotNull(jobClient);
 
                JobExecutionResult jobExecutionResult;
                if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a227f84..75c8dd1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -213,6 +213,13 @@ public class ExecutionEnvironment {
        }
 
        /**
+        * Gets the config JobListeners.
+        */
+       protected List<JobListener> getJobListeners() {
+               return jobListeners;
+       }
+
+       /**
         * Gets the parallelism with which operation are executed by default. 
Operations can
         * individually override this value to use a specific parallelism via
         * {@link Operator#setParallelism(int)}. Other operations may need to 
run with a different

Reply via email to