[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.

This closes #896.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8261ed54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8261ed54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8261ed54

Branch: refs/heads/master
Commit: 8261ed5438278331c9cd760463273ad94d4bd410
Parents: d592ee6
Author: Maximilian Michels <[email protected]>
Authored: Wed Jul 8 09:23:42 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Wed Jul 15 15:19:06 2015 +0200

----------------------------------------------------------------------
 .../api/common/accumulators/Accumulator.java    |   1 -
 .../api/common/functions/RuntimeContext.java    |  10 +-
 .../util/AbstractRuntimeUDFContext.java         |  26 +-
 .../functions/util/RuntimeUDFContext.java       |  11 +-
 .../common/operators/CollectionExecutor.java    |  19 +-
 .../functions/util/RuntimeUDFContextTest.java   |  12 +-
 .../base/FlatMapOperatorCollectionTest.java     |   4 +-
 .../operators/base/JoinOperatorBaseTest.java    |   7 +-
 .../common/operators/base/MapOperatorTest.java  |   7 +-
 .../base/PartitionMapOperatorTest.java          |   6 +-
 .../examples/java/wordcount/WordCount.java      |   2 +-
 .../java/org/apache/flink/api/java/DataSet.java |  18 +-
 .../java/org/apache/flink/api/java/Utils.java   |   7 +-
 .../base/CoGroupOperatorCollectionTest.java     |   5 +-
 .../operators/base/GroupReduceOperatorTest.java |   6 +-
 .../operators/base/JoinOperatorBaseTest.java    |   6 +-
 .../operators/base/ReduceOperatorTest.java      |   6 +-
 .../org/apache/flink/optimizer/Optimizer.java   |   2 +-
 .../runtime/accumulators/AccumulatorEvent.java  |  49 ---
 .../accumulators/AccumulatorRegistry.java       | 147 +++++++++
 .../accumulators/AccumulatorSnapshot.java       |  84 ++++++
 .../flink/runtime/execution/Environment.java    |   9 +-
 .../librarycache/BlobLibraryCacheManager.java   |   2 +-
 .../flink/runtime/executiongraph/Execution.java |  43 +++
 .../runtime/executiongraph/ExecutionGraph.java  | 100 ++++++-
 .../io/network/api/reader/AbstractReader.java   |   1 +
 .../api/reader/AbstractRecordReader.java        |  14 +-
 .../io/network/api/reader/BufferReader.java     |   6 +
 .../io/network/api/reader/ReaderBase.java       |   6 +
 .../io/network/api/reader/RecordReader.java     |   1 +
 .../AdaptiveSpanningRecordDeserializer.java     |  31 ++
 .../api/serialization/RecordDeserializer.java   |   6 +
 .../api/serialization/RecordSerializer.java     |   6 +
 .../serialization/SpanningRecordSerializer.java |  16 +-
 ...llingAdaptiveSpanningRecordDeserializer.java |  34 ++-
 .../io/network/api/writer/RecordWriter.java     |  28 ++
 .../task/AbstractIterativePactTask.java         |  19 +-
 .../iterative/task/IterationHeadPactTask.java   |   4 +-
 .../jobgraph/tasks/AbstractInvokable.java       |   2 +-
 .../accumulators/AccumulatorManager.java        | 144 ---------
 .../accumulators/JobAccumulators.java           |  42 ---
 .../flink/runtime/operators/DataSinkTask.java   |   6 +
 .../flink/runtime/operators/DataSourceTask.java |  27 +-
 .../flink/runtime/operators/PactDriver.java     |   2 +-
 .../runtime/operators/PactTaskContext.java      |   2 +-
 .../runtime/operators/RegularPactTask.java      | 133 +++------
 .../operators/chaining/ChainedDriver.java       |  14 +-
 .../util/DistributedRuntimeUDFContext.java      |  11 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  32 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  42 ++-
 .../runtime/taskmanager/TaskExecutionState.java |  43 ++-
 .../flink/runtime/util/SerializedValue.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 118 ++++----
 .../runtime/messages/TaskManagerMessages.scala  |   5 +-
 .../accumulators/AccumulatorMessages.scala      |  15 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  61 ++--
 .../network/api/reader/AbstractReaderTest.java  |   7 +
 .../operators/testutils/DriverTestBase.java     |   6 +-
 .../operators/testutils/MockEnvironment.java    |   9 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   1 +
 .../runtime/testingUtils/TestingCluster.scala   |   2 -
 .../testingUtils/TestingJobManager.scala        |  10 +
 .../TestingJobManagerMessages.scala             |  10 +-
 .../streaming/runtime/io/CoRecordReader.java    |  11 +
 .../io/StreamingAbstractRecordReader.java       |  13 +-
 .../io/StreamingMutableRecordReader.java        |   1 +
 .../runtime/tasks/OneInputStreamTask.java       |   6 +
 .../streaming/runtime/tasks/OutputHandler.java  |  32 +-
 .../runtime/tasks/StreamIterationHead.java      |   9 +-
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |   5 +-
 .../api/state/StatefulOperatorTest.java         |   4 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   6 +
 .../runtime/tasks/StreamMockEnvironment.java    |   9 +-
 .../flink/streaming/util/MockCoContext.java     |   4 +-
 .../flink/streaming/util/MockContext.java       |   4 +-
 .../streaming/util/SourceFunctionUtil.java      |   4 +-
 .../flink/tez/runtime/RegularProcessor.java     |   4 +-
 .../flink/test/util/RecordAPITestBase.java      |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 -
 .../test/accumulators/AccumulatorITCase.java    |   3 +-
 .../AccumulatorIterativeITCase.java             |  18 +-
 .../accumulators/AccumulatorLiveITCase.java     | 299 +++++++++++++++++++
 .../apache/flink/yarn/ApplicationMaster.scala   |   2 -
 84 files changed, 1362 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
index 123d956..e49cc04 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
@@ -40,7 +40,6 @@ import java.io.Serializable;
  *            client
  */
 public interface Accumulator<V, R extends Serializable> extends Serializable, 
Cloneable {
-
        /**
         * @param value
         *            The value to add to the accumulator object

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index eb84d1c..fb9d842 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common.functions;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -100,10 +100,12 @@ public interface RuntimeContext {
        <V, A extends Serializable> Accumulator<V, A> getAccumulator(String 
name);
 
        /**
-        * For system internal usage only. Use getAccumulator(...) to obtain a
-        * accumulator. Use this as read-only.
+        * Returns a map of all registered accumulators for this task.
+        * The returned map must not be modified.
+        * @deprecated Use getAccumulator(..) to obtain the value of an 
accumulator.
         */
-       HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+       @Deprecated
+       Map<String, Accumulator<?, ?>> getAllAccumulators();
 
        /**
         * Convenience function to create a counter object for integers.

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index f48eb57..13d79e7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.common.functions.util;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -53,32 +53,34 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
 
        private final ExecutionConfig executionConfig;
 
-       private final HashMap<String, Accumulator<?, ?>> accumulators = new 
HashMap<String, Accumulator<?, ?>>();
-       
+       private final Map<String, Accumulator<?, ?>> accumulators;
+
        private final DistributedCache distributedCache;
-       
-       
+
+
        public AbstractRuntimeUDFContext(String name,
                                                                                
int numParallelSubtasks, int subtaskIndex,
                                                                                
ClassLoader userCodeClassLoader,
-                                                                               
ExecutionConfig executionConfig)
+                                                                               
ExecutionConfig executionConfig,
+                                                                               
Map<String, Accumulator<?,?>> accumulators)
        {
                this(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig,
-                               Collections.<String, Future<Path>>emptyMap());
+                               accumulators, Collections.<String, 
Future<Path>>emptyMap());
        }
-       
+
        public AbstractRuntimeUDFContext(String name,
                                                                                
int numParallelSubtasks, int subtaskIndex,
                                                                                
ClassLoader userCodeClassLoader,
                                                                                
ExecutionConfig executionConfig,
-                                                                               
Map<String, Future<Path>> cpTasks)
-       {
+                                                                               
Map<String, Accumulator<?,?>> accumulators,
+                                                                               
Map<String, Future<Path>> cpTasks) {
                this.name = name;
                this.numParallelSubtasks = numParallelSubtasks;
                this.subtaskIndex = subtaskIndex;
                this.userCodeClassLoader = userCodeClassLoader;
                this.executionConfig = executionConfig;
                this.distributedCache = new DistributedCache(cpTasks);
+               this.accumulators = Preconditions.checkNotNull(accumulators);
        }
 
        @Override
@@ -137,8 +139,8 @@ public abstract class AbstractRuntimeUDFContext implements 
RuntimeContext {
        }
 
        @Override
-       public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
-               return this.accumulators;
+       public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+               return Collections.unmodifiableMap(this.accumulators);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index d116d00..1689138 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -38,12 +39,14 @@ public class RuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        private final HashMap<String, List<?>> uninitializedBroadcastVars = new 
HashMap<String, List<?>>();
        
        
-       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) 
{
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
+       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader,
+                                                       ExecutionConfig 
executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, accumulators);
        }
        
-       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, 
Map<String, Future<Path>> cpTasks) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks);
+       public RuntimeUDFContext(String name, int numParallelSubtasks, int 
subtaskIndex, ClassLoader userCodeClassLoader,
+                                                       ExecutionConfig 
executionConfig, Map<String, Future<Path>> cpTasks, Map<String, 
Accumulator<?,?>> accumulators) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, accumulators, cpTasks);
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index f605113..0a9146c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -184,8 +184,8 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, 
if necessary
                RuntimeUDFContext ctx;
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), 
executionConfig) :
-                                       new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, 
executionConfig);
+                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), 
executionConfig, accumulators) :
+                                       new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, 
executionConfig, accumulators);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -197,9 +197,6 @@ public class CollectionExecutor {
                
                List<OUT> result = typedOp.executeOnCollections(inputData, ctx, 
executionConfig);
                
-               if (ctx != null) {
-                       AccumulatorHelper.mergeInto(this.accumulators, 
ctx.getAllAccumulators());
-               }
                return result;
        }
        
@@ -226,8 +223,8 @@ public class CollectionExecutor {
                // build the runtime context and compute broadcast variables, 
if necessary
                RuntimeUDFContext ctx;
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
-                               new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, 
executionConfig);
+                       ctx = superStep == 0 ? new 
RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, 
accumulators) :
+                               new 
IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, 
executionConfig, accumulators);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -239,9 +236,6 @@ public class CollectionExecutor {
                
                List<OUT> result = typedOp.executeOnCollections(inputData1, 
inputData2, ctx, executionConfig);
                
-               if (ctx != null) {
-                       AccumulatorHelper.mergeInto(this.accumulators, 
ctx.getAllAccumulators());
-               }
                return result;
        }
        
@@ -485,8 +479,9 @@ public class CollectionExecutor {
        
        private class IterationRuntimeUDFContext extends RuntimeUDFContext 
implements IterationRuntimeContext {
 
-               public IterationRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig 
executionConfig) {
-                       super(name, numParallelSubtasks, subtaskIndex, 
classloader, executionConfig);
+               public IterationRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
+                                                                               
ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+                       super(name, numParallelSubtasks, subtaskIndex, 
classloader, executionConfig, accumulators);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index c77a1b6..9189d5b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -22,9 +22,11 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.junit.Test;
 
@@ -34,7 +36,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableNotFound() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new 
HashMap<String, Accumulator<?, ?>>());
                        
                        try {
                                ctx.getBroadcastVariable("some name");
@@ -64,7 +66,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableSimple() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new 
HashMap<String, Accumulator<?, ?>>());
                        
                        ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 
3, 4));
                        ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 
2.0, 3.0, 4.0));
@@ -98,7 +100,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new 
HashMap<String, Accumulator<?, ?>>());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -123,7 +125,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testResetBroadcastVariableWithInitializer() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new 
HashMap<String, Accumulator<?, ?>>());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        
@@ -146,7 +148,7 @@ public class RuntimeUDFContextTest {
        @Test
        public void testBroadcastVariableWithInitializerAndMismatch() {
                try {
-                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+                       RuntimeUDFContext ctx = new RuntimeUDFContext("test 
name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new 
HashMap<String, Accumulator<?, ?>>());
                        
                        ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 
4));
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index d231455..734324b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 @SuppressWarnings("serial")
@@ -72,7 +74,7 @@ public class FlatMapOperatorCollectionTest implements 
Serializable {
                }
                // run on collections
                final List<String> result = getTestFlatMapOperator(udf)
-                               .executeOnCollections(input, new 
RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig);
+                               .executeOnCollections(input, new 
RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap<String, 
Accumulator<?, ?>>()), executionConfig);
 
                Assert.assertEquals(input.size(), result.size());
                Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 54975b4..98f75bc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -114,11 +116,12 @@ public class JoinOperatorBaseTest implements Serializable 
{
 
 
                try {
+                       final HashMap<String, Accumulator<?, ?>> accumulatorMap 
= new HashMap<String, Accumulator<?, ?>>();
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+                       List<Integer> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), 
executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+                       List<Integer> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new 
RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), 
executionConfig);
 
                        assertEquals(expected, resultSafe);
                        assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 8e07f07..2c98a17 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.*;
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -102,11 +104,12 @@ public class MapOperatorTest implements 
java.io.Serializable {
                                        parser, new 
UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO), taskName);
                        
                        List<String> input = new ArrayList<String>(asList("1", 
"2", "3", "4", "5", "6"));
+                       final HashMap<String, Accumulator<?, ?>> accumulatorMap 
= new HashMap<String, Accumulator<?, ?>>();
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, accumulatorMap), executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, accumulatorMap), executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 61ba359..28c6d821 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.*;
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -78,9 +80,9 @@ public class PartitionMapOperatorTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Integer> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Integer> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
                        
                        assertEquals(asList(1, 2, 3, 4, 5, 6), 
resultMutableSafe);
                        assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 82c3ad8..e6b8418 100644
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -60,7 +60,7 @@ public class WordCount {
                
                // set up the execution environment
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
+
                // get input data
                DataSet<String> text = getTextDataSet(env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d24a350..c628b04 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -408,14 +408,16 @@ public abstract class DataSet<T> {
                JobExecutionResult res = getExecutionEnvironment().execute();
 
                ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-               try {
-                       return 
SerializedListAccumulator.deserializeList(accResult, serializer);
-               }
-               catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Cannot find type class of 
collected data type.", e);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Serialization error while 
deserializing collected data", e);
+               if (accResult != null) {
+                       try {
+                               return 
SerializedListAccumulator.deserializeList(accResult, serializer);
+                       } catch (ClassNotFoundException e) {
+                               throw new RuntimeException("Cannot find type 
class of collected data type.", e);
+                       } catch (IOException e) {
+                               throw new RuntimeException("Serialization error 
while deserializing collected data", e);
+                       }
+               } else {
+                       throw new RuntimeException("The call to collect() could 
not retrieve the DataSet.");
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index dd1d6d2..a1e3d25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -114,13 +114,18 @@ public class Utils {
                @Override
                public void open(Configuration parameters) throws Exception {
                        this.accumulator = new SerializedListAccumulator<T>();
-                       getRuntimeContext().addAccumulator(id, accumulator);
                }
 
                @Override
                public void flatMap(T value, Collector<T> out) throws Exception 
{
                        accumulator.add(value, serializer);
                }
+
+               @Override
+               public void close() throws Exception {
+                       // Important: should only be added in close method to 
minimize traffic of accumulators
+                       getRuntimeContext().addAccumulator(id, accumulator);
+               }
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 0aa6097..a178a47 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -35,6 +36,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -68,7 +70,8 @@ public class CoGroupOperatorCollectionTest implements 
Serializable {
                        );
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
-                       final RuntimeContext ctx = new RuntimeUDFContext("Test 
UDF", 4, 0, null, executionConfig);
+                       final HashMap<String, Accumulator<?, ?>> accumulators = 
new HashMap<String, Accumulator<?, ?>>();
+                       final RuntimeContext ctx = new RuntimeUDFContext("Test 
UDF", 4, 0, null, executionConfig, accumulators);
 
                        {
                                SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 447c8c5..08f4acd 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -31,6 +32,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -161,9 +163,9 @@ public class GroupReduceOperatorTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
                        
                        
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 1b38281..21fcfb3 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -103,9 +105,9 @@ public class JoinOperatorBaseTest implements Serializable {
                try {
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig), executionConfig);
+                       List<Tuple2<Double, String>> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), 
executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig), executionConfig);
+                       List<Tuple2<Double, String>> resultRegular = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 
1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), 
executionConfig);
 
                        assertEquals(expected, new HashSet<Tuple2<Double, 
String>>(resultSafe));
                        assertEquals(expected, new HashSet<Tuple2<Double, 
String>>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 4e1eebd..7cd9771 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -138,9 +140,9 @@ public class ReduceOperatorTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
-                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Tuple2<String, Integer>> resultMutableSafe = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
                        executionConfig.enableObjectReuse();
-                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig), executionConfig);
+                       List<Tuple2<String, Integer>> resultRegular = 
op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, 
executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
                        Set<Tuple2<String, Integer>> resultSetMutableSafe = new 
HashSet<Tuple2<String, Integer>>(resultMutableSafe);
                        Set<Tuple2<String, Integer>> resultSetRegular = new 
HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index a2b78ed..6f41c29 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -545,7 +545,7 @@ public class Optimizer {
        // 
------------------------------------------------------------------------
        
        private OptimizerPostPass getPostPassFromPlan(Plan program) {
-               final String className =  program.getPostPassClassName();
+               final String className = program.getPostPassClassName();
                if (className == null) {
                        throw new CompilerException("Optimizer Post Pass class 
description is null");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
deleted file mode 100644
index ad7fabd..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.accumulators;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This class encapsulates a map of accumulators for a single job. It is used
- * for the transfer from TaskManagers to the JobManager and from the JobManager
- * to the Client.
- */
-public class AccumulatorEvent extends SerializedValue<Map<String, 
Accumulator<?, ?>>> {
-
-       private static final long serialVersionUID = 8965894516006882735L;
-
-       /** JobID for the target job */
-       private final JobID jobID;
-
-
-       public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> 
accumulators) throws IOException {
-               super(accumulators);
-               this.jobID = jobID;
-       }
-
-       public JobID getJobID() {
-               return this.jobID;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
new file mode 100644
index 0000000..0ef3650
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined 
accumulators.
+ */
+public class AccumulatorRegistry {
+
+       protected static final Logger LOG = 
LoggerFactory.getLogger(AccumulatorRegistry.class);
+
+       protected final JobID jobID;
+       protected final ExecutionAttemptID taskID;
+
+       /* Flink's internal Accumulator values stored for the executing task. */
+       private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
+                       new HashMap<Metric, Accumulator<?, ?>>();
+
+       /* User-defined Accumulator values stored for the executing task. */
+       private final Map<String, Accumulator<?, ?>> userAccumulators =
+                       Collections.synchronizedMap(new HashMap<String, 
Accumulator<?, ?>>());
+
+       /* The reporter reference that is handed to the reporting tasks. */
+       private final ReadWriteReporter reporter;
+
+       /**
+        * Flink metrics supported
+        */
+       public enum Metric {
+               NUM_RECORDS_IN,
+               NUM_RECORDS_OUT,
+               NUM_BYTES_IN,
+               NUM_BYTES_OUT
+       }
+
+
+       public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
+               this.jobID = jobID;
+               this.taskID = taskID;
+               this.reporter = new ReadWriteReporter(flinkAccumulators);
+       }
+
+       /**
+        * Creates a snapshot of this accumulator registry.
+        * @return a serialized accumulator map
+        */
+       public AccumulatorSnapshot getSnapshot() {
+               try {
+                       return new AccumulatorSnapshot(jobID, taskID, 
flinkAccumulators, userAccumulators);
+               } catch (IOException e) {
+                       LOG.warn("Failed to serialize accumulators for task.", 
e);
+                       return null;
+               }
+       }
+
+       /**
+        * Gets the map for user-defined accumulators.
+        */
+       public Map<String, Accumulator<?, ?>> getUserMap() {
+               return userAccumulators;
+       }
+
+       /**
+        * Gets the reporter for flink internal metrics.
+        */
+       public Reporter getReadWriteReporter() {
+               return reporter;
+       }
+
+       /**
+        * Interface for Flink's internal accumulators.
+        */
+       public interface Reporter {
+               void reportNumRecordsIn(long value);
+               void reportNumRecordsOut(long value);
+               void reportNumBytesIn(long value);
+               void reportNumBytesOut(long value);
+       }
+
+       /**
+        * Accumulator based reporter for keeping track of internal metrics 
(e.g. bytes and records in/out)
+        */
+       private static class ReadWriteReporter implements Reporter {
+
+               private LongCounter numRecordsIn = new LongCounter();
+               private LongCounter numRecordsOut = new LongCounter();
+               private LongCounter numBytesIn = new LongCounter();
+               private LongCounter numBytesOut = new LongCounter();
+
+               private ReadWriteReporter(Map<Metric, Accumulator<?,?>> 
accumulatorMap) {
+                       accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
+                       accumulatorMap.put(Metric.NUM_RECORDS_OUT, 
numRecordsOut);
+                       accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
+                       accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
+               }
+
+               @Override
+               public void reportNumRecordsIn(long value) {
+                       numRecordsIn.add(value);
+               }
+
+               @Override
+               public void reportNumRecordsOut(long value) {
+                       numRecordsOut.add(value);
+               }
+
+               @Override
+               public void reportNumBytesIn(long value) {
+                       numBytesIn.add(value);
+               }
+
+               @Override
+               public void reportNumBytesOut(long value) {
+                       numBytesOut.add(value);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
new file mode 100644
index 0000000..a6288f0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This class encapsulates a map of accumulators for a single task. It is used
+ * for the transfer from TaskManagers to the JobManager and from the JobManager
+ * to the Client.
+ */
+public class AccumulatorSnapshot implements Serializable {
+
+       private static final long serialVersionUID = 42L;
+
+       private final JobID jobID;
+       private final ExecutionAttemptID executionAttemptID;
+
+       /**
+        * Flink internal accumulators which can be serialized using the system 
class loader.
+        */
+       private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
+
+       /**
+        * Serialized user accumulators which may require the custom user class 
loader.
+        */
+       private final SerializedValue<Map<String, Accumulator<?, ?>>> 
userAccumulators;
+
+       public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID 
executionAttemptID,
+                                                       
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
+                                                       Map<String, 
Accumulator<?, ?>> userAccumulators) throws IOException {
+               this.jobID = jobID;
+               this.executionAttemptID = executionAttemptID;
+               this.flinkAccumulators = flinkAccumulators;
+               this.userAccumulators = new SerializedValue<Map<String, 
Accumulator<?, ?>>>(userAccumulators);
+       }
+
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       public ExecutionAttemptID getExecutionAttemptID() {
+               return executionAttemptID;
+       }
+
+       /**
+        * Gets the Flink (internal) accumulators values.
+        * @return the serialized map
+        */
+       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
+               return flinkAccumulators;
+       }
+
+       /**
+        * Gets the user-defined accumulators values.
+        * @return the serialized map
+        */
+       public Map<String, Accumulator<?, ?>> 
deserializeUserAccumulators(ClassLoader classLoader) throws IOException, 
ClassNotFoundException {
+               return userAccumulators.deserializeValue(classLoader);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 755f1ad..c561869 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.execution;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -142,11 +142,10 @@ public interface Environment {
        BroadcastVariableManager getBroadcastVariableManager();
 
        /**
-        * Reports the given set of accumulators to the JobManager.
-        *
-        * @param accumulators The accumulators to report.
+        * Return the registry for accumulators which are periodically sent to 
the job manager.
+        * @return the registry
         */
-       void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
+       AccumulatorRegistry getAccumulatorRegistry();
 
        /**
         * Confirms that the invokable has successfully completed all steps it 
needed to

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 848f619..88be5e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -46,7 +46,7 @@ import com.google.common.base.Preconditions;
  * For each job graph that is submitted to the system the library cache 
manager maintains
  * a set of libraries (typically JAR files) which the job requires to run. The 
library cache manager
  * caches library files in order to avoid unnecessary retransmission of data. 
It is based on a singleton
- * programming pattern, so there exists at most on library manager at a time.
+ * programming pattern, so there exists at most one library manager at a time.
  */
 public final class BlobLibraryCacheManager extends TimerTask implements 
LibraryCacheManager {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af67c3f..3b836ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -54,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -133,6 +136,15 @@ public class Execution implements Serializable {
        @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private ExecutionContext executionContext;
 
+       /* Lock for updating the accumulators atomically. */
+       private final Object accumulatorLock = new Object();
+
+       /* Continuously updated map of user-defined accumulators */
+       private volatile Map<String, Accumulator<?, ?>> userAccumulators;
+
+       /* Continuously updated map of internal accumulators */
+       private volatile Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators;
+
        // 
--------------------------------------------------------------------------------------------
        
        public Execution(
@@ -593,6 +605,10 @@ public class Execution implements Serializable {
        }
 
        void markFinished() {
+               markFinished(null, null);
+       }
+
+       void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {
 
                // this call usually comes during RUNNING, but may also come 
while still in deploying (very fast tasks!)
                while (true) {
@@ -613,6 +629,11 @@ public class Execution implements Serializable {
                                                        }
                                                }
 
+                                               synchronized (accumulatorLock) {
+                                                       this.flinkAccumulators 
= flinkAccumulators;
+                                                       this.userAccumulators = 
userAccumulators;
+                                               }
+
                                                assignedResource.releaseSlot();
                                                
vertex.getExecutionGraph().deregisterExecution(this);
                                        }
@@ -935,6 +956,28 @@ public class Execution implements Serializable {
                return vertex.getSimpleName() + " - execution #" + 
attemptNumber;
        }
 
+       /**
+        * Update accumulators (discarded when the Execution has already been 
terminated).
+        * @param flinkAccumulators the flink internal accumulators
+        * @param userAccumulators the user accumulators
+        */
+       public void setAccumulators(Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators,
+                                                               Map<String, 
Accumulator<?, ?>> userAccumulators) {
+               synchronized (accumulatorLock) {
+                       if (!state.isTerminal()) {
+                               this.flinkAccumulators = flinkAccumulators;
+                               this.userAccumulators = userAccumulators;
+                       }
+               }
+       }
+       public Map<String, Accumulator<?, ?>> getUserAccumulators() {
+               return userAccumulators;
+       }
+
+       public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators() {
+               return flinkAccumulators;
+       }
+
        @Override
        public String toString() {
                return String.format("Attempt #%d (%s) @ %s - [%s]", 
attemptNumber, vertex.getSimpleName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 47b7ae2..6d2262c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -22,8 +22,12 @@ import akka.actor.ActorRef;
 
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -38,6 +42,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.util.InstantiationUtil;
@@ -47,10 +52,12 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -128,6 +135,29 @@ public class ExecutionGraph implements Serializable {
        /** The currently executed tasks, for callbacks */
        private final ConcurrentHashMap<ExecutionAttemptID, Execution> 
currentExecutions;
 
+       /**
+        * Updates the accumulators during the runtime of a job. Final 
accumulator results are transferred
+        * through the UpdateTaskExecutionState message.
+        * @param accumulatorSnapshot The serialized flink and user-defined 
accumulators
+        */
+       public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) 
{
+               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators = accumulatorSnapshot.getFlinkAccumulators();
+               Map<String, Accumulator<?, ?>> userAccumulators;
+               try {
+                       userAccumulators = 
accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
+
+                       ExecutionAttemptID execID = 
accumulatorSnapshot.getExecutionAttemptID();
+                       Execution execution = currentExecutions.get(execID);
+                       if (execution != null) {
+                               execution.setAccumulators(flinkAccumulators, 
userAccumulators);
+                       } else {
+                               LOG.warn("Received accumulator result for 
unknown execution {}.", execID);
+                       }
+               } catch (Exception e) {
+                       LOG.error("Cannot update accumulators for job " + 
jobID, e);
+               }
+       }
+
        /** A list of all libraries required during the job execution. 
Libraries have to be stored
         * inside the BlobService and are referenced via the BLOB keys. */
        private final List<BlobKey> requiredJarFiles;
@@ -485,6 +515,57 @@ public class ExecutionGraph implements Serializable {
                return executionContext;
        }
 
+       /**
+        * Gets the internal flink accumulator map of maps which contains some 
metrics.
+        * @return A map of accumulators for every executed task.
+        */
+       public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> getFlinkAccumulators() {
+               Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>>> flinkAccumulators =
+                               new HashMap<ExecutionAttemptID, 
Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
+
+               for (ExecutionVertex vertex : getAllExecutionVertices()) {
+                       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+                       
flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), 
taskAccs);
+               }
+
+               return flinkAccumulators;
+       }
+
+       /**
+        * Merges all accumulator results from the tasks previously executed in 
the Executions.
+        * @return The accumulator map
+        */
+       public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
+
+               Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<String, Accumulator<?, ?>>();
+
+               for (ExecutionVertex vertex : getAllExecutionVertices()) {
+                       Map<String, Accumulator<?, ?>> next = 
vertex.getCurrentExecutionAttempt().getUserAccumulators();
+                       if (next != null) {
+                               AccumulatorHelper.mergeInto(userAccumulators, 
next);
+                       }
+               }
+
+               return userAccumulators;
+       }
+
+       /**
+        * Gets a serialized accumulator map.
+        * @return The accumulator map with serialized accumulator values.
+        * @throws IOException
+        */
+       public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() 
throws IOException {
+
+               Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
+
+               Map<String, SerializedValue<Object>> result = new 
HashMap<String, SerializedValue<Object>>();
+               for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulatorMap.entrySet()) {
+                       result.put(entry.getKey(), new 
SerializedValue<Object>(entry.getValue().getLocalValue()));
+               }
+
+               return result;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
@@ -791,14 +872,31 @@ public class ExecutionGraph implements Serializable {
        //  Callbacks and Callback Utilities
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Updates the state of the Task and sets the final accumulator results.
+        * @param state
+        * @return
+        */
        public boolean updateState(TaskExecutionState state) {
                Execution attempt = this.currentExecutions.get(state.getID());
                if (attempt != null) {
+
                        switch (state.getExecutionState()) {
                                case RUNNING:
                                        return attempt.switchToRunning();
                                case FINISHED:
-                                       attempt.markFinished();
+                                       Map<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> flinkAccumulators = null;
+                                       Map<String, Accumulator<?, ?>> 
userAccumulators = null;
+                                       try {
+                                               AccumulatorSnapshot 
accumulators = state.getAccumulators();
+                                               flinkAccumulators = 
accumulators.getFlinkAccumulators();
+                                               userAccumulators = 
accumulators.deserializeUserAccumulators(userClassLoader);
+                                       } catch (Exception e) {
+                                               // Exceptions would be thrown 
in the future here
+                                               LOG.error("Failed to 
deserialize final accumulator results.", e);
+                                       }
+
+                                       attempt.markFinished(flinkAccumulators, 
userAccumulators);
                                        return true;
                                case CANCELED:
                                        attempt.cancelingComplete();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 96b6f99..90564a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -147,4 +147,5 @@ public abstract class AbstractReader implements ReaderBase {
 
                return ++currentNumberOfEndOfSuperstepEvents == 
inputGate.getNumberOfInputChannels();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 56e5d33..7aa57a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -64,7 +65,9 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                                DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
 
                                if (result.isBufferConsumed()) {
-                                       
currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       final Buffer currentBuffer = 
currentRecordDeserializer.getCurrentBuffer();
+
+                                       currentBuffer.recycle();
                                        currentRecordDeserializer = null;
                                }
 
@@ -89,7 +92,7 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                                                        + "If you are using 
custom serialization code (Writable or Value types), check their "
                                                        + "serialization 
routines. In the case of Kryo, check the respective Kryo serializer.");
                                }
-                               
+
                                if (handleEvent(bufferOrEvent.getEvent())) {
                                        if (inputGate.isFinished()) {
                                                isFinished = true;
@@ -112,4 +115,11 @@ abstract class AbstractRecordReader<T extends 
IOReadableWritable> extends Abstra
                        }
                }
        }
+
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       deserializer.setReporter(reporter);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index ca59609..debb352 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -47,4 +48,9 @@ public final class BufferReader extends AbstractReader {
                        }
                }
        }
+
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 2a0a6df..9f8ae20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
@@ -51,4 +52,9 @@ public interface ReaderBase {
 
        boolean hasReachedEndOfSuperstep();
 
+       /**
+        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
+        */
+       void setReporter(AccumulatorRegistry.Reporter reporter);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
index b1395e3..d45920e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -80,4 +80,5 @@ public class RecordReader<T extends IOReadableWritable> 
extends AbstractRecordRe
                        throw new RuntimeException("Cannot instantiate class " 
+ recordType.getName(), e);
                }
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 28bcf4a..ec9f4fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -45,6 +46,8 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
 
        private Buffer currentBuffer;
 
+       private AccumulatorRegistry.Reporter reporter;
+
        public AdaptiveSpanningRecordDeserializer() {
                this.nonSpanningWrapper = new NonSpanningWrapper();
                this.spanningWrapper = new SpanningWrapper();
@@ -90,10 +93,18 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                if (nonSpanningRemaining >= 4) {
                        int len = this.nonSpanningWrapper.readInt();
 
+                       if (reporter != null) {
+                               reporter.reportNumBytesIn(len);
+                       }
+
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
                                target.read(this.nonSpanningWrapper);
 
+                               if (reporter != null) {
+                                       reporter.reportNumRecordsIn(1);
+                               }
+
                                return (this.nonSpanningWrapper.remaining() == 
0) ?
                                                
DeserializationResult.LAST_RECORD_FROM_BUFFER :
                                                
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -117,6 +128,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                        // get the full record
                        target.read(this.spanningWrapper);
 
+                       if (reporter != null) {
+                               reporter.reportNumRecordsIn(1);
+                       }
+
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
                        
this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
@@ -144,6 +159,12 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               this.reporter = reporter;
+               this.spanningWrapper.setReporter(reporter);
+       }
+
        // 
-----------------------------------------------------------------------------------------------------------------
 
        private static final class NonSpanningWrapper implements DataInputView {
@@ -426,6 +447,8 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
 
                private int recordLimit;
 
+               private AccumulatorRegistry.Reporter reporter;
+
                public SpanningWrapper() {
                        this.lengthBuffer = ByteBuffer.allocate(4);
                        this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
@@ -463,6 +486,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                                } else {
                                        this.recordLength = 
this.lengthBuffer.getInt(0);
 
+                                       if (reporter != null) {
+                                               
reporter.reportNumBytesIn(this.recordLength);
+                                       }
+
                                        this.lengthBuffer.clear();
                                        segmentPosition = toPut;
                                }
@@ -607,5 +634,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends 
IOReadableWritable> im
                public int read(byte[] b) throws IOException {
                        return this.serializationReadBuffer.read(b);
                }
+
+               public void setReporter(AccumulatorRegistry.Reporter reporter) {
+                       this.reporter = reporter;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index dd8ea06..e4c7890 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -64,4 +65,9 @@ public interface RecordDeserializer<T extends 
IOReadableWritable> {
        void clear();
        
        boolean hasUnfinishedData();
+
+       /**
+        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
+        */
+       void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index bb6db55..e9f339a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -63,4 +64,9 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
        void clear();
        
        boolean hasData();
+
+       /**
+        * Setter for the reporter, e.g. for the number of records emitted and 
the number of bytes read.
+        */
+       void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 38e130d..f163e05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
@@ -50,6 +51,8 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        /** Limit of current {@link MemorySegment} of target buffer */
        private int limit;
 
+       private AccumulatorRegistry.Reporter reporter;
+
        public SpanningRecordSerializer() {
                this.serializationBuffer = new DataOutputSerializer(128);
 
@@ -75,7 +78,13 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                // write data and length
                record.write(this.serializationBuffer);
 
-               this.lengthBuffer.putInt(0, this.serializationBuffer.length());
+               int len = this.serializationBuffer.length();
+               this.lengthBuffer.putInt(0, len);
+
+               if (reporter != null) {
+                       reporter.reportNumBytesOut(len);
+                       reporter.reportNumRecordsOut(1);
+               }
 
                this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -173,4 +182,9 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                // either data in current target buffer or intermediate buffers
                return this.position > 0 || (this.lengthBuffer.hasRemaining() 
|| this.dataBuffer.hasRemaining());
        }
+
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               this.reporter = reporter;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 6b0d836..f3e4892 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
@@ -61,6 +62,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
 
        private Buffer currentBuffer;
 
+       private AccumulatorRegistry.Reporter reporter;
+
        public SpillingAdaptiveSpanningRecordDeserializer() {
                
                String tempDirString = GlobalConfiguration.getString(
@@ -112,11 +115,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                if (nonSpanningRemaining >= 4) {
                        int len = this.nonSpanningWrapper.readInt();
 
+                       if (reporter != null) {
+                               reporter.reportNumBytesIn(len);
+                       }
+
                        if (len <= nonSpanningRemaining - 4) {
                                // we can get a full record from here
                                try {
                                        target.read(this.nonSpanningWrapper);
 
+                                       if (reporter != null) {
+                                               reporter.reportNumRecordsIn(1);
+                                       }
+
                                        int remaining = 
this.nonSpanningWrapper.remaining();
                                        if (remaining > 0) {
                                                return 
DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -151,6 +162,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                if (this.spanningWrapper.hasFullRecord()) {
                        // get the full record
                        target.read(this.spanningWrapper.getInputView());
+
+                       if (reporter != null) {
+                               reporter.reportNumRecordsIn(1);
+                       }
                        
                        // move the remainder to the non-spanning wrapper
                        // this does not copy it, only sets the memory segment
@@ -176,6 +191,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                return this.nonSpanningWrapper.remaining() > 0 || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
 
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               this.reporter = reporter;
+               this.spanningWrapper.setReporter(reporter);
+       }
+
        // 
-----------------------------------------------------------------------------------------------------------------
        
        private static final class NonSpanningWrapper implements DataInputView {
@@ -469,7 +490,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                private File spillFile;
                
                private InputViewDataInputStreamWrapper spillFileReader;
-               
+
+               private AccumulatorRegistry.Reporter reporter;
+
                public SpanningWrapper(String[] tempDirs) {
                        this.tempDirs = tempDirs;
                        
@@ -522,6 +545,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                        return;
                                } else {
                                        this.recordLength = 
this.lengthBuffer.getInt(0);
+
+                                       if (reporter != null) {
+                                               
reporter.reportNumBytesIn(recordLength);
+                                       }
+
                                        this.lengthBuffer.clear();
                                        segmentPosition = toPut;
                                        
@@ -652,5 +680,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        random.nextBytes(bytes);
                        return StringUtils.byteToHexString(bytes);
                }
+
+               public void setReporter(AccumulatorRegistry.Reporter reporter) {
+                       this.reporter = reporter;
+               }
        }
 }

Reply via email to