Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r175736246 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -389,6 +394,36 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto }); } + @Override + public Map<String, Object> getAccumulators(JobID jobID) throws Exception { + return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); + } + + @Override + public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception { + final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); + final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); + accMsgParams.jobPathParameter.resolve(jobID); + accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true)); + + CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest( + accumulatorsHeaders, + accMsgParams + ); + + return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { + if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) { --- End diff -- `accumulatorsInfo` should never be null. The same applies to the return value of `getSerializedUserAccumulators`. Thus there is no need for a null check.
---