This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0328fb62c05396dba51e44a8e0bcf5641f13e8f8 Author: tison <wander4...@gmail.com> AuthorDate: Fri Nov 29 09:52:39 2019 +0800 [FLINK-14762][client] Implement JobClient#stopWithSavepoint --- .../client/deployment/ClusterClientJobClientAdapter.java | 7 +++++++ .../java/org/apache/flink/core/execution/JobClient.java | 15 +++++++++++++++ .../java/org/apache/flink/api/java/TestingJobClient.java | 7 +++++++ .../flink/streaming/environment/TestingJobClient.java | 7 +++++++ 4 files changed, 36 insertions(+) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java index 602d64d..88ca29c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.FunctionUtils; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -62,6 +64,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient { } @Override + public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) { + return clusterClient.stopWithSavepoint(jobID, advanceToEndOfEventTime, savepointDirectory); + } + + @Override public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) { checkNotNull(userClassloader); diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java index 9f4614c..d392377 100644 --- 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java +++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java @@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; +import javax.annotation.Nullable; + import java.util.concurrent.CompletableFuture; /** @@ -41,6 +43,19 @@ public interface JobClient extends AutoCloseable { CompletableFuture<Void> cancel(); /** + * Stops the associated job on the Flink cluster. + * + * <p>Stopping works only for streaming programs. Be aware that the job might continue to run for + * a while after the stop command is sent, because once the sources stop emitting data, all operators + * still need to finish processing. + * + * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline + * @param savepointDirectory directory the savepoint should be written to + * @return a {@link CompletableFuture} containing the path where the savepoint is located + */ + CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory); + + /** * Returns the {@link JobExecutionResult result of the job execution} of the submitted job. * * @param userClassloader the classloader used to de-serialize the accumulators of the job. 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java index 215efed..c9aa64b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.core.execution.JobClient; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -45,4 +47,9 @@ public class TestingJobClient implements JobClient { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) { + return CompletableFuture.completedFuture("null"); + } + } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java index af1025c..5cd669c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.core.execution.JobClient; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -45,4 +47,9 @@ public class TestingJobClient implements JobClient { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, @Nullable String savepointDirectory) { + return CompletableFuture.completedFuture("null"); + } + }