Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5308#discussion_r163538231
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
---
@@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
/**
* Gets a serialized accumulator map.
* @return The accumulator map with serialized accumulator values.
- * @throws IOException
*/
@Override
- public Map<String, SerializedValue<Object>> getAccumulatorsSerialized()
throws IOException {
+ public Map<String, SerializedValue<Object>> getAccumulatorsSerialized()
{
Map<String, Accumulator<?, ?>> accumulatorMap =
aggregateUserAccumulators();
Map<String, SerializedValue<Object>> result = new
HashMap<>(accumulatorMap.size());
for (Map.Entry<String, Accumulator<?, ?>> entry :
accumulatorMap.entrySet()) {
- result.put(entry.getKey(), new
SerializedValue<>(entry.getValue().getLocalValue()));
+
+ try {
+ final SerializedValue<Object> serializedValue =
new SerializedValue<>(entry.getValue().getLocalValue());
+ result.put(entry.getKey(), serializedValue);
+ } catch (IOException ioe) {
+ LOG.info("Could not serialize accumulator " +
entry.getKey() + '.', ioe);
--- End diff --
Why is it acceptable to change the behavior, i.e., to ignore the exception.
It is not even logged on `ERROR` level.
Also:
```
LOG.info("Could not serialize accumulator {}.", entry.getKey(), ioe);
```
---