[FLINK-8935][tests] Implement MiniClusterClient#getAccumulators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e6aa676 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e6aa676 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e6aa676 Branch: refs/heads/master Commit: 3e6aa676e36824ac76258ff20723159b40c3a338 Parents: 5fa84c2 Author: zentol <[email protected]> Authored: Wed Mar 7 11:05:42 2018 +0100 Committer: zentol <[email protected]> Committed: Tue Mar 20 10:14:26 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/client/program/MiniClusterClient.java | 13 +++++++++++-- .../apache/flink/runtime/minicluster/MiniCluster.java | 12 ++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e6aa676/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 961604f..9c87423 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; @@ -40,6 +41,7 @@ import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedValue; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -47,6 +49,7 @@ import javax.annotation.Nullable; import java.net.URL; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -130,12 +133,18 @@ public class MiniClusterClient extends ClusterClient<MiniClusterClient.MiniClust @Override public Map<String, Object> getAccumulators(JobID jobID) throws Exception { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); } @Override public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception { - throw new UnsupportedOperationException("MiniClusterClient does not yet support this operation."); + AccessExecutionGraph executionGraph = guardWithSingleRetry(() -> miniCluster.getExecutionGraph(jobID), scheduledExecutor).get(); + Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized(); + Map<String, Object> result = new HashMap<>(accumulatorsSerialized.size()); + for (Map.Entry<String, SerializedValue<Object>> acc : accumulatorsSerialized.entrySet()) { + result.put(acc.getKey(), acc.getValue().deserializeValue(loader)); + } + return result; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3e6aa676/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 21b89ec..e958005 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; @@ -522,6 +523,17 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } + public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobId) { + try { + return getDispatcherGateway().requestJob(jobId, rpcTimeout); + } catch (LeaderRetrievalException | InterruptedException e) { + return FutureUtils.completedExceptionally( + new FlinkException( + String.format("Could not retrieve job job %s.", jobId), + e)); + } + } + // ------------------------------------------------------------------------ // running jobs // ------------------------------------------------------------------------
