Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5573#discussion_r175733976
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
@@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId,
@Nullable String savepointDirecto
});
}
+ @Override
+ public Map<String, Object> getAccumulators(JobID jobID) throws
Exception {
+ return getAccumulators(jobID,
ClassLoader.getSystemClassLoader());
+ }
+
+ @Override
+ public Map<String, Object> getAccumulators(final JobID jobID,
ClassLoader loader) throws Exception {
+ final JobAccumulatorsHeaders accumulatorsHeaders =
JobAccumulatorsHeaders.getInstance();
+ final JobAccumulatorsMessageParameters accMsgParams =
accumulatorsHeaders.getUnresolvedMessageParameters();
+ accMsgParams.jobPathParameter.resolve(jobID);
+
accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+ CompletableFuture<JobAccumulatorsInfo> responseFuture =
sendRequest(
+ accumulatorsHeaders,
+ accMsgParams
+ );
+
+ return responseFuture.thenApply((JobAccumulatorsInfo
accumulatorsInfo) -> {
+ if (accumulatorsInfo != null &&
accumulatorsInfo.getSerializedUserAccumulators() != null) {
+ try {
+ return
AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(),
loader);
+ } catch (Exception e) {
+ log.error("Deserialize accumulators
with customized classloader error : {}", e);
+ }
+ }
+
+ return Collections.EMPTY_MAP;
--- End diff --
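Please replace `Collections.EMPTY_MAP` with `Collections.<String, Object>emptyMap()`. `EMPTY_MAP` is a raw `Map`, so returning it here relies on an unchecked conversion, whereas `emptyMap()` is type-safe.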
---