GitHub user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1112#discussion_r39073704
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -922,19 +922,18 @@ public boolean updateState(TaskExecutionState state) {
                 case RUNNING:
                     return attempt.switchToRunning();
                 case FINISHED:
-                    Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
-                    Map<String, Accumulator<?, ?>> userAccumulators = null;
                     try {
                         AccumulatorSnapshot accumulators = state.getAccumulators();
-                        flinkAccumulators = accumulators.deserializeFlinkAccumulators();
-                        userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
+                        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators =
+                            accumulators.deserializeFlinkAccumulators();
+                        Map<String, Accumulator<?, ?>> userAccumulators =
+                            accumulators.deserializeUserAccumulators(userClassLoader);
+                        attempt.markFinished(flinkAccumulators, userAccumulators);
                     }
                     catch (Exception e) {
-                        // we do not fail the job on deserialization problems of accumulators, but only log
                         LOG.error("Failed to deserialize final accumulator results.", e);
+                        attempt.markFailed(new SerializedThrowable(e));
--- End diff --
I don't think you need a SerializedThrowable here. The exceptions are
wrapped in serialized throwables when they are transported via actor messages.
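
To illustrate the point, here is a minimal, self-contained sketch of the pattern: the call site hands over the raw exception, and the conversion to a serializable form happens in one place, where the failure leaves the process. Every name in it (`MarkFailedSketch`, `Attempt`, `TransportableFailure`, `toMessage`) is a hypothetical stand-in invented for this example, not a Flink class or method. In the diff above, the corresponding change would presumably be `attempt.markFailed(e);`, assuming `markFailed` accepts a plain `Throwable`.

```java
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;

// Hypothetical stand-ins for illustration only; none of these are Flink classes.
public class MarkFailedSketch {

    /** Serializable carrier that keeps the class name, message, and stack trace as strings. */
    static final class TransportableFailure implements Serializable {
        private static final long serialVersionUID = 1L;

        final String originalClassName;
        final String message;
        final String stackTrace;

        TransportableFailure(Throwable t) {
            this.originalClassName = t.getClass().getName();
            this.message = t.getMessage();
            StringWriter buffer = new StringWriter();
            t.printStackTrace(new PrintWriter(buffer, true));
            this.stackTrace = buffer.toString();
        }
    }

    /** Local state holder: stores the raw Throwable exactly as it was thrown. */
    static final class Attempt {
        private Throwable failureCause;

        void markFailed(Throwable cause) {
            this.failureCause = cause;
        }

        Throwable getFailureCause() {
            return failureCause;
        }
    }

    /** The single place where wrapping happens: when the failure is turned into a message. */
    static TransportableFailure toMessage(Attempt attempt) {
        return new TransportableFailure(attempt.getFailureCause());
    }

    public static void main(String[] args) {
        Attempt attempt = new Attempt();
        try {
            throw new IllegalStateException("Failed to deserialize final accumulator results.");
        } catch (Exception e) {
            attempt.markFailed(e); // no wrapping at the call site
        }
        TransportableFailure message = toMessage(attempt);
        System.out.println(message.originalClassName + ": " + message.message);
    }
}
```

Keeping the wrapping at the transport boundary means local code still sees the original exception type, and callers cannot forget to wrap or wrap twice.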