Hi all,

I did a fix some time ago regarding accumulators: the/JobClient.getAccumulators()/ was infinitely  blocking in local environment for a streaming job (1). The change (2) consisted of giving the current accumulators value for the running job. And when fixing this in the PR, it appeared that I had to change the accumulators semantics with /JobClient/ and I just realized that I forgot to bring this back to the ML:

Previously /JobClient/ assumed that getAccumulator() was called on a bounded pipeline and that the user wanted to acquire the *final accumulator values* after the job is finished.

But now it returns the *current value of accumulators* immediately to be compatible with unbounded pipelines.

If it is run on a bounded pipeline, then to get the final accumulator values after the job is finished, one needs to call /getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults)/

(1): https://issues.apache.org/jira/browse/FLINK-18685

(2): https://github.com/apache/flink/pull/14558#


Cheers,

Etienne

Reply via email to