This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7dcfd90 [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
7dcfd90 is described below
commit 7dcfd90dd42648133bb7c1216740f47fffe89c13
Author: Echo Lee <[email protected]>
AuthorDate: Tue May 26 16:44:13 2020 +0800
[FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
This closes #12339.
---
.../flink/client/program/ContextEnvironment.java | 29 ++++++++++++++++++++--
.../client/program/StreamContextEnvironment.java | 26 ++++++++++++++++++-
.../flink/api/java/ExecutionEnvironment.java | 7 ++++++
3 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index ea62b80..f7e515b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -26,16 +26,21 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Execution Environment for remote execution with the Client.
*/
@@ -64,7 +69,26 @@ public class ContextEnvironment extends ExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobClient jobClient = executeAsync(jobName);
+ final JobClient jobClient = executeAsync(jobName);
+ final List<JobListener> jobListeners = getJobListeners();
+
+ try {
+ final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(jobExecutionResult, null));
+ return jobExecutionResult;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+ ExceptionUtils.rethrowException(t);
+
+ // never reached, only make javac happy
+ return null;
+ }
+ }
+
+ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+ checkNotNull(jobClient);
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
@@ -81,7 +105,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
ContextEnvironment.class.getSimpleName(),
LOG);
jobExecutionResultFuture.whenComplete((ignored, throwable) ->
- ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
+ ShutdownHookUtil.removeShutdownHook(
+ shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
}
jobExecutionResult = jobExecutionResultFuture.get();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 697c9a0..ec81d83 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -24,19 +24,24 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
* testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -68,7 +73,26 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
- JobClient jobClient = executeAsync(streamGraph);
+ final JobClient jobClient = executeAsync(streamGraph);
+ final List<JobListener> jobListeners = getJobListeners();
+
+ try {
+ final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(jobExecutionResult, null));
+ return jobExecutionResult;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+ ExceptionUtils.rethrowException(t);
+
+ // never reached, only make javac happy
+ return null;
+ }
+ }
+
+ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+ checkNotNull(jobClient);
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a227f84..75c8dd1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -213,6 +213,13 @@ public class ExecutionEnvironment {
}
/**
+ * Gets the config JobListeners.
+ */
+ protected List<JobListener> getJobListeners() {
+ return jobListeners;
+ }
+
+ /**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism via
* {@link Operator#setParallelism(int)}. Other operations may need to run with a different