This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0328fb62c05396dba51e44a8e0bcf5641f13e8f8
Author: tison <wander4...@gmail.com>
AuthorDate: Fri Nov 29 09:52:39 2019 +0800

    [FLINK-14762][client] Implement JobClient#stopWithSavepoint
---
 .../client/deployment/ClusterClientJobClientAdapter.java  |  7 +++++++
 .../java/org/apache/flink/core/execution/JobClient.java   | 15 +++++++++++++++
 .../java/org/apache/flink/api/java/TestingJobClient.java  |  7 +++++++
 .../flink/streaming/environment/TestingJobClient.java     |  7 +++++++
 4 files changed, 36 insertions(+)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 602d64d..88ca29c 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.FunctionUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -62,6 +64,11 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
        }
 
        @Override
+       public CompletableFuture<String> stopWithSavepoint(boolean 
advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+               return clusterClient.stopWithSavepoint(jobID, 
advanceToEndOfEventTime, savepointDirectory);
+       }
+
+       @Override
        public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(final ClassLoader userClassloader) {
                checkNotNull(userClassloader);
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 9f4614c..d392377 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -41,6 +43,19 @@ public interface JobClient extends AutoCloseable {
        CompletableFuture<Void> cancel();
 
        /**
+        * Stops the associated job on Flink cluster.
+        *
+        * <p>Stopping works only for streaming programs. Be aware, that the 
job might continue to run for
+        * a while after sending the stop command, because after sources 
stopped to emit data all operators
+        * need to finish processing.
+        *
+        * @param advanceToEndOfEventTime flag indicating if the source should 
inject a {@code MAX_WATERMARK} in the pipeline
+        * @param savepointDirectory directory the savepoint should be written 
to
+        * @return a {@link CompletableFuture} containing the path where the 
savepoint is located
+        */
+       CompletableFuture<String> stopWithSavepoint(boolean 
advanceToEndOfEventTime, @Nullable String savepointDirectory);
+
+       /**
         * Returns the {@link JobExecutionResult result of the job execution} 
of the submitted job.
         *
         * @param userClassloader the classloader used to de-serialize the 
accumulators of the job.
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java 
b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index 215efed..c9aa64b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.execution.JobClient;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
@@ -45,4 +47,9 @@ public class TestingJobClient implements JobClient {
                return CompletableFuture.completedFuture(null);
        }
 
+       @Override
+       public CompletableFuture<String> stopWithSavepoint(boolean 
advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+               return CompletableFuture.completedFuture("null");
+       }
+
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index af1025c..5cd669c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.execution.JobClient;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
@@ -45,4 +47,9 @@ public class TestingJobClient implements JobClient {
                return CompletableFuture.completedFuture(null);
        }
 
+       @Override
+       public CompletableFuture<String> stopWithSavepoint(boolean 
advanceToEndOfEventTime, @Nullable String savepointDirectory) {
+               return CompletableFuture.completedFuture("null");
+       }
+
 }

Reply via email to