Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1112#discussion_r39073704
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -922,19 +922,18 @@ public boolean updateState(TaskExecutionState state) {
                                case RUNNING:
                                        return attempt.switchToRunning();
                                case FINISHED:
    -                                   Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
    -                                   Map<String, Accumulator<?, ?>> userAccumulators = null;
                                        try {
                                                AccumulatorSnapshot accumulators = state.getAccumulators();
    -                                           flinkAccumulators = accumulators.deserializeFlinkAccumulators();
    -                                           userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
    +                                           Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
    +                                                   accumulators.deserializeFlinkAccumulators();
    +                                           Map<String, Accumulator<?, ?>> userAccumulators =
    +                                                   accumulators.deserializeUserAccumulators(userClassLoader);
    +                                           attempt.markFinished(flinkAccumulators, userAccumulators);
                                        }
                                        catch (Exception e) {
    -                                           // we do not fail the job on deserialization problems of accumulators, but only log
                                                LOG.error("Failed to deserialize final accumulator results.", e);
    +                                           attempt.markFailed(new SerializedThrowable(e));
    --- End diff --
    
    I don't think you need a SerializedThrowable here. The exceptions are wrapped in serialized throwables when they are transported via actor messages.
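
To illustrate the point, here is a minimal, self-contained sketch. All names in it (`MarkFailedSketch`, `Attempt`, `TransportThrowable`, `prepareForTransport`) are hypothetical stand-ins, not Flink's actual `Execution`, `SerializedThrowable`, or messaging code. It assumes only what the comment states: the wrapping happens once, at the point where the exception is put into a message, so the call site can pass the plain exception.

```java
import java.io.IOException;

public class MarkFailedSketch {

    // Hypothetical stand-in for a transport-safe wrapper (NOT Flink's SerializedThrowable).
    // It keeps only the class name, message, and stack trace of the original exception.
    static final class TransportThrowable extends Exception {
        private static final long serialVersionUID = 1L;

        TransportThrowable(Throwable original) {
            super(original.getClass().getName() + ": " + original.getMessage());
            setStackTrace(original.getStackTrace());
        }
    }

    // Hypothetical stand-in for the execution attempt; it stores the failure cause as given.
    static final class Attempt {
        Throwable failureCause;

        void markFailed(Throwable t) {
            this.failureCause = t;
        }
    }

    // Hypothetical stand-in for the messaging layer: wraps exactly once, at the
    // boundary where the exception leaves the JVM, and never double-wraps.
    static Throwable prepareForTransport(Throwable t) {
        return (t instanceof TransportThrowable) ? t : new TransportThrowable(t);
    }

    public static void main(String[] args) {
        Attempt attempt = new Attempt();
        try {
            throw new IOException("corrupt accumulator bytes");
        } catch (Exception e) {
            // Call site: pass the plain exception, no wrapping here.
            attempt.markFailed(e);
        }

        // Local consumers still see the original exception type.
        System.out.println(attempt.failureCause.getClass().getSimpleName());

        // The wrapper appears only when the exception is prepared for a message.
        Throwable onTheWire = prepareForTransport(attempt.failureCause);
        System.out.println(onTheWire.getClass().getSimpleName());
    }
}
```

Under that assumption, the catch block in the diff would reduce to `attempt.markFailed(e);`, and wrapping at the call site would only add a second, redundant layer.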

