Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5573#discussion_r172756524
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
@@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId,
@Nullable String savepointDirecto
});
}
+ @Override
+ public Map<String, Object> getAccumulators(final JobID jobID) throws
Exception {
+ final JobAccumulatorsHeaders accumulatorsHeaders =
JobAccumulatorsHeaders.getInstance();
+ final JobAccumulatorsMessageParameters accMsgParams =
accumulatorsHeaders.getUnresolvedMessageParameters();
+ accMsgParams.jobPathParameter.resolve(jobID);
+
accMsgParams.queryParameter.resolve(Collections.singletonList(true));
+
+ CompletableFuture<JobAccumulatorsInfo> responseFuture =
sendRequest(
+ accumulatorsHeaders,
+ accMsgParams
+ );
+
+ return responseFuture.thenApply((JobAccumulatorsInfo
accumulatorsInfo) -> {
+ if (accumulatorsInfo != null &&
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+ return
accumulatorsInfo.getSerializedUserAccumulators();
--- End diff --
the accumulators should be deserialized via
`SerializedValue#deserialize(ClassLoader)` .
If `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)`
(that also should be overridden) was called use the passed in `ClassLoader`,
otherwise `ClassLoader.getSystemClassLoader()`.
---