Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5308#discussion_r163874024
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -752,16 +751,21 @@ public Executor getFutureExecutor() {
        /**
         * Gets a serialized accumulator map.
         * @return The accumulator map with serialized accumulator values.
    -    * @throws IOException
         */
        @Override
    -   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    +   public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
     
                Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
     
                Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
                for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
    -                   result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
    +
    +                   try {
    +                           final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
    +                           result.put(entry.getKey(), serializedValue);
    +                   } catch (IOException ioe) {
    +                           LOG.info("Could not serialize accumulator " + entry.getKey() + '.', ioe);
    --- End diff --
    
    This is mainly a question of behaviour. If something goes wrong while 
serializing the accumulators, one can either fail completely or return as much 
as possible, as is done here.
    
    I'll change the log level to error and instead insert a 
`FailedAccumulatorSerialization` entry which throws the exception when it is 
accessed.
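    To illustrate the proposed behaviour, here is a minimal, self-contained 
sketch of what such an entry could look like. The class name follows the 
comment, but its shape, the demo class `AccumulatorDemo`, and the accumulator 
names used below are illustrative assumptions, not Flink's actual 
implementation:

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a map entry that remembers why serialization failed
// and rethrows that failure only when the value is actually accessed.
final class FailedAccumulatorSerialization implements Serializable {
    private static final long serialVersionUID = 1L;
    private final IOException cause;

    FailedAccumulatorSerialization(IOException cause) {
        this.cause = cause;
    }

    /** Rethrows the original serialization failure on access. */
    Object get() {
        throw new RuntimeException("Could not serialize accumulator", cause);
    }
}

public class AccumulatorDemo {
    public static void main(String[] args) {
        Map<String, Object> result = new HashMap<>();
        // Suppose one accumulator serialized fine and another one failed:
        result.put("bytes-read", Long.valueOf(42L));
        result.put("custom",
                new FailedAccumulatorSerialization(new IOException("not serializable")));

        // Healthy entries stay accessible...
        System.out.println(result.get("bytes-read"));

        // ...while the failed entry surfaces its exception only when accessed.
        try {
            ((FailedAccumulatorSerialization) result.get("custom")).get();
        } catch (RuntimeException e) {
            System.out.println("access failed: " + e.getCause().getMessage());
        }
    }
}
```

    This keeps the lenient "return as much as possible" behaviour while still 
making the failure visible to any caller that touches the broken entry.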

