Repository: flink Updated Branches: refs/heads/master 463b922ab -> 2dab4374b
[FLINK-8935][tests] Implement MiniClusterClient#stop This closes #5690. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dab4374 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dab4374 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dab4374 Branch: refs/heads/master Commit: 2dab4374bc5280a2b4536f7ad1e153d6361a8885 Parents: ca514e1 Author: zentol <[email protected]> Authored: Wed Mar 7 13:02:27 2018 +0100 Committer: zentol <[email protected]> Committed: Tue Mar 20 10:14:26 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/client/program/MiniClusterClient.java | 2 +- .../apache/flink/runtime/minicluster/MiniCluster.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2dab4374/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 4354267..276df62 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -113,7 +113,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust @Override public void stop(JobID jobId) throws Exception { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + guardWithSingleRetry(() -> miniCluster.stopJob(jobId), scheduledExecutor).get(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2dab4374/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index e958005..bc75a54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -512,6 +512,17 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } + public CompletableFuture<Acknowledge> stopJob(JobID jobId) { + try { + return getDispatcherGateway().stopJob(jobId, rpcTimeout); + } catch (LeaderRetrievalException | InterruptedException e) { + return FutureUtils.completedExceptionally( + new FlinkException( + String.format("Could not stop job %s.", jobId), + e)); + } + } + public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory, boolean cancelJob) { try { return getDispatcherGateway().triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout);
