[FLINK-8935][tests] Implement MiniClusterClient#getAccumulators

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d86a2f0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d86a2f0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d86a2f0f

Branch: refs/heads/release-1.5
Commit: d86a2f0fbe3ca55fce18e6b319419eca04739a6a
Parents: 74525af
Author: zentol <[email protected]>
Authored: Wed Mar 7 11:05:42 2018 +0100
Committer: zentol <[email protected]>
Committed: Tue Mar 20 10:15:33 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/program/MiniClusterClient.java | 13 +++++++++++--
 .../apache/flink/runtime/minicluster/MiniCluster.java  | 12 ++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d86a2f0f/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 961604f..9c87423 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -47,6 +49,7 @@ import javax.annotation.Nullable;
 import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -130,12 +133,18 @@ public class MiniClusterClient extends 
ClusterClient<MiniClusterClient.MiniClust
 
        @Override
        public Map<String, Object> getAccumulators(JobID jobID) throws 
Exception {
-               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+               return getAccumulators(jobID, 
ClassLoader.getSystemClassLoader());
        }
 
        @Override
        public Map<String, Object> getAccumulators(JobID jobID, ClassLoader 
loader) throws Exception {
-               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+               AccessExecutionGraph executionGraph = guardWithSingleRetry(() 
-> miniCluster.getExecutionGraph(jobID), scheduledExecutor).get();
+               Map<String, SerializedValue<Object>> accumulatorsSerialized = 
executionGraph.getAccumulatorsSerialized();
+               Map<String, Object> result = new 
HashMap<>(accumulatorsSerialized.size());
+               for (Map.Entry<String, SerializedValue<Object>> acc : 
accumulatorsSerialized.entrySet()) {
+                       result.put(acc.getKey(), 
acc.getValue().deserializeValue(loader));
+               }
+               return result;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d86a2f0f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 21b89ec..e958005 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -522,6 +523,17 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
+       public CompletableFuture<? extends AccessExecutionGraph> 
getExecutionGraph(JobID jobId) {
+               try {
+                       return getDispatcherGateway().requestJob(jobId, 
rpcTimeout);
+               } catch (LeaderRetrievalException | InterruptedException e) {
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException(
+                                       String.format("Could not retrieve job 
job %s.", jobId),
+                                       e));
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  running jobs
        // 
------------------------------------------------------------------------

Reply via email to