http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index e89559d..39d3453 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -284,11 +283,8 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
                this.eventualOutputs = new ArrayList<RecordWriter<?>>();
 
-               final AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
-               final AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
-
                this.output = BatchTask.initOutputs(this, cl, this.config, 
this.chainedTasks, this.eventualOutputs,
-                               getExecutionConfig(), reporter, 
getEnvironment().getAccumulatorRegistry().getUserMap());
+                               getExecutionConfig(), 
getEnvironment().getAccumulatorRegistry().getUserMap());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7ce9b0b..3a5c96f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -414,6 +414,10 @@ public class Task implements Runnable, TaskActions {
                return accumulatorRegistry;
        }
 
+       public TaskMetricGroup getMetricGroup() {
+               return metrics;
+       }
+
        public Thread getExecutingThread() {
                return executingThread;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 60aadf5..7bea5d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
@@ -48,6 +49,7 @@ public class TaskExecutionState implements java.io.Serializable {
 
        /** Serialized flink and user-defined accumulators */
        private final AccumulatorSnapshot accumulators;
+       private final IOMetrics ioMetrics;
 
        /**
         * Creates a new task execution state update, with no attached 
exception and no accumulators.
@@ -60,7 +62,7 @@ public class TaskExecutionState implements java.io.Serializable {
         *        the execution state to be reported
         */
        public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, 
ExecutionState executionState) {
-               this(jobID, executionId, executionState, null, null);
+               this(jobID, executionId, executionState, null, null, null);
        }
 
        /**
@@ -75,7 +77,7 @@ public class TaskExecutionState implements java.io.Serializable {
         */
        public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
                                                        ExecutionState 
executionState, Throwable error) {
-               this(jobID, executionId, executionState, error, null);
+               this(jobID, executionId, executionState, error, null, null);
        }
 
        /**
@@ -95,7 +97,7 @@ public class TaskExecutionState implements java.io.Serializable {
         */
        public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
                        ExecutionState executionState, Throwable error,
-                       AccumulatorSnapshot accumulators) {
+                       AccumulatorSnapshot accumulators, IOMetrics ioMetrics) {
 
                if (jobID == null || executionId == null || executionState == 
null) {
                        throw new NullPointerException();
@@ -110,6 +112,7 @@ public class TaskExecutionState implements java.io.Serializable {
                        this.throwable = null;
                }
                this.accumulators = accumulators;
+               this.ioMetrics = ioMetrics;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -164,6 +167,10 @@ public class TaskExecutionState implements java.io.Serializable {
                return accumulators;
        }
 
+       public IOMetrics getIOMetrics() {
+               return ioMetrics;
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7608b87..ce763e7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -23,7 +23,6 @@ import java.lang.management.ManagementFactory
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
-import java.util.concurrent.TimeUnit
 
 import _root_.akka.actor._
 import _root_.akka.pattern.ask
@@ -1296,7 +1295,8 @@ class TaskManager(
             task.getExecutionId,
             task.getExecutionState,
             task.getFailureCause,
-            accumulators)
+            accumulators,
+            task.getMetricGroup.getIOMetricGroup.createSnapshot())
         )
       )
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index aac2d00..ff50573 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -66,8 +65,6 @@ public class ArchivedExecutionGraphTest {
        private static JobVertexID v1ID = new JobVertexID();
        private static JobVertexID v2ID = new JobVertexID();
 
-       private static ExecutionAttemptID executionWithAccumulatorsID;
-
        private static ExecutionGraph runtimeGraph;
 
        @BeforeClass
@@ -119,15 +116,10 @@ public class ArchivedExecutionGraphTest {
                        null,
                        new TestCheckpointStatsTracker());
 
-               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators = new HashMap<>();
-               flinkAccumulators.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, 
new LongCounter(32));
-
                Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<>();
                userAccumulators.put("userAcc", new LongCounter(64));
 
                Execution executionWithAccumulators = 
runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
-               executionWithAccumulators.setAccumulators(flinkAccumulators, 
userAccumulators);
-               executionWithAccumulatorsID = 
executionWithAccumulators.getAttemptId();
 
                
runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new
 RuntimeException("This exception was thrown on purpose."));
        }
@@ -200,7 +192,6 @@ public class ArchivedExecutionGraphTest {
                // 
-------------------------------------------------------------------------------------------------------------
                
compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), 
archivedGraph.getAccumulatorResultsStringified());
                
compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), 
archivedGraph.getAccumulatorsSerialized());
-               
compareFlinkAccumulators(runtimeGraph.getFlinkAccumulators().get(executionWithAccumulatorsID),
 archivedGraph.getFlinkAccumulators().get(executionWithAccumulatorsID));
 
                // 
-------------------------------------------------------------------------------------------------------------
                // JobVertices
@@ -250,7 +241,6 @@ public class ArchivedExecutionGraphTest {
                
compareOperatorCheckpointStats(runtimeJobVertex.getCheckpointStats().get(), 
archivedJobVertex.getCheckpointStats().get());
 
                
compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(),
 archivedJobVertex.getAggregatedUserAccumulatorsStringified());
-               
compareFlinkAccumulators(runtimeJobVertex.getAggregatedMetricAccumulators(), 
archivedJobVertex.getAggregatedMetricAccumulators());
 
                AccessExecutionVertex[] runtimeExecutionVertices = 
runtimeJobVertex.getTaskVertices();
                AccessExecutionVertex[] archivedExecutionVertices = 
archivedJobVertex.getTaskVertices();
@@ -294,7 +284,6 @@ public class ArchivedExecutionGraphTest {
                
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), 
archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
                
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FAILED), 
archivedExecution.getStateTimestamp(ExecutionState.FAILED));
                
compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(),
 archivedExecution.getUserAccumulatorsStringified());
-               
compareFlinkAccumulators(runtimeExecution.getFlinkAccumulators(), 
archivedExecution.getFlinkAccumulators());
                assertEquals(runtimeExecution.getParallelSubtaskIndex(), 
archivedExecution.getParallelSubtaskIndex());
        }
 
@@ -321,18 +310,6 @@ public class ArchivedExecutionGraphTest {
                }
        }
 
-       private static void 
compareFlinkAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
runtimeAccs, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> archivedAccs) {
-               assertEquals(runtimeAccs == null, archivedAccs == null);
-               if (runtimeAccs != null && archivedAccs != null) {
-                       assertEquals(runtimeAccs.size(), archivedAccs.size());
-                       for (Map.Entry<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> runtimeAcc : runtimeAccs.entrySet()) {
-                               Accumulator<?, ?> archivedAcc = 
archivedAccs.get(runtimeAcc.getKey());
-
-                               
assertEquals(runtimeAcc.getValue().getLocalValue(), 
archivedAcc.getLocalValue());
-                       }
-               }
-       }
-
        private static void 
compareOperatorCheckpointStats(OperatorCheckpointStats runtimeStats, 
OperatorCheckpointStats archivedStats) {
                assertEquals(runtimeStats.getNumberOfSubTasks(), 
archivedStats.getNumberOfSubTasks());
                assertEquals(runtimeStats.getCheckpointId(), 
archivedStats.getCheckpointId());

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 2d0ae41..2a287af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -27,15 +27,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -50,7 +46,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -335,7 +330,7 @@ public class ExecutionGraphDeploymentTest {
 
                ExecutionAttemptID attemptID = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
                eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.RUNNING));
-               eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.FINISHED, null, new AccumulatorSnapshot(jobId, attemptID, new 
HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>>(), new HashMap<String, 
Accumulator<?, ?>>())));
+               eg.updateState(new TaskExecutionState(jobId, attemptID, 
ExecutionState.FINISHED, null));
 
                assertEquals(JobStatus.FAILED, eg.getState());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 6853722..c6c0645 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -184,10 +183,5 @@ public class AbstractReaderTest {
                protected MockReader(InputGate inputGate) {
                        super(inputGate);
                }
-
-               @Override
-               public void setReporter(AccumulatorRegistry.Reporter reporter) {
-
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index d775869..a1f7c2d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -226,10 +226,9 @@ trait TestingJobManagerLike extends FlinkActor {
         case (jobID, (updated, actors)) if updated =>
           currentJobs.get(jobID) match {
             case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
               val userAccumulators = graph.aggregateUserAccumulators
               actors foreach {
-                 actor => actor ! UpdatedAccumulators(jobID, 
flinkAccumulators, userAccumulators)
+                 actor => actor ! UpdatedAccumulators(jobID, userAccumulators)
               }
             case None =>
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 72cf58b..48c2bfb 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -23,7 +23,6 @@ import java.util.Map
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.ActorGateway
@@ -73,7 +72,6 @@ object TestingJobManagerMessages {
    * Reports updated accumulators back to the listener.
    */
   case class UpdatedAccumulators(jobID: JobID,
-    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, 
Accumulator[_,_]]],
     userAccumulators: Map[String, Accumulator[_,_]])
 
   /** Notifies the sender when the [[TestingJobManager]] has been elected as 
the leader

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 5a27564..714317d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -27,7 +27,6 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -204,12 +203,6 @@ public class StreamInputProcessor<IN> {
                        }
                }
        }
-       
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.setReporter(reporter);
-               }
-       }
 
        /**
         * Sets the metric group for this StreamInputProcessor.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 075d9e0..5f7ffe4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -269,12 +268,6 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        }
                }
        }
-       
-       public void setReporter(AccumulatorRegistry.Reporter reporter) {
-               for (RecordDeserializer<?> deserializer : recordDeserializers) {
-                       deserializer.setReporter(reporter);
-               }
-       }
 
        /**
         * Sets the metric group for this StreamTwoInputProcessor.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0f8f4a4..68d3064 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -48,9 +47,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                                        getEnvironment().getIOManager());
 
                        // make sure that stream tasks report their I/O 
statistics
-                       AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
-                       AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
-                       inputProcessor.setReporter(reporter);
                        
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 2a4a065..30bc377 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -28,7 +28,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -74,7 +73,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
        private final OP headOperator;
 
-       public OperatorChain(StreamTask<OUT, OP> containingTask, 
AccumulatorRegistry.Reporter reporter) {
+       public OperatorChain(StreamTask<OUT, OP> containingTask) {
                
                final ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
                final StreamConfig configuration = 
containingTask.getConfiguration();
@@ -99,7 +98,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
                                
                                RecordWriterOutput<?> streamOutput = 
createStreamOutput(
                                                outEdge, 
chainedConfigs.get(outEdge.getSourceId()), i,
-                                               
containingTask.getEnvironment(), reporter, containingTask.getName());
+                                               
containingTask.getEnvironment(), containingTask.getName());
        
                                this.streamOutputs[i] = streamOutput;
                                streamOutputMap.put(outEdge, streamOutput);
@@ -306,7 +305,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
        private static <T> RecordWriterOutput<T> createStreamOutput(
                        StreamEdge edge, StreamConfig upStreamConfig, int 
outputIndex,
                        Environment taskEnvironment,
-                       AccumulatorRegistry.Reporter reporter, String taskName)
+                       String taskName)
        {
                TypeSerializer<T> outSerializer = 
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
 
@@ -319,7 +318,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 
                StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> 
output = 
                                new StreamRecordWriter<>(bufferWriter, 
outputPartitioner, upStreamConfig.getBufferTimeout());
-               output.setReporter(reporter);
                
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
                
                return new RecordWriterOutput<>(output, outSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cd63102..fa7d1b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -227,7 +227,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                timerService = new 
SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
                        }
 
-                       operatorChain = new OperatorChain<>(this, 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
+                       operatorChain = new OperatorChain<>(this);
                        headOperator = operatorChain.getHeadOperator();
 
                        
getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index d695781..978c9f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -73,9 +72,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
                                getEnvironment().getIOManager());
 
                // make sure that stream tasks report their I/O statistics
-               AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
-               AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
-               this.inputProcessor.setReporter(reporter);
                
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 32e8ea9..c95a85e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -149,9 +148,7 @@ public class StreamOperatorChainingTest {
                StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
                                createMockTask(streamConfig, 
chainedVertex.getName());
 
-               OperatorChain<Integer, StreamMap<Integer, Integer>> 
operatorChain = new OperatorChain<>(
-                               mockTask,
-                               mock(AccumulatorRegistry.Reporter.class));
+               OperatorChain<Integer, StreamMap<Integer, Integer>> 
operatorChain = new OperatorChain<>(mockTask);
 
                headOperator.setup(mockTask, streamConfig, 
operatorChain.getChainEntryPoint());
 
@@ -291,9 +288,7 @@ public class StreamOperatorChainingTest {
                StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
                                createMockTask(streamConfig, 
chainedVertex.getName());
 
-               OperatorChain<Integer, StreamMap<Integer, Integer>> 
operatorChain = new OperatorChain<>(
-                               mockTask,
-                               mock(AccumulatorRegistry.Reporter.class));
+               OperatorChain<Integer, StreamMap<Integer, Integer>> 
operatorChain = new OperatorChain<>(mockTask);
 
                headOperator.setup(mockTask, streamConfig, 
operatorChain.getChainEntryPoint());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 624bfff..c56fa91 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
@@ -41,7 +40,6 @@ import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -203,32 +201,14 @@ public class AccumulatorLiveITCase {
 
 
                        TestingJobManagerMessages.UpdatedAccumulators msg = 
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-                       Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators = msg.flinkAccumulators();
                        Map<String, Accumulator<?, ?>> userAccumulators = 
msg.userAccumulators();
 
                        ExecutionAttemptID mapperTaskID = null;
 
-                       // find out the first task's execution attempt id
-                       for (Map.Entry<ExecutionAttemptID, ?> entry : 
flinkAccumulators.entrySet()) {
-                               if (entry.getValue() != null) {
-                                       mapperTaskID = entry.getKey();
-                                       break;
-                               }
-                       }
-
                        ExecutionAttemptID sinkTaskID = null;
 
-                       // find the second's task id
-                       for (ExecutionAttemptID key : 
flinkAccumulators.keySet()) {
-                               if (key != mapperTaskID) {
-                                       sinkTaskID = key;
-                                       break;
-                               }
-                       }
-
                        /* Check for accumulator values */
-                       if(checkUserAccumulators(0, userAccumulators) &&
-                                       checkFlinkAccumulators(mapperTaskID, 0, 
0, 0, 0, flinkAccumulators)) {
+                       if(checkUserAccumulators(0, userAccumulators)) {
                                LOG.info("Passed initial check for map task.");
                        } else {
                                fail("Wrong accumulator results when map task 
begins execution.");
@@ -242,17 +222,13 @@ public class AccumulatorLiveITCase {
 
                                // receive message
                                msg = 
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-                               flinkAccumulators = msg.flinkAccumulators();
                                userAccumulators = msg.userAccumulators();
 
-                               LOG.info("{}", flinkAccumulators);
                                LOG.info("{}", userAccumulators);
 
-                               if (checkUserAccumulators(expectedAccVal, 
userAccumulators) &&
-                                               
checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+                               if (checkUserAccumulators(expectedAccVal, 
userAccumulators)) {
                                        LOG.info("Passed round #" + i);
-                               } else if 
(checkUserAccumulators(expectedAccVal, userAccumulators) &&
-                                               
checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) {
+                               } else if 
(checkUserAccumulators(expectedAccVal, userAccumulators)) {
                                        // we determined the wrong task id and 
need to switch the two here
                                        ExecutionAttemptID temp = mapperTaskID;
                                        mapperTaskID = sinkTaskID;
@@ -264,11 +240,9 @@ public class AccumulatorLiveITCase {
                        }
 
                        msg = (TestingJobManagerMessages.UpdatedAccumulators) 
receiveOne(TIMEOUT);
-                       flinkAccumulators = msg.flinkAccumulators();
                        userAccumulators = msg.userAccumulators();
 
-                       if(checkUserAccumulators(expectedAccVal, 
userAccumulators) &&
-                                       checkFlinkAccumulators(sinkTaskID, 0, 
0, 0, 0, flinkAccumulators)) {
+                       if(checkUserAccumulators(expectedAccVal, 
userAccumulators)) {
                                LOG.info("Passed initial check for sink task.");
                        } else {
                                fail("Wrong accumulator results when sink task 
begins execution.");
@@ -280,14 +254,11 @@ public class AccumulatorLiveITCase {
                                // receive message
                                msg = 
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
 
-                               flinkAccumulators = msg.flinkAccumulators();
                                userAccumulators = msg.userAccumulators();
 
-                               LOG.info("{}", flinkAccumulators);
                                LOG.info("{}", userAccumulators);
 
-                               if (checkUserAccumulators(expectedAccVal, 
userAccumulators) &&
-                                               
checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) {
+                               if (checkUserAccumulators(expectedAccVal, 
userAccumulators)) {
                                        LOG.info("Passed round #" + i);
                                } else {
                                        fail("Failed in round #" + i);
@@ -305,49 +276,6 @@ public class AccumulatorLiveITCase {
                return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected 
== ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
        }
 
-       private static boolean checkFlinkAccumulators(ExecutionAttemptID 
taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, 
int expectedBytesOut,
-                                                                               
                  Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> accumulatorMap) {
-               LOG.info("checking flink accumulators");
-
-               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskMap = 
accumulatorMap.get(taskKey);
-               assertTrue(accumulatorMap.size() > 0);
-
-               for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
entry : taskMap.entrySet()) {
-                       switch (entry.getKey()) {
-                               /**
-                                * The following two cases are for the 
DataSource and Map task
-                                */
-                               case NUM_RECORDS_OUT:
-                                       if(((LongCounter) 
entry.getValue()).getLocalValue() < expectedRecordsOut) {
-                                               return false;
-                                       }
-                                       break;
-                               case NUM_BYTES_OUT:
-                                       if (((LongCounter) 
entry.getValue()).getLocalValue() < expectedBytesOut) {
-                                               return false;
-                                       }
-                                       break;
-                               /**
-                                * The following two cases are for the DataSink 
task
-                                */
-                               case NUM_RECORDS_IN:
-                                       if (((LongCounter) 
entry.getValue()).getLocalValue() < expectedRecordsIn) {
-                                               return false;
-                                       }
-                                       break;
-                               case NUM_BYTES_IN:
-                                       if (((LongCounter) 
entry.getValue()).getLocalValue() < expectedBytesIn) {
-                                               return false;
-                                       }
-                                       break;
-                               default:
-                                       fail("Unknown accumulator found.");
-                       }
-               }
-               return true;
-       }
-
-
        /**
         * UDF that notifies when it changes the accumulator values
         */

Reply via email to