This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3898a4b9bcbae8e50134fa74ccc3c7a9cb4034a5 Author: tison <wander4...@gmail.com> AuthorDate: Fri Nov 29 18:56:56 2019 +0800 [FLINK-14762][client] Implement JobClient#getJobStatus This closes #10311 . --- .../flink/client/deployment/ClusterClientJobClientAdapter.java | 6 ++++++ .../src/main/java/org/apache/flink/core/execution/JobClient.java | 6 ++++++ .../src/test/java/org/apache/flink/api/java/TestingJobClient.java | 6 ++++++ .../org/apache/flink/streaming/environment/TestingJobClient.java | 6 ++++++ 4 files changed, 24 insertions(+) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index 91a9c91..73a343f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -20,6 +20,7 @@ package org.apache.flink.client.deployment; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.core.execution.JobClient; @@ -60,6 +61,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { } @Override + public CompletableFuture<JobStatus> getJobStatus() { + return clusterClient.getJobStatus(jobID); + } + + @Override public CompletableFuture<Void> cancel() { return clusterClient.cancel(jobID).thenApply(FunctionUtils.nullFn()); } diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 5bec503..d82cf64 100644 --- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -21,6 +21,7 @@ package org.apache.flink.core.execution; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import javax.annotation.Nullable; @@ -39,6 +40,11 @@ public interface JobClient extends AutoCloseable { JobID getJobID(); /** + * Requests the {@link JobStatus} of the associated job. + */ + CompletableFuture<JobStatus> getJobStatus(); + + /** * Cancels the associated job. */ CompletableFuture<Void> cancel(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java index b5c9873..a24cc3e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; import javax.annotation.Nullable; @@ -39,6 +40,11 @@ public class TestingJobClient implements JobClient { } @Override + public CompletableFuture<JobStatus> getJobStatus() { + return CompletableFuture.completedFuture(JobStatus.FINISHED); + } + + @Override public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap())); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java index bcc02ee..36439b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.environment; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.JobClient; import javax.annotation.Nullable; @@ -39,6 +40,11 @@ public class TestingJobClient implements JobClient { } @Override + public CompletableFuture<JobStatus> getJobStatus() { + return CompletableFuture.completedFuture(JobStatus.FINISHED); + } + + @Override public CompletableFuture<JobExecutionResult> getJobExecutionResult(ClassLoader userClassloader) { return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 0L, Collections.emptyMap())); }