Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5308#discussion_r163874024
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
* Gets a serialized accumulator map.
* @return The accumulator map with serialized accumulator values.
- * @throws IOException
*/
@Override
- public Map<String, SerializedValue<Object>> getAccumulatorsSerialized()
throws IOException {
+ public Map<String, SerializedValue<Object>> getAccumulatorsSerialized()
{
Map<String, Accumulator<?, ?>> accumulatorMap =
aggregateUserAccumulators();
Map<String, SerializedValue<Object>> result = new
HashMap<>(accumulatorMap.size());
for (Map.Entry<String, Accumulator<?, ?>> entry :
accumulatorMap.entrySet()) {
- result.put(entry.getKey(), new
SerializedValue<>(entry.getValue().getLocalValue()));
+
+ try {
+ final SerializedValue<Object> serializedValue =
new SerializedValue<>(entry.getValue().getLocalValue());
+ result.put(entry.getKey(), serializedValue);
+ } catch (IOException ioe) {
+ LOG.info("Could not serialize accumulator " +
entry.getKey() + '.', ioe);
--- End diff --
This is mainly a question of behaviour. In case that something goes wrong
while serializing the accumulators one can either completely fail or try to
return as much as possible, as it is done here.
I'll change the log level to error and insert a
`FailedAccumulatorSerialization` entry which throws the exception when being
accessed instead.
---