[FLINK-8935][tests] Implement MiniClusterClient#listJobs

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74525afc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74525afc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74525afc

Branch: refs/heads/release-1.5
Commit: 74525afca7a741b1d26b587a35ddc4ac30214d28
Parents: 92ed0b2
Author: zentol <[email protected]>
Authored: Wed Mar 7 11:05:12 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Mar 20 10:15:33 2018 +0100

----------------------------------------------------------------------
 .../flink/client/program/MiniClusterClient.java     |  2 +-
 .../flink/runtime/minicluster/MiniCluster.java      | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74525afc/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 86e9279..961604f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -125,7 +125,7 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust
 
        @Override
        public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
-               throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation.");
+               return guardWithSingleRetry(miniCluster::listJobs, scheduledExecutor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/74525afc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index dfe30af..21b89ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -87,6 +88,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -473,6 +475,20 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
        //  Accessing jobs
        // ------------------------------------------------------------------------
 
+       public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+               try {
+                       return getDispatcherGateway().requestMultipleJobDetails(rpcTimeout)
+                               .thenApply(jobs -> jobs.getJobs().stream()
+                                       .map(details -> new JobStatusMessage(details.getJobId(), details.getJobName(), details.getStatus(), details.getStartTime()))
+                                       .collect(Collectors.toList()));
+               } catch (LeaderRetrievalException | InterruptedException e) {
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException(
+                                       "Could not retrieve job list.",
+                                       e));
+               }
+       }
+
        public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
                try {
                        return getDispatcherGateway().requestJobStatus(jobId, rpcTimeout);

Reply via email to