[FLINK-5112] [ExecutionGraph] Remove unused accumulator aggregation code from ArchivedExecutionJobVertex
The ArchivedExecutionJobVertex calculated for its ExecutionVertices the aggregated accumulator value. However, the result was nowhere stored. This indicates that this code is no longer used and can be removed. This closes #2846. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2eacfba2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2eacfba2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2eacfba2 Branch: refs/heads/master Commit: 2eacfba280d80434b61e0bd0c840b158923b416d Parents: 1b2f3c0 Author: Till Rohrmann <[email protected]> Authored: Mon Nov 21 15:57:06 2016 +0100 Committer: zentol <[email protected]> Committed: Sun Nov 27 10:57:58 2016 +0100 ---------------------------------------------------------------------- .../executiongraph/ArchivedExecutionJobVertex.java | 11 ----------- 1 file changed, 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2eacfba2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java index 8ae6bbd..e30f45a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java @@ -17,8 +17,6 @@ */ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; @@ -27,8 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import scala.Option; import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; import static org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState; @@ -54,13 +50,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser taskVertices[x] = jobVertex.getTaskVertices()[x].archive(); } - Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = new HashMap<>(); - for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { - Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); - if (next != null) { - AccumulatorHelper.mergeInto(tmpArchivedUserAccumulators, next); - } - } archivedUserAccumulators = jobVertex.getAggregatedUserAccumulatorsStringified(); this.id = jobVertex.getJobVertexId();
