[FLINK-4733] Port Task IO metrics

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba2d007e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba2d007e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba2d007e

Branch: refs/heads/master
Commit: ba2d007e5ad270b9a403d037d186de61cdaac742
Parents: cf4f364
Author: zentol <[email protected]>
Authored: Mon Oct 31 14:17:05 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Oct 31 15:12:04 2016 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 12 +--
 .../webmonitor/handlers/JobDetailsHandler.java  | 53 +++++++++----
 .../handlers/JobVertexDetailsHandler.java       | 63 ++++++++-------
 .../handlers/JobVertexTaskManagersHandler.java  | 57 +++++++-------
 .../SubtaskCurrentAttemptDetailsHandler.java    |  5 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  | 63 ++++++++-------
 .../accumulators/AccumulatorRegistry.java       | 80 +------------------
 .../accumulators/AccumulatorSnapshot.java       | 15 ----
 .../runtime/executiongraph/AccessExecution.java | 15 +---
 .../executiongraph/AccessExecutionGraph.java    | 11 ---
 .../AccessExecutionJobVertex.java               | 13 ----
 .../executiongraph/ArchivedExecution.java       | 18 ++---
 .../executiongraph/ArchivedExecutionGraph.java  | 21 -----
 .../ArchivedExecutionJobVertex.java             |  8 --
 .../flink/runtime/executiongraph/Execution.java | 24 +++---
 .../runtime/executiongraph/ExecutionGraph.java  | 28 +------
 .../executiongraph/ExecutionJobVertex.java      | 33 --------
 .../api/reader/AbstractRecordReader.java        |  8 --
 .../io/network/api/reader/BufferReader.java     |  6 --
 .../io/network/api/reader/ReaderBase.java       |  6 --
 .../AdaptiveSpanningRecordDeserializer.java     | 31 --------
 .../api/serialization/RecordDeserializer.java   |  6 --
 .../api/serialization/RecordSerializer.java     |  6 --
 .../serialization/SpanningRecordSerializer.java | 13 ----
 ...llingAdaptiveSpanningRecordDeserializer.java | 31 --------
 .../io/network/api/writer/RecordWriter.java     | 10 ---
 .../iterative/task/IterationHeadTask.java       |  4 +-
 .../flink/runtime/operators/BatchTask.java      | 22 ++----
 .../flink/runtime/operators/DataSinkTask.java   |  6 --
 .../flink/runtime/operators/DataSourceTask.java |  6 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  4 +
 .../runtime/taskmanager/TaskExecutionState.java | 13 +++-
 .../flink/runtime/taskmanager/TaskManager.scala |  4 +-
 .../ArchivedExecutionGraphTest.java             | 23 ------
 .../ExecutionGraphDeploymentTest.java           |  7 +-
 .../network/api/reader/AbstractReaderTest.java  |  6 --
 .../testingUtils/TestingJobManagerLike.scala    |  3 +-
 .../TestingJobManagerMessages.scala             |  2 -
 .../runtime/io/StreamInputProcessor.java        |  7 --
 .../runtime/io/StreamTwoInputProcessor.java     |  7 --
 .../runtime/tasks/OneInputStreamTask.java       |  4 -
 .../streaming/runtime/tasks/OperatorChain.java  |  8 +-
 .../streaming/runtime/tasks/StreamTask.java     |  2 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  4 -
 .../operators/StreamOperatorChainingTest.java   |  9 +--
 .../accumulators/AccumulatorLiveITCase.java     | 82 ++------------------
 46 files changed, 207 insertions(+), 652 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index a0afba2..7d2b5b6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -262,12 +262,12 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                        .GET("/jobs", handler(new 
CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-                       .GET("/jobs/:jobid", handler(new 
JobDetailsHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/vertices", handler(new 
JobDetailsHandler(currentGraphs)))
+                       .GET("/jobs/:jobid", handler(new 
JobDetailsHandler(currentGraphs, metricFetcher)))
+                       .GET("/jobs/:jobid/vertices", handler(new 
JobDetailsHandler(currentGraphs, metricFetcher)))
 
-                       .GET("/jobs/:jobid/vertices/:vertexid", handler(new 
JobVertexDetailsHandler(currentGraphs)))
+                       .GET("/jobs/:jobid/vertices/:vertexid", handler(new 
JobVertexDetailsHandler(currentGraphs, metricFetcher)))
                        .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", 
handler(new SubtasksTimesHandler(currentGraphs)))
-                       .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", 
handler(new JobVertexTaskManagersHandler(currentGraphs)))
+                       .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", 
handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher)))
                        .GET("/jobs/:jobid/vertices/:vertexid/accumulators", 
handler(new JobVertexAccumulatorsHandler(currentGraphs)))
                        .GET("/jobs/:jobid/vertices/:vertexid/checkpoints", 
handler(new JobVertexCheckpointsHandler(currentGraphs)))
                        .GET("/jobs/:jobid/vertices/:vertexid/backpressure", 
handler(new JobVertexBackPressureHandler(
@@ -276,8 +276,8 @@ public class WebRuntimeMonitor implements WebMonitor {
                                                        refreshInterval)))
                        .GET("/jobs/:jobid/vertices/:vertexid/metrics", 
handler(new JobVertexMetricsHandler(metricFetcher)))
                        
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new 
SubtasksAllAccumulatorsHandler(currentGraphs)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
-                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", 
handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
+                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher)))
+                       
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", 
handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, 
metricFetcher)))
                        
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
 handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))
 
                        .GET("/jobs/:jobid/plan", handler(new 
JobPlanHandler(currentGraphs)))

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index e7a2a8c..6de6dc5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -20,16 +20,16 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -45,9 +45,12 @@ import java.util.Map;
  * </ul>
  */
 public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
-       
-       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder) {
+
+       private final MetricFetcher fetcher;
+
+       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
MetricFetcher fetcher) {
                super(executionGraphHolder);
+               this.fetcher = fetcher;
        }
 
        @Override
@@ -124,13 +127,6 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                        ExecutionState jobVertexState = 
                                        
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
ejv.getParallelism());
                        jobVerticesPerState[jobVertexState.ordinal()]++;
-                       
-                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
metrics = ejv.getAggregatedMetricAccumulators();
-
-                       LongCounter readBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-                       LongCounter writeBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-                       LongCounter readRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-                       LongCounter writeRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
 
                        gen.writeStartObject();
                        gen.writeStringField("id", 
ejv.getJobVertexId().toString());
@@ -148,11 +144,36 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                        }
                        gen.writeEndObject();
                        
+                       long numBytesIn = 0;
+                       long numBytesOut = 0;
+                       long numRecordsIn = 0;
+                       long numRecordsOut = 0;
+
+                       for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
+                               IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                               if (ioMetrics != null) { // execAttempt is 
already finished, use final metrics stored in ExecutionGraph
+                                       numBytesIn += 
ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+                                       numBytesOut += 
ioMetrics.getNumBytesOut();
+                                       numRecordsIn += 
ioMetrics.getNumRecordsIn();
+                                       numRecordsOut += 
ioMetrics.getNumRecordsOut();
+                               } else { // execAttempt is still running, use 
MetricQueryService instead
+                                       fetcher.update();
+                                       MetricStore.SubtaskMetricStore metrics 
= fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), 
ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
+                                       if (metrics != null) {
+                                               numBytesIn += 
Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + 
Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+                                               numBytesOut += 
Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+                                               numRecordsIn += 
Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+                                               numRecordsOut += 
Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                                       }
+                               }
+                       }
+
                        gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", readBytes != null ? 
readBytes.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("write-bytes", writeBytes != null 
? writeBytes.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("read-records", readRecords != 
null ? readRecords.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("write-records",writeRecords != 
null ? writeRecords.getLocalValuePrimitive() : -1L);
+                       gen.writeNumberField("read-bytes", numBytesIn);
+                       gen.writeNumberField("write-bytes", numBytesOut);
+                       gen.writeNumberField("read-records", numRecordsIn);
+                       gen.writeNumberField("write-records", numRecordsOut);
                        gen.writeEndObject();
                        
                        gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index fbdd86b..14dcd39 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -37,9 +37,12 @@ import java.util.Map;
  * and the runtime and metrics of all its subtasks.
  */
 public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
-       
-       public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder) {
+
+       private final MetricFetcher fetcher;
+
+       public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
                super(executionGraphHolder);
+               this.fetcher = fetcher;
        }
 
        @Override
@@ -71,25 +74,6 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                        long endTime = status.isTerminal() ? 
vertex.getStateTimestamp(status) : -1;
                        long duration = startTime > 0 ? ((endTime > 0 ? endTime 
: now) - startTime) : -1;
                        
-                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-                       LongCounter readBytes;
-                       LongCounter writeBytes;
-                       LongCounter readRecords;
-                       LongCounter writeRecords;
-                       
-                       if (metrics != null) {
-                               readBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-                               writeBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-                               readRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-                               writeRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-                       }
-                       else {
-                               readBytes = null;
-                               writeBytes = null;
-                               readRecords = null;
-                               writeRecords = null;
-                       }
-                       
                        gen.writeStartObject();
                        gen.writeNumberField("subtask", num);
                        gen.writeStringField("status", status.name());
@@ -99,11 +83,34 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                        gen.writeNumberField("end-time", endTime);
                        gen.writeNumberField("duration", duration);
 
+                       IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                       long numBytesIn = 0;
+                       long numBytesOut = 0;
+                       long numRecordsIn = 0;
+                       long numRecordsOut = 0;
+
+                       if (ioMetrics != null) { // execAttempt is already 
finished, use final metrics stored in ExecutionGraph
+                               numBytesIn = ioMetrics.getNumBytesInLocal() + 
ioMetrics.getNumBytesInRemote();
+                               numBytesOut = ioMetrics.getNumBytesOut();
+                               numRecordsIn = ioMetrics.getNumRecordsIn();
+                               numRecordsOut = ioMetrics.getNumRecordsOut();
+                       } else { // execAttempt is still running, use 
MetricQueryService instead
+                               fetcher.update();
+                               MetricStore.SubtaskMetricStore metrics = 
fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), 
jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
+                               if (metrics != null) {
+                                       numBytesIn += 
Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + 
Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+                                       numBytesOut += 
Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+                                       numRecordsIn += 
Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+                                       numRecordsOut += 
Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                               }
+                       }
+
                        gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", readBytes != null ? 
readBytes.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("write-bytes", writeBytes != null 
? writeBytes.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("read-records", readRecords != 
null ? readRecords.getLocalValuePrimitive() : -1L);
-                       gen.writeNumberField("write-records",writeRecords != 
null ? writeRecords.getLocalValuePrimitive() : -1L);
+                       gen.writeNumberField("read-bytes", numBytesIn);
+                       gen.writeNumberField("write-bytes", numBytesOut);
+                       gen.writeNumberField("read-records", numRecordsIn);
+                       gen.writeNumberField("write-records", numRecordsOut);
                        gen.writeEndObject();
                        
                        gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 0e94334..c1fabf8 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -19,15 +19,15 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
@@ -42,8 +42,11 @@ import java.util.Map.Entry;
  */
 public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandler {
 
-       public JobVertexTaskManagersHandler(ExecutionGraphHolder 
executionGraphHolder) {
+       private final MetricFetcher fetcher;
+
+       public JobVertexTaskManagersHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
                super(executionGraphHolder);
+               this.fetcher = fetcher;
        }
 
        @Override
@@ -88,10 +91,10 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                        long endTime = 0;
                        boolean allFinished = true;
 
-                       LongCounter tmReadBytes = new LongCounter();
-                       LongCounter tmWriteBytes = new LongCounter();
-                       LongCounter tmReadRecords = new LongCounter();
-                       LongCounter tmWriteRecords = new LongCounter();
+                       long numBytesIn = 0;
+                       long numBytesOut = 0;
+                       long numRecordsIn = 0;
+                       long numRecordsOut = 0;
 
                        for (AccessExecutionVertex vertex : taskVertices) {
                                final ExecutionState state = 
vertex.getExecutionState();
@@ -106,20 +109,22 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                                allFinished &= state.isTerminal();
                                endTime = Math.max(endTime, 
vertex.getStateTimestamp(state));
 
-                               Map<AccumulatorRegistry.Metric, Accumulator<?, 
?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-
-                               if (metrics != null) {
-                                       LongCounter readBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-                                       tmReadBytes.merge(readBytes);
-
-                                       LongCounter writeBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-                                       tmWriteBytes.merge(writeBytes);
-
-                                       LongCounter readRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-                                       tmReadRecords.merge(readRecords);
-
-                                       LongCounter writeRecords = 
(LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-                                       tmWriteRecords.merge(writeRecords);
+                               IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                               if (ioMetrics != null) { // execAttempt is 
already finished, use final metrics stored in ExecutionGraph
+                                       numBytesIn += 
ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+                                       numBytesOut += 
ioMetrics.getNumBytesOut();
+                                       numRecordsIn += 
ioMetrics.getNumRecordsIn();
+                                       numRecordsOut += 
ioMetrics.getNumRecordsOut();
+                               } else { // execAttempt is still running, use 
MetricQueryService instead
+                                       fetcher.update();
+                                       MetricStore.SubtaskMetricStore metrics 
= fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), 
params.get("vertexid"), vertex.getParallelSubtaskIndex());
+                                       if (metrics != null) {
+                                               numBytesIn += 
Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + 
Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+                                               numBytesOut += 
Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+                                               numRecordsIn += 
Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+                                               numRecordsOut += 
Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                                       }
                                }
                        }
 
@@ -152,10 +157,10 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                        gen.writeNumberField("duration", duration);
 
                        gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", 
tmReadBytes.getLocalValuePrimitive());
-                       gen.writeNumberField("write-bytes", 
tmWriteBytes.getLocalValuePrimitive());
-                       gen.writeNumberField("read-records", 
tmReadRecords.getLocalValuePrimitive());
-                       gen.writeNumberField("write-records", 
tmWriteRecords.getLocalValuePrimitive());
+                       gen.writeNumberField("read-bytes", numBytesIn);
+                       gen.writeNumberField("write-bytes", numBytesOut);
+                       gen.writeNumberField("read-records", numRecordsIn);
+                       gen.writeNumberField("write-records", numRecordsOut);
                        gen.writeEndObject();
 
                        gen.writeObjectFieldStart("status-counts");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 811bea6..6d09513 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 
 import java.util.Map;
 
@@ -28,8 +29,8 @@ import java.util.Map;
  */
 public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttemptDetailsHandler {
        
-       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder) {
-               super(executionGraphHolder);
+       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
+               super(executionGraphHolder, fetcher);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 3cc7376..ca9c7ad 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -35,9 +35,12 @@ import java.util.Map;
  * Request handler providing details about a single task execution attempt.
  */
 public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptRequestHandler {
-       
-       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder) {
+
+       private final MetricFetcher fetcher;
+
+       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, MetricFetcher fetcher) {
                super(executionGraphHolder);
+               this.fetcher = fetcher;
        }
 
        @Override
@@ -55,25 +58,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                long endTime = status.isTerminal() ? 
execAttempt.getStateTimestamp(status) : -1;
                long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) 
- startTime) : -1;
 
-               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = 
execAttempt.getFlinkAccumulators();
-               LongCounter readBytes;
-               LongCounter writeBytes;
-               LongCounter readRecords;
-               LongCounter writeRecords;
-
-               if (metrics != null) {
-                       readBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-                       writeBytes = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-                       readRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-                       writeRecords = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-               }
-               else {
-                       readBytes = null;
-                       writeBytes = null;
-                       readRecords = null;
-                       writeRecords = null;
-               }
-
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -86,11 +70,34 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                gen.writeNumberField("end-time", endTime);
                gen.writeNumberField("duration", duration);
 
+               IOMetrics ioMetrics = execAttempt.getIOMetrics();
+
+               long numBytesIn = 0;
+               long numBytesOut = 0;
+               long numRecordsIn = 0;
+               long numRecordsOut = 0;
+               
+               if (ioMetrics != null) { // execAttempt is already finished, 
use final metrics stored in ExecutionGraph
+                       numBytesIn = ioMetrics.getNumBytesInLocal() + 
ioMetrics.getNumBytesInRemote();
+                       numBytesOut = ioMetrics.getNumBytesOut();
+                       numRecordsIn = ioMetrics.getNumRecordsIn();
+                       numRecordsOut = ioMetrics.getNumRecordsOut();
+               } else { // execAttempt is still running, use 
MetricQueryService instead
+                       fetcher.update();
+                       MetricStore.SubtaskMetricStore metrics = 
fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), 
params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
+                       if (metrics != null) {
+                               numBytesIn = 
Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + 
Long.valueOf(metrics.getMetric("numBytesInRemote", "0"));
+                               numBytesOut = 
Long.valueOf(metrics.getMetric("numBytesOut", "0"));
+                               numRecordsIn = 
Long.valueOf(metrics.getMetric("numRecordsIn", "0"));
+                               numRecordsOut = 
Long.valueOf(metrics.getMetric("numRecordsOut", "0"));
+                       }
+               }
+               
                gen.writeObjectFieldStart("metrics");
-               gen.writeNumberField("read-bytes", readBytes != null ? 
readBytes.getLocalValuePrimitive() : -1L);
-               gen.writeNumberField("write-bytes", writeBytes != null ? 
writeBytes.getLocalValuePrimitive() : -1L);
-               gen.writeNumberField("read-records", readRecords != null ? 
readRecords.getLocalValuePrimitive() : -1L);
-               gen.writeNumberField("write-records",writeRecords != null ? 
writeRecords.getLocalValuePrimitive() : -1L);
+               gen.writeNumberField("read-bytes", numBytesIn);
+               gen.writeNumberField("write-bytes", numBytesOut);
+               gen.writeNumberField("read-records", numRecordsIn);
+               gen.writeNumberField("write-records", numRecordsOut);
                gen.writeEndObject();
 
                gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
index 44714e7..ce6cb1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -20,18 +20,16 @@ package org.apache.flink.runtime.accumulators;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 
 /**
- * Main accumulator registry which encapsulates internal and user-defined 
accumulators.
+ * Main accumulator registry which encapsulates user-defined accumulators.
  */
 public class AccumulatorRegistry {
 
@@ -40,32 +38,13 @@ public class AccumulatorRegistry {
        protected final JobID jobID;
        protected final ExecutionAttemptID taskID;
 
-       /* Flink's internal Accumulator values stored for the executing task. */
-       private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
-                       new HashMap<Metric, Accumulator<?, ?>>();
-
        /* User-defined Accumulator values stored for the executing task. */
        private final Map<String, Accumulator<?, ?>> userAccumulators =
                        new ConcurrentHashMap<>(4);
 
-       /* The reporter reference that is handed to the reporting tasks. */
-       private final ReadWriteReporter reporter;
-
-       /**
-        * Flink metrics supported
-        */
-       public enum Metric {
-               NUM_RECORDS_IN,
-               NUM_RECORDS_OUT,
-               NUM_BYTES_IN,
-               NUM_BYTES_OUT
-       }
-
-
        public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
                this.jobID = jobID;
                this.taskID = taskID;
-               this.reporter = new ReadWriteReporter(flinkAccumulators);
        }
 
        /**
@@ -74,7 +53,7 @@ public class AccumulatorRegistry {
         */
        public AccumulatorSnapshot getSnapshot() {
                try {
-                       return new AccumulatorSnapshot(jobID, taskID, 
flinkAccumulators, userAccumulators);
+                       return new AccumulatorSnapshot(jobID, taskID, 
userAccumulators);
                } catch (Throwable e) {
                        LOG.warn("Failed to serialize accumulators for task.", 
e);
                        return null;
@@ -88,59 +67,4 @@ public class AccumulatorRegistry {
                return userAccumulators;
        }
 
-       /**
-        * Gets the reporter for flink internal metrics.
-        */
-       public Reporter getReadWriteReporter() {
-               return reporter;
-       }
-
-       /**
-        * Interface for Flink's internal accumulators.
-        */
-       public interface Reporter {
-               void reportNumRecordsIn(long value);
-               void reportNumRecordsOut(long value);
-               void reportNumBytesIn(long value);
-               void reportNumBytesOut(long value);
-       }
-
-       /**
-        * Accumulator based reporter for keeping track of internal metrics 
(e.g. bytes and records in/out)
-        */
-       private static class ReadWriteReporter implements Reporter {
-
-               private LongCounter numRecordsIn = new LongCounter();
-               private LongCounter numRecordsOut = new LongCounter();
-               private LongCounter numBytesIn = new LongCounter();
-               private LongCounter numBytesOut = new LongCounter();
-
-               private ReadWriteReporter(Map<Metric, Accumulator<?,?>> 
accumulatorMap) {
-                       accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
-                       accumulatorMap.put(Metric.NUM_RECORDS_OUT, 
numRecordsOut);
-                       accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
-                       accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
-               }
-
-               @Override
-               public void reportNumRecordsIn(long value) {
-                       numRecordsIn.add(value);
-               }
-
-               @Override
-               public void reportNumRecordsOut(long value) {
-                       numRecordsOut.add(value);
-               }
-
-               @Override
-               public void reportNumBytesIn(long value) {
-                       numBytesIn.add(value);
-               }
-
-               @Override
-               public void reportNumBytesOut(long value) {
-                       numBytesOut.add(value);
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
index d0f4bad..0bfb1ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
@@ -40,21 +40,14 @@ public class AccumulatorSnapshot implements Serializable {
        private final ExecutionAttemptID executionAttemptID;
 
        /**
-        * Flink internal accumulators which can be deserialized using the 
system class loader.
-        */
-       private final SerializedValue<Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators;
-
-       /**
         * Serialized user accumulators which may require the custom user class 
loader.
         */
        private final SerializedValue<Map<String, Accumulator<?, ?>>> 
userAccumulators;
 
        public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID 
executionAttemptID,
-                                                       
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                                                        Map<String, 
Accumulator<?, ?>> userAccumulators) throws IOException {
                this.jobID = jobID;
                this.executionAttemptID = executionAttemptID;
-               this.flinkAccumulators = new 
SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, 
?>>>(flinkAccumulators);
                this.userAccumulators = new SerializedValue<Map<String, 
Accumulator<?, ?>>>(userAccumulators);
        }
 
@@ -67,14 +60,6 @@ public class AccumulatorSnapshot implements Serializable {
        }
 
        /**
-        * Gets the Flink (internal) accumulators values.
-        * @return the serialized map
-        */
-       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
deserializeFlinkAccumulators() throws IOException, ClassNotFoundException {
-               return 
flinkAccumulators.deserializeValue(getClass().getClassLoader());
-       }
-
-       /**
         * Gets the user-defined accumulators values.
         * @return the serialized map
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
index aefc17d..df558c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -17,14 +17,10 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
-import java.util.Map;
-
 /**
  * Common interface for the runtime {@link Execution and {@link 
ArchivedExecution}.
  */
@@ -88,18 +84,11 @@ public interface AccessExecution {
        StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
 
        /**
-        * Returns the system-defined accumulators.
-        *
-        * @return system-defined accumulators.
-        * @deprecated Will be removed in FLINK-4527
-        */
-       @Deprecated
-       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators();
-
-       /**
         * Returns the subtask index of this execution.
         *
         * @return subtask index of this execution.
         */
        int getParallelSubtaskIndex();
+
+       IOMetrics getIOMetrics();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 0fd97da..e7fe1b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -18,8 +18,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
@@ -153,15 +151,6 @@ public interface AccessExecutionGraph {
        Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws 
IOException;
 
        /**
-        * Returns the aggregated system-defined accumulators.
-        *
-        * @return aggregated system-defined accumulators.
-        * @deprecated Will be removed in FLINK-4527
-        */
-       @Deprecated
-       Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, 
?>>> getFlinkAccumulators();
-
-       /**
         * Returns whether this execution graph was archived.
         *
         * @return true, if the execution graph was archived, false otherwise

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
index c9bf604..92af0c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -17,16 +17,12 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import scala.Option;
 
-import java.util.Map;
-
 /**
  * Common interface for the runtime {@link ExecutionJobVertex} and {@link 
ArchivedExecutionJobVertex}.
  */
@@ -81,15 +77,6 @@ public interface AccessExecutionJobVertex {
        Option<OperatorCheckpointStats> getCheckpointStats();
 
        /**
-        * Returns the aggregated system-defined accumulators.
-        *
-        * @return aggregated system-defined accumulators.
-        * @deprecated Will be removed in FLINK-4527
-        */
-       @Deprecated
-       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getAggregatedMetricAccumulators();
-
-       /**
         * Returns the aggregated user-defined accumulators as strings.
         *
         * @return aggregated user-defined accumulators as strings.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index 0b2992f..c189d42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -17,15 +17,12 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
-import java.util.Map;
 
 public class ArchivedExecution implements AccessExecution, Serializable {
        private static final long serialVersionUID = 4817108757483345173L;
@@ -46,13 +43,12 @@ public class ArchivedExecution implements AccessExecution, 
Serializable {
        /* Continuously updated map of user-defined accumulators */
        private final StringifiedAccumulatorResult[] userAccumulators;
 
-       /* Continuously updated map of internal accumulators */
-       private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
        private final int parallelSubtaskIndex;
 
+       private final IOMetrics ioMetrics;
+
        public ArchivedExecution(Execution execution) {
                this.userAccumulators = 
execution.getUserAccumulatorsStringified();
-               this.flinkAccumulators = execution.getFlinkAccumulators();
                this.attemptId = execution.getAttemptId();
                this.attemptNumber = execution.getAttemptNumber();
                this.stateTimestamps = execution.getStateTimestamps();
@@ -60,6 +56,7 @@ public class ArchivedExecution implements AccessExecution, 
Serializable {
                this.state = execution.getState();
                this.failureCause = 
ExceptionUtils.stringifyException(execution.getFailureCause());
                this.assignedResourceLocation = 
execution.getAssignedResourceLocation();
+               this.ioMetrics = execution.getIOMetrics();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -106,13 +103,14 @@ public class ArchivedExecution implements 
AccessExecution, Serializable {
                return userAccumulators;
        }
 
-       @Override
-       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
-               return flinkAccumulators;
-       }
 
        @Override
        public int getParallelSubtaskIndex() {
                return parallelSubtaskIndex;
        }
+
+       @Override
+       public IOMetrics getIOMetrics() {
+               return ioMetrics;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index d8c58c8..0bd5319 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -19,8 +19,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -31,7 +29,6 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -208,24 +205,6 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                return tracker;
        }
 
-       /**
-        * Gets the internal flink accumulator map of maps which contains some 
metrics.
-        *
-        * @return A map of accumulators for every executed task.
-        */
-       @Override
-       public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> getFlinkAccumulators() {
-               Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators =
-                       new HashMap<>();
-
-               for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
-                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-                       
flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), 
taskAccs);
-               }
-
-               return flinkAccumulators;
-       }
-
        @Override
        public boolean isArchived() {
                return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index 4857bf5..8ae6bbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
@@ -46,7 +45,6 @@ public class ArchivedExecutionJobVertex implements 
AccessExecutionJobVertex, Ser
 
        private final int maxParallelism;
 
-       private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
aggregatedMetricAccumulators;
        private final Option<OperatorCheckpointStats> checkpointStats;
        private final StringifiedAccumulatorResult[] archivedUserAccumulators;
 
@@ -56,8 +54,6 @@ public class ArchivedExecutionJobVertex implements 
AccessExecutionJobVertex, Ser
                        taskVertices[x] = 
jobVertex.getTaskVertices()[x].archive();
                }
 
-               aggregatedMetricAccumulators = 
jobVertex.getAggregatedMetricAccumulators();
-
                Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = 
new HashMap<>();
                for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
                        Map<String, Accumulator<?, ?>> next = 
vertex.getCurrentExecutionAttempt().getUserAccumulators();
@@ -116,10 +112,6 @@ public class ArchivedExecutionJobVertex implements 
AccessExecutionJobVertex, Ser
                return getAggregateJobVertexState(num, parallelism);
        }
 
-       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getAggregatedMetricAccumulators() {
-               return this.aggregatedMetricAccumulators;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Static / pre-assigned input splits
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 17e0df1..788dee4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,7 +23,6 @@ import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -143,9 +142,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        /* Continuously updated map of user-defined accumulators */
        private volatile Map<String, Accumulator<?, ?>> userAccumulators;
-
-       /* Continuously updated map of internal accumulators */
-       private volatile Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
+       private IOMetrics ioMetrics;
 
        // 
--------------------------------------------------------------------------------------------
        
@@ -651,7 +648,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                markFinished(null, null);
        }
 
-       void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {
+       void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, 
IOMetrics metrics) {
 
                // this call usually comes during RUNNING, but may also come 
while still in deploying (very fast tasks!)
                while (true) {
@@ -673,9 +670,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                                }
 
                                                synchronized (accumulatorLock) {
-                                                       this.flinkAccumulators 
= flinkAccumulators;
                                                        this.userAccumulators = 
userAccumulators;
                                                }
+                                               this.ioMetrics = metrics;
 
                                                assignedResource.releaseSlot();
                                                
vertex.getExecutionGraph().deregisterExecution(this);
@@ -1010,14 +1007,11 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        
        /**
         * Update accumulators (discarded when the Execution has already been 
terminated).
-        * @param flinkAccumulators the flink internal accumulators
         * @param userAccumulators the user accumulators
         */
-       public void setAccumulators(Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators,
-                                                               Map<String, 
Accumulator<?, ?>> userAccumulators) {
+       public void setAccumulators(Map<String, Accumulator<?, ?>> 
userAccumulators) {
                synchronized (accumulatorLock) {
                        if (!state.isTerminal()) {
-                               this.flinkAccumulators = flinkAccumulators;
                                this.userAccumulators = userAccumulators;
                        }
                }
@@ -1033,14 +1027,14 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        @Override
-       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
-               return flinkAccumulators;
-       }
-
-       @Override
        public int getParallelSubtaskIndex() {
                return getVertex().getParallelSubtaskIndex();
        }
+               
+       @Override
+       public IOMetrics getIOMetrics() {
+               return ioMetrics;
+       }
 
        // 
------------------------------------------------------------------------
        //  Standard utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0a79cf2..074a04d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -29,7 +29,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -600,23 +599,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        /**
-        * Gets the internal flink accumulator map of maps which contains some 
metrics.
-        * @return A map of accumulators for every executed task.
-        */
-       @Override
-       public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> getFlinkAccumulators() {
-               Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators =
-                               new HashMap<ExecutionAttemptID, 
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
-
-               for (ExecutionVertex vertex : getAllExecutionVertices()) {
-                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
-                       
flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), 
taskAccs);
-               }
-
-               return flinkAccumulators;
-       }
-
-       /**
         * Merges all accumulator results from the tasks previously executed in 
the Executions.
         * @return The accumulator map
         */
@@ -1075,7 +1057,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        /**
         * Updates the state of one of the ExecutionVertex's Execution attempts.
-        * If the new status if "FINISHED", this also updates the
+        * If the new status if "FINISHED", this also updates the accumulators.
         * 
         * @param state The state update.
         * @return True, if the task update was properly applied, false, if the 
execution attempt was not found.
@@ -1090,11 +1072,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                case FINISHED:
                                        try {
                                                AccumulatorSnapshot 
accumulators = state.getAccumulators();
-                                               Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators =
-                                                       
accumulators.deserializeFlinkAccumulators();
                                                Map<String, Accumulator<?, ?>> 
userAccumulators =
                                                        
accumulators.deserializeUserAccumulators(userClassLoader);
-                                               
attempt.markFinished(flinkAccumulators, userAccumulators);
+                                               
attempt.markFinished(userAccumulators, state.getIOMetrics());
                                        }
                                        catch (Exception e) {
                                                LOG.error("Failed to 
deserialize final accumulator results.", e);
@@ -1160,16 +1140,14 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * @param accumulatorSnapshot The serialized flink and user-defined 
accumulators
         */
        public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) 
{
-               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
                Map<String, Accumulator<?, ?>> userAccumulators;
                try {
-                       flinkAccumulators = 
accumulatorSnapshot.deserializeFlinkAccumulators();
                        userAccumulators = 
accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
 
                        ExecutionAttemptID execID = 
accumulatorSnapshot.getExecutionAttemptID();
                        Execution execution = currentExecutions.get(execID);
                        if (execution != null) {
-                               execution.setAccumulators(flinkAccumulators, 
userAccumulators);
+                               execution.setAccumulators(userAccumulators);
                        } else {
                                LOG.warn("Received accumulator result for 
unknown execution {}.", execID);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index e7f16a2..2d9ec88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,13 +20,11 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
@@ -443,37 +441,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
        // 
--------------------------------------------------------------------------------------------
        //  Accumulators / Metrics
        // 
--------------------------------------------------------------------------------------------
-       
-       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getAggregatedMetricAccumulators() {
-               // some specialized code to speed things up
-               long bytesRead = 0;
-               long bytesWritten = 0;
-               long recordsRead = 0;
-               long recordsWritten = 0;
-               
-               for (ExecutionVertex v : getTaskVertices()) {
-                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators();
-                       
-                       if (metrics != null) {
-                               LongCounter br = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
-                               LongCounter bw = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
-                               LongCounter rr = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
-                               LongCounter rw = (LongCounter) 
metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
-                               
-                               bytesRead += br != null ? 
br.getLocalValuePrimitive() : 0;
-                               bytesWritten += bw != null ? 
bw.getLocalValuePrimitive() : 0;
-                               recordsRead += rr != null ? 
rr.getLocalValuePrimitive() : 0;
-                               recordsWritten += rw != null ? 
rw.getLocalValuePrimitive() : 0;
-                       }
-               }
-
-               HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = 
new HashMap<>();
-               agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new 
LongCounter(bytesRead));
-               agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new 
LongCounter(bytesWritten));
-               agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new 
LongCounter(recordsRead));
-               agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new 
LongCounter(recordsWritten));
-               return agg;
-       }
 
        public StringifiedAccumulatorResult[] 
getAggregatedUserAccumulatorsStringified() {
                Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<String, Accumulator<?, ?>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e0fe355..c5aeef7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -123,11 +122,4 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                        }
                }
        }
-
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.setReporter(reporter);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index debb352..ca59609 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -48,9 +47,4 @@ public final class BufferReader extends AbstractReader {
                        }
                }
        }
-
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index a1d705f..0cc77f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
@@ -52,9 +51,4 @@ public interface ReaderBase {
 
        boolean hasReachedEndOfSuperstep();
 
-       /**
-        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
-        */
-       void setReporter(AccumulatorRegistry.Reporter reporter);
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index cdd8731..8f2c8fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -46,8 +45,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
 
        private Buffer currentBuffer;
 
-       private AccumulatorRegistry.Reporter reporter;
-
        public AdaptiveSpanningRecordDeserializer() {
                this.nonSpanningWrapper = new NonSpanningWrapper();
                this.spanningWrapper = new SpanningWrapper();
@@ -93,18 +90,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                if (nonSpanningRemaining >= 4) {
                        int len = this.nonSpanningWrapper.readInt();
 
-                       if (reporter != null) {
-                               reporter.reportNumBytesIn(len);
-                       }
-
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
                                target.read(this.nonSpanningWrapper);
 
-                               if (reporter != null) {
-                                       reporter.reportNumRecordsIn(1);
-                               }
-
                                return (this.nonSpanningWrapper.remaining() == 
0) ?
                                                
DeserializationResult.LAST_RECORD_FROM_BUFFER :
                                                
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -128,10 +117,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                        // get the full record
                        target.read(this.spanningWrapper);
 
-                       if (reporter != null) {
-                               reporter.reportNumRecordsIn(1);
-                       }
-
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
                        
this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
@@ -159,12 +144,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               this.reporter = reporter;
-               this.spanningWrapper.setReporter(reporter);
-       }
-
        // 
-----------------------------------------------------------------------------------------------------------------
 
        private static final class NonSpanningWrapper implements DataInputView {
@@ -447,8 +426,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
 
                private int recordLimit;
 
-               private AccumulatorRegistry.Reporter reporter;
-
                public SpanningWrapper() {
                        this.lengthBuffer = ByteBuffer.allocate(4);
                        this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
@@ -486,10 +463,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                                } else {
                                        this.recordLength = 
this.lengthBuffer.getInt(0);
 
-                                       if (reporter != null) {
-                                               
reporter.reportNumBytesIn(this.recordLength);
-                                       }
-
                                        this.lengthBuffer.clear();
                                        segmentPosition = toPut;
                                }
@@ -634,9 +607,5 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                public int read(byte[] b) throws IOException {
                        return this.serializationReadBuffer.read(b);
                }
-
-               public void setReporter(AccumulatorRegistry.Reporter reporter) {
-                       this.reporter = reporter;
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index e4c7890..dd8ea06 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -65,9 +64,4 @@ public interface RecordDeserializer<T extends 
IOReadableWritable> {
        void clear();
        
        boolean hasUnfinishedData();
-
-       /**
-        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
-        */
-       void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index c76dd00..e8179dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -67,11 +66,6 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
        boolean hasData();
 
        /**
-        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
-        */
-       void setReporter(AccumulatorRegistry.Reporter reporter);
-
-       /**
         * Insantiates all metrics.
         *
         * @param metrics metric group

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 7c4d937..e36a16f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -26,7 +26,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
@@ -53,8 +52,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        /** Limit of current {@link MemorySegment} of target buffer */
        private int limit;
 
-       private AccumulatorRegistry.Reporter reporter;
-
        private transient Counter numBytesOut;
 
        public SpanningRecordSerializer() {
@@ -84,11 +81,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
 
                int len = this.serializationBuffer.length();
                this.lengthBuffer.putInt(0, len);
-
-               if (reporter != null) {
-                       reporter.reportNumBytesOut(len);
-                       reporter.reportNumRecordsOut(1);
-               }
                
                if (numBytesOut != null) {
                        numBytesOut.inc(len);
@@ -192,11 +184,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        }
 
        @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               this.reporter = reporter;
-       }
-
-       @Override
        public void instantiateMetrics(TaskIOMetricGroup metrics) {
                numBytesOut = metrics.getNumBytesOutCounter();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index eab8e7c..7c213b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
@@ -59,8 +58,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        private Buffer currentBuffer;
 
-       private AccumulatorRegistry.Reporter reporter;
-
        public SpillingAdaptiveSpanningRecordDeserializer(String[] 
tmpDirectories) {
                this.nonSpanningWrapper = new NonSpanningWrapper();
                this.spanningWrapper = new SpanningWrapper(tmpDirectories);
@@ -106,19 +103,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                if (nonSpanningRemaining >= 4) {
                        int len = this.nonSpanningWrapper.readInt();
 
-                       if (reporter != null) {
-                               reporter.reportNumBytesIn(len);
-                       }
-
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
                                try {
                                        target.read(this.nonSpanningWrapper);
 
-                                       if (reporter != null) {
-                                               reporter.reportNumRecordsIn(1);
-                                       }
-
                                        int remaining = 
this.nonSpanningWrapper.remaining();
                                        if (remaining > 0) {
                                                return 
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -153,10 +142,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                if (this.spanningWrapper.hasFullRecord()) {
                        // get the full record
                        target.read(this.spanningWrapper.getInputView());
-
-                       if (reporter != null) {
-                               reporter.reportNumRecordsIn(1);
-                       }
                        
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
@@ -182,12 +167,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
-       @Override
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               this.reporter = reporter;
-               this.spanningWrapper.setReporter(reporter);
-       }
-
 
        // 
-----------------------------------------------------------------------------------------------------------------
        
@@ -483,8 +462,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                
                private DataInputViewStreamWrapper spillFileReader;
 
-               private AccumulatorRegistry.Reporter reporter;
-
                public SpanningWrapper(String[] tempDirs) {
                        this.tempDirs = tempDirs;
                        
@@ -538,10 +515,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                } else {
                                        this.recordLength = 
this.lengthBuffer.getInt(0);
 
-                                       if (reporter != null) {
-                                               
reporter.reportNumBytesIn(recordLength);
-                                       }
-
                                        this.lengthBuffer.clear();
                                        segmentPosition = toPut;
                                        
@@ -672,9 +645,5 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        random.nextBytes(bytes);
                        return StringUtils.byteToHexString(bytes);
                }
-
-               public void setReporter(AccumulatorRegistry.Reporter reporter) {
-                       this.reporter = reporter;
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 96eea23..1e224c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
@@ -201,15 +200,6 @@ public class RecordWriter<T extends IOReadableWritable> {
        }
 
        /**
-        * Counter for the number of records emitted and the records processed.
-        */
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for(RecordSerializer<?> serializer : serializers) {
-                       serializer.setReporter(reporter);
-               }
-       }
-
-       /**
         * Sets the metric group for this RecordWriter.
         * @param metrics
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 4bc4532..2e3285c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.operators.Driver;
@@ -114,9 +113,8 @@ public class IterationHeadTask<X, Y, S extends Function, 
OT> extends AbstractIte
                List<RecordWriter<?>> finalOutputWriters = new 
ArrayList<RecordWriter<?>>();
                final TaskConfig finalOutConfig = 
this.config.getIterationHeadFinalOutputConfig();
                final ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
-               AccumulatorRegistry.Reporter reporter = 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
                this.finalOutputCollector = BatchTask.getOutputCollector(this, 
finalOutConfig,
-                               userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
+                               userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs());
 
                // sanity check the setup
                final int writersIntoStepFunction = this.eventualOutputs.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e896639..f748079 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -654,9 +653,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
 
                int currentReaderOffset = 0;
 
-               AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
-               AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
-
                for (int i = 0; i < numInputs; i++) {
                        //  ---------------- create the input readers 
---------------------
                        // in case where a logical input unions multiple 
physical inputs, create a union reader
@@ -680,8 +676,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                                throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
                        }
 
-                       inputReaders[i].setReporter(reporter);
-
                        currentReaderOffset += groupSize;
                }
                this.inputReaders = inputReaders;
@@ -1015,13 +1009,10 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
 
                ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
-               AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
-               AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
-
-               this.accumulatorMap = accumulatorRegistry.getUserMap();
+               this.accumulatorMap = 
getEnvironment().getAccumulatorRegistry().getUserMap();
 
                this.output = initOutputs(this, userCodeClassLoader, 
this.config, this.chainedTasks, this.eventualOutputs,
-                               this.getExecutionConfig(), reporter, 
this.accumulatorMap);
+                               this.getExecutionConfig(), this.accumulatorMap);
        }
 
        public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup 
metrics) {
@@ -1215,7 +1206,7 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
         * @return The OutputCollector that data produced in this task is 
submitted to.
         */
        public static <T> Collector<T> getOutputCollector(AbstractInvokable 
task, TaskConfig config, ClassLoader cl,
-                       List<RecordWriter<?>> eventualOutputs, int 
outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws 
Exception
+                       List<RecordWriter<?>> eventualOutputs, int 
outputOffset, int numOutputs) throws Exception
        {
                if (numOutputs == 0) {
                        return null;
@@ -1248,8 +1239,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                        final RecordWriter<SerializationDelegate<T>> 
recordWriter =
                                        new 
RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset
 + i), oe);
 
-                       // setup live accumulator counters
-                       recordWriter.setReporter(reporter);
                        
recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup());
 
                        writers.add(recordWriter);
@@ -1269,7 +1258,6 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                                                                                
List<ChainedDriver<?, ?>> chainedTasksTarget,
                                                                                
List<RecordWriter<?>> eventualOutputs,
                                                                                
ExecutionConfig executionConfig,
-                                                                               
AccumulatorRegistry.Reporter reporter,
                                                                                
Map<String, Accumulator<?,?>> accumulatorMap)
        throws Exception
        {
@@ -1304,7 +1292,7 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
 
                                if (i == numChained - 1) {
                                        // last in chain, instantiate the 
output collector for this task
-                                       previous = 
getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, 
chainedStubConf.getNumOutputs(), reporter);
+                                       previous = 
getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, 
chainedStubConf.getNumOutputs());
                                }
 
                                ct.setup(chainedStubConf, taskName, previous, 
containingTask, cl, executionConfig, accumulatorMap);
@@ -1322,7 +1310,7 @@ public class BatchTask<S extends Function, OT> extends 
AbstractInvokable impleme
                // else
 
                // instantiate the output collector the default way from this 
configuration
-               return getOutputCollector(containingTask , config, cl, 
eventualOutputs, 0, numOutputs, reporter);
+               return getOutputCollector(containingTask , config, cl, 
eventualOutputs, 0, numOutputs);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index eb7999c..bd052f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
@@ -354,11 +353,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                } else {
                        throw new Exception("Illegal input group size in task 
configuration: " + groupSize);
                }
-
-               final AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
-               final AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
-
-               inputReader.setReporter(reporter);
                
                this.inputTypeSerializerFactory = 
this.config.getInputSerializer(0, getUserCodeClassLoader());
                @SuppressWarnings({ "rawtypes" })

Reply via email to