Repository: flink
Updated Branches:
  refs/heads/release-1.5 0ba8ed6ba -> 62d927371


[FLINK-8935][tests] Implement MiniClusterClient#stop

This closes #5690.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62d92737
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62d92737
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62d92737

Branch: refs/heads/release-1.5
Commit: 62d927371d508bc00ab2bc55965387abafb38daa
Parents: 2933530
Author: zentol <[email protected]>
Authored: Wed Mar 7 13:02:27 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Mar 20 10:15:33 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/MiniClusterClient.java   |  2 +-
 .../apache/flink/runtime/minicluster/MiniCluster.java    | 11 +++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62d92737/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 4354267..276df62 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -113,7 +113,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
        @Override
        public void stop(JobID jobId) throws Exception {
-               throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
+               guardWithSingleRetry(() -> miniCluster.stopJob(jobId), scheduledExecutor).get();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/62d92737/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index e958005..bc75a54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -512,6 +512,17 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
                }
        }
 
+       public CompletableFuture<Acknowledge> stopJob(JobID jobId) {
+               try {
+                       return getDispatcherGateway().stopJob(jobId, rpcTimeout);
+               } catch (LeaderRetrievalException | InterruptedException e) {
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException(
+                                       String.format("Could not stop job %s.", jobId),
+                                       e));
+               }
+       }
+
        public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob) {
                try {
                        return getDispatcherGateway().triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout);

Reply via email to