[FLINK-8935][tests] Implement MiniClusterClient#listJobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74525afc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74525afc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74525afc Branch: refs/heads/release-1.5 Commit: 74525afca7a741b1d26b587a35ddc4ac30214d28 Parents: 92ed0b2 Author: zentol <[email protected]> Authored: Wed Mar 7 11:05:12 2018 +0100 Committer: zentol <[email protected]> Committed: Tue Mar 20 10:15:33 2018 +0100 ---------------------------------------------------------------------- .../flink/client/program/MiniClusterClient.java | 2 +- .../flink/runtime/minicluster/MiniCluster.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/74525afc/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 86e9279..961604f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -125,7 +125,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust @Override public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + return guardWithSingleRetry(miniCluster::listJobs, scheduledExecutor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/74525afc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index dfe30af..21b89ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -87,6 +88,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -473,6 +475,20 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { // Accessing jobs // ------------------------------------------------------------------------ + public CompletableFuture<Collection<JobStatusMessage>> listJobs() { + try { + return getDispatcherGateway().requestMultipleJobDetails(rpcTimeout) + .thenApply(jobs -> jobs.getJobs().stream() + .map(details -> new JobStatusMessage(details.getJobId(), details.getJobName(), details.getStatus(), details.getStartTime())) + .collect(Collectors.toList())); + } catch (LeaderRetrievalException | InterruptedException e) { + return FutureUtils.completedExceptionally( + new FlinkException( + "Could not retrieve job list.", + e)); + } + } + public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { try { return getDispatcherGateway().requestJobStatus(jobId, rpcTimeout);
