[FLINK-5112] [ExecutionGraph] Remove unused accumulator aggregation code from 
ArchivedExecutionJobVertex

The ArchivedExecutionJobVertex computed the aggregated accumulator values of
its ExecutionVertices, but the result was never stored anywhere. This
indicates that the code is no longer used and can be removed.

This closes #2846.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2eacfba2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2eacfba2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2eacfba2

Branch: refs/heads/master
Commit: 2eacfba280d80434b61e0bd0c840b158923b416d
Parents: 1b2f3c0
Author: Till Rohrmann <[email protected]>
Authored: Mon Nov 21 15:57:06 2016 +0100
Committer: zentol <[email protected]>
Committed: Sun Nov 27 10:57:58 2016 +0100

----------------------------------------------------------------------
 .../executiongraph/ArchivedExecutionJobVertex.java       | 11 -----------
 1 file changed, 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2eacfba2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index 8ae6bbd..e30f45a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
@@ -27,8 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
 
@@ -54,13 +50,6 @@ public class ArchivedExecutionJobVertex implements 
AccessExecutionJobVertex, Ser
                        taskVertices[x] = 
jobVertex.getTaskVertices()[x].archive();
                }
 
-               Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = 
new HashMap<>();
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
-                       Map<String, Accumulator<?, ?>> next = 
vertex.getCurrentExecutionAttempt().getUserAccumulators();
-                       if (next != null) {
-                               
AccumulatorHelper.mergeInto(tmpArchivedUserAccumulators, next);
-                       }
-               }
                archivedUserAccumulators = 
jobVertex.getAggregatedUserAccumulatorsStringified();
 
                this.id = jobVertex.getJobVertexId();

Reply via email to