http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4381fd0..29efc4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
@@ -49,6 +50,12 @@ public class RecordWriter<T extends IOReadableWritable> {
 
        private final int numChannels;
 
+       /**
+        * Counter for the number of records emitted and for the number of 
bytes written.
+        * @param counter
+        */
+       private AccumulatorRegistry.Reporter reporter;
+
        /** {@link RecordSerializer} per outgoing channel */
        private final RecordSerializer<T>[] serializers;
 
@@ -81,6 +88,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
                        synchronized (serializer) {
                                SerializationResult result = 
serializer.addRecord(record);
+
                                while (result.isFullBuffer()) {
                                        Buffer buffer = 
serializer.getCurrentBuffer();
 
@@ -90,8 +98,18 @@ public class RecordWriter<T extends IOReadableWritable> {
                                        }
 
                                        buffer = 
writer.getBufferProvider().requestBufferBlocking();
+                                       if (reporter != null) {
+                                               // increase the number of 
written bytes by the memory segment's size
+                                               
reporter.reportNumBytesOut(buffer.getSize());
+                                       }
+
                                        result = 
serializer.setNextBuffer(buffer);
                                }
+
+                               if(reporter != null) {
+                                       // count number of emitted records
+                                       reporter.reportNumRecordsOut(1);
+                               }
                        }
                }
        }
@@ -173,4 +191,14 @@ public class RecordWriter<T extends IOReadableWritable> {
                        }
                }
        }
+
+       /**
+        * Counter for the number of records emitted and the records processed.
+        */
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for(RecordSerializer<?> serializer : serializers) {
+                       serializer.setReporter(reporter);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 9c5fdca..72434e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -52,6 +53,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
 
 /**
  * The abstract base class for all tasks able to participate in an iteration.
@@ -166,7 +169,7 @@ public abstract class AbstractIterativePactTask<S extends 
Function, OT> extends
        public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
                Environment env = getEnvironment();
                return new IterativeRuntimeUdfContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig());
+                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -356,8 +359,10 @@ public abstract class AbstractIterativePactTask<S extends 
Function, OT> extends
 
        private class IterativeRuntimeUdfContext extends 
DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
-               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig) {
-                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
+               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+                                                                               
ExecutionConfig executionConfig,
+                                                                               
Map<String, Accumulator<?,?>> accumulatorMap) {
+                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, accumulatorMap);
                }
 
                @Override
@@ -375,6 +380,14 @@ public abstract class AbstractIterativePactTask<S extends 
Function, OT> extends
                public <T extends Value> T getPreviousIterationAggregate(String 
name) {
                        return (T) 
getIterationAggregators().getPreviousGlobalAggregate(name);
                }
+
+               @Override
+               public <V, A extends Serializable> void addAccumulator(String 
name, Accumulator<V, A> newAccumulator) {
+                       // only add accumulator on first iteration
+                       if (inFirstIteration()) {
+                               super.addAccumulator(name, newAccumulator);
+                       }
+               }
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index cf02bdf..9cb045f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.slf4j.Logger;
@@ -112,8 +113,9 @@ public class IterationHeadPactTask<X, Y, S extends 
Function, OT> extends Abstrac
                List<RecordWriter<?>> finalOutputWriters = new 
ArrayList<RecordWriter<?>>();
                final TaskConfig finalOutConfig = 
this.config.getIterationHeadFinalOutputConfig();
                final ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
+               AccumulatorRegistry.Reporter reporter = 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
                this.finalOutputCollector = 
RegularPactTask.getOutputCollector(this, finalOutConfig,
-                       userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs());
+                       userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
 
                // sanity check the setup
                final int writersIntoStepFunction = this.eventualOutputs.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 85dd5c5..df41672 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is the abstract base class for every task that can be executed ba a 
TaskManager.
+ * This is the abstract base class for every task that can be executed by a 
TaskManager.
  * Concrete tasks like the stream vertices of the batch tasks
  * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit 
from this class.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
deleted file mode 100644
index c824232..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This class manages the accumulators for different jobs. Either the jobs are
- * running and new accumulator results have to be merged in, or the jobs are no
- * longer running and the results shall be still available for the client or 
the
- * web interface. Accumulators for older jobs are automatically removed when 
new
- * arrive, based on a maximum number of entries.
- * 
- * All functions are thread-safe and thus can be called directly from
- * JobManager.
- */
-public class AccumulatorManager {
-
-       /** Map of accumulators belonging to recently started jobs */
-       private final Map<JobID, JobAccumulators> jobAccumulators = new 
HashMap<JobID, JobAccumulators>();
-
-       private final LinkedList<JobID> lru = new LinkedList<JobID>();
-       private int maxEntries;
-
-
-       public AccumulatorManager(int maxEntries) {
-               this.maxEntries = maxEntries;
-       }
-
-       /**
-        * Merges the new accumulators with the existing accumulators collected 
for
-        * the job.
-        */
-       public void processIncomingAccumulators(JobID jobID,
-                                                                               
        Map<String, Accumulator<?, ?>> newAccumulators) {
-               synchronized (this.jobAccumulators) {
-                       
-                       JobAccumulators jobAccumulators = 
this.jobAccumulators.get(jobID);
-                       if (jobAccumulators == null) {
-                               jobAccumulators = new JobAccumulators();
-                               this.jobAccumulators.put(jobID, 
jobAccumulators);
-                               cleanup(jobID);
-                       }
-                       jobAccumulators.processNew(newAccumulators);
-               }
-       }
-
-       public Map<String, Object> getJobAccumulatorResults(JobID jobID) {
-               Map<String, Object> result = new HashMap<String, Object>();
-
-               JobAccumulators acc;
-               synchronized (jobAccumulators) {
-                       acc = jobAccumulators.get(jobID);
-               }
-
-               if (acc != null) {
-                       for (Map.Entry<String, Accumulator<?, ?>> entry : 
acc.getAccumulators().entrySet()) {
-                               result.put(entry.getKey(), 
entry.getValue().getLocalValue());
-                       }
-               }
-
-               return result;
-       }
-
-       public Map<String, SerializedValue<Object>> 
getJobAccumulatorResultsSerialized(JobID jobID) throws IOException {
-               JobAccumulators acc;
-               synchronized (jobAccumulators) {
-                       acc = jobAccumulators.get(jobID);
-               }
-
-               if (acc == null || acc.getAccumulators().isEmpty()) {
-                       return Collections.emptyMap();
-               }
-
-               Map<String, SerializedValue<Object>> result = new 
HashMap<String, SerializedValue<Object>>();
-               for (Map.Entry<String, Accumulator<?, ?>> entry : 
acc.getAccumulators().entrySet()) {
-                       result.put(entry.getKey(), new 
SerializedValue<Object>(entry.getValue().getLocalValue()));
-               }
-
-               return result;
-       }
-
-       public StringifiedAccumulatorResult[] 
getJobAccumulatorResultsStringified(JobID jobID) throws IOException {
-               JobAccumulators acc;
-               synchronized (jobAccumulators) {
-                       acc = jobAccumulators.get(jobID);
-               }
-
-               if (acc == null || acc.getAccumulators().isEmpty()) {
-                       return new StringifiedAccumulatorResult[0];
-               }
-
-               Map<String, Accumulator<?, ?>> accMap = acc.getAccumulators();
-
-               StringifiedAccumulatorResult[] result = new 
StringifiedAccumulatorResult[accMap.size()];
-               int i = 0;
-               for (Map.Entry<String, Accumulator<?, ?>> entry : 
accMap.entrySet()) {
-                       String type = entry.getValue() == null ? "(null)" : 
entry.getValue().getClass().getSimpleName();
-                       String value = entry.getValue() == null ? "(null)" : 
entry.getValue().toString();
-                       result[i++] = new 
StringifiedAccumulatorResult(entry.getKey(), type, value);
-               }
-               return result;
-       }
-
-       /**
-        * Cleanup data for the oldest jobs if the maximum number of entries is 
reached.
-        *
-        * @param jobId The (potentially new) JobId.
-        */
-       private void cleanup(JobID jobId) {
-               if (!lru.contains(jobId)) {
-                       lru.addFirst(jobId);
-               }
-               if (lru.size() > this.maxEntries) {
-                       JobID toRemove = lru.removeLast();
-                       this.jobAccumulators.remove(toRemove);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
deleted file mode 100644
index 970d993..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-
-/**
- * Simple class wrapping a map of accumulators for a single job. Just for 
better
- * handling.
- */
-public class JobAccumulators {
-
-       private final Map<String, Accumulator<?, ?>> accumulators = new 
HashMap<String, Accumulator<?, ?>>();
-
-       public Map<String, Accumulator<?, ?>> getAccumulators() {
-               return this.accumulators;
-       }
-
-       public void processNew(Map<String, Accumulator<?, ?>> newAccumulators) {
-               AccumulatorHelper.mergeInto(this.accumulators, newAccumulators);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 345e1ab..b3130a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -356,6 +357,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                        throw new Exception("Illegal input group size in task 
configuration: " + groupSize);
                }
 
+               final AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
+               final AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
+
+               inputReader.setReporter(reporter);
+
                this.inputTypeSerializerFactory = 
this.config.getInputSerializer(0, getUserCodeClassLoader());
                @SuppressWarnings({ "rawtypes" })
                final MutableObjectIterator<?> iter = new 
ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 0bbe4bf..3f1c642 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -187,25 +188,22 @@ public class DataSourceTask<OT> extends AbstractInvokable 
{
                                        format.close();
                                }
                        } // end for all input splits
-                       
+
                        // close the collector. if it is a chaining task 
collector, it will close its chained tasks
                        this.output.close();
-                       
+
                        // close all chained tasks letting them report failure
                        RegularPactTask.closeChainedTasks(this.chainedTasks, 
this);
-                       
-                       // Merge and report accumulators
-                       
RegularPactTask.reportAndClearAccumulators(getEnvironment(),
-                                       new HashMap<String, 
Accumulator<?,?>>(), chainedTasks);
+
                }
                catch (Exception ex) {
                        // close the input, but do not report any exceptions, 
since we already have another root cause
                        try {
                                this.format.close();
                        } catch (Throwable ignored) {}
-                       
+
                        RegularPactTask.cancelChainedTasks(this.chainedTasks);
-                       
+
                        ex = 
ExceptionInChainedStubException.exceptionUnwrap(ex);
 
                        if (ex instanceof CancelTaskException) {
@@ -275,7 +273,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                catch (Throwable t) {
                        throw new RuntimeException("The user defined 
'configure()' method caused an error: " + t.getMessage(), t);
                }
-               
+
                // get the factory for the type serializer
                this.serializerFactory = 
this.config.getOutputSerializer(userCodeClassLoader);
        }
@@ -287,7 +285,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
        private void initOutputs(ClassLoader cl) throws Exception {
                this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
                this.eventualOutputs = new ArrayList<RecordWriter<?>>();
-               this.output = RegularPactTask.initOutputs(this, cl, 
this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig());
+
+               final AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
+               final AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
+
+               Map<String, Accumulator<?, ?>> accumulatorMap = 
accumulatorRegistry.getUserMap();
+
+               this.output = RegularPactTask.initOutputs(this, cl, 
this.config, this.chainedTasks, this.eventualOutputs,
+                               getExecutionConfig(), reporter, accumulatorMap);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index d0f4116..a53f5bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -74,7 +74,7 @@ public interface PactDriver<S extends Function, OT> {
         *                   code typically signal situations where this 
instance in unable to proceed, exceptions
         *                   from the user code should be forwarded.
         */
-       void run() throws Exception; 
+       void run() throws Exception;
        
        /**
         * This method is invoked in any case (clean termination and exception) 
at the end of the tasks operation.

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index b296506..bc23fa3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -50,7 +50,7 @@ public interface PactTaskContext<S, OT> {
        MemoryManager getMemoryManager();
        
        IOManager getIOManager();
-       
+
        <X> MutableObjectIterator<X> getInput(int index);
        
        <X> TypeSerializerFactory<X> getInputSerializer(int index);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 1c3328e..78bf383 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -20,18 +20,17 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -71,7 +70,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -114,7 +112,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
        protected List<RecordWriter<?>> eventualOutputs;
 
        /**
-        * The input readers to this task.
+        * The input readers of this task.
         */
        protected MutableReader<?>[] inputReaders;
 
@@ -212,7 +210,11 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
         */
        protected volatile boolean running = true;
 
-       
+       /**
+        * The accumulator map used in the RuntimeContext.
+        */
+       protected Map<String, Accumulator<?,?>> accumulatorMap;
+
        // 
--------------------------------------------------------------------------------------------
        //                                  Task Interface
        // 
--------------------------------------------------------------------------------------------
@@ -273,7 +275,9 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                        LOG.debug(formatLogString("Start task code."));
                }
 
-               this.runtimeUdfContext = 
createRuntimeContext(getEnvironment().getTaskName());
+               Environment env = getEnvironment();
+
+               this.runtimeUdfContext = 
createRuntimeContext(env.getTaskName());
 
                // whatever happens in this scope, make sure that the local 
strategies are cleaned up!
                // note that the initialization of the local strategies is in 
the try-finally block as well,
@@ -367,6 +371,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
 
                        clearReaders(inputReaders);
                        clearWriters(eventualOutputs);
+
                }
 
                if (this.running) {
@@ -505,18 +510,6 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
 
                        // close all chained tasks letting them report failure
                        RegularPactTask.closeChainedTasks(this.chainedTasks, 
this);
-
-                       // Collect the accumulators of all involved UDFs and 
send them to the
-                       // JobManager. close() has been called earlier for all 
involved UDFs
-                       // (using this.stub.close() and closeChainedTasks()), 
so UDFs can no longer
-                       // modify accumulators;
-
-                       // collect the counters from the udf in the core driver
-                       Map<String, Accumulator<?, ?>> accumulators =
-                                       
FunctionUtils.getFunctionRuntimeContext(this.stub, 
this.runtimeUdfContext).getAllAccumulators();
-                       
-                       // collect accumulators from chained tasks and report 
them
-                       reportAndClearAccumulators(getEnvironment(), 
accumulators, this.chainedTasks);
                }
                catch (Exception ex) {
                        // close the input, but do not report any exceptions, 
since we already have another root cause
@@ -557,60 +550,6 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                }
        }
 
-       /**
-        * This method is called at the end of a task, receiving the 
accumulators of
-        * the task and the chained tasks. It merges them into a single map of
-        * accumulators and sends them to the JobManager.
-        *
-        * @param chainedTasks
-        *          Each chained task might have accumulators which will be 
merged
-        *          with the accumulators of the stub.
-        */
-       protected static void reportAndClearAccumulators(Environment env,
-                                                                               
                        Map<String, Accumulator<?, ?>> accumulators,
-                                                                               
                        ArrayList<ChainedDriver<?, ?>> chainedTasks) {
-
-               // We can merge here the accumulators from the stub and the 
chained
-               // tasks. Type conflicts can occur here if counters with same 
name but
-               // different type were used.
-               
-               if (!chainedTasks.isEmpty()) {
-                       if (accumulators == null) {
-                               accumulators = new HashMap<String, 
Accumulator<?, ?>>();
-                       }
-                       
-                       for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                               RuntimeContext rc = 
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
-                               if (rc != null) {
-                                       Map<String, Accumulator<?, ?>> 
chainedAccumulators = rc.getAllAccumulators();
-                                       if (chainedAccumulators != null) {
-                                               
AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
-                                       }
-                               }
-                       }
-               }
-
-               // Don't report if the UDF didn't collect any accumulators
-               if (accumulators == null || accumulators.size() == 0) {
-                       return;
-               }
-
-               // Report accumulators to JobManager
-               env.reportAccumulators(accumulators);
-
-               // We also clear the accumulators, since stub instances might 
be reused
-               // (e.g. in iterations) and we don't want to count twice. This 
may not be
-               // done before sending
-               AccumulatorHelper.resetAndClearAccumulators(accumulators);
-               
-               for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-                       RuntimeContext rc = 
FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
-                       if (rc != null) {
-                               
AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
-                       }
-               }
-       }
-
        protected void closeLocalStrategiesAndCaches() {
                
                // make sure that all broadcast variable references held by 
this task are released
@@ -725,6 +664,9 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
 
                int currentReaderOffset = 0;
 
+               AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
+               AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
+
                for (int i = 0; i < numInputs; i++) {
                        //  ---------------- create the input readers 
---------------------
                        // in case where a logical input unions multiple 
physical inputs, create a union reader
@@ -744,6 +686,8 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
                        }
 
+                       inputReaders[i].setReporter(reporter);
+
                        currentReaderOffset += groupSize;
                }
                this.inputReaders = inputReaders;
@@ -1073,14 +1017,21 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
 
                ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
-               this.output = initOutputs(this, userCodeClassLoader, 
this.config, this.chainedTasks, this.eventualOutputs, 
this.getExecutionConfig());
+               AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
+               AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
+
+               this.accumulatorMap = accumulatorRegistry.getUserMap();
+
+               this.output = initOutputs(this, userCodeClassLoader, 
this.config, this.chainedTasks, this.eventualOutputs,
+                               this.getExecutionConfig(), reporter, 
this.accumulatorMap);
        }
 
        public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
                Environment env = getEnvironment();
+
                return new DistributedRuntimeUDFContext(taskName, 
env.getNumberOfSubtasks(),
                                env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(),
-                               env.getDistributedCacheEntries());
+                               env.getDistributedCacheEntries(), 
this.accumulatorMap);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1257,7 +1208,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
         * @return The OutputCollector that data produced in this task is 
submitted to.
         */
        public static <T> Collector<T> getOutputCollector(AbstractInvokable 
task, TaskConfig config, ClassLoader cl,
-                       List<RecordWriter<?>> eventualOutputs, int 
outputOffset, int numOutputs) throws Exception
+                       List<RecordWriter<?>> eventualOutputs, int 
outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws 
Exception
        {
                if (numOutputs == 0) {
                        return null;
@@ -1286,11 +1237,15 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                        }
                                        final DataDistribution distribution = 
config.getOutputDataDistribution(i, cl);
                                        final Partitioner<?> partitioner = 
config.getOutputPartitioner(i, cl);
-                                       
+
                                        oe = new RecordOutputEmitter(strategy, 
comparator, partitioner, distribution);
                                }
 
-                               writers.add(new 
RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe));
+                               // setup accumulator counters
+                               final RecordWriter<Record> recordWriter = new 
RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
+                               recordWriter.setReporter(reporter);
+
+                               writers.add(recordWriter);
                        }
                        if (eventualOutputs != null) {
                                eventualOutputs.addAll(writers);
@@ -1318,12 +1273,18 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                else {
                                        final DataDistribution dataDist = 
config.getOutputDataDistribution(i, cl);
                                        final Partitioner<?> partitioner = 
config.getOutputPartitioner(i, cl);
-                                       
+
                                        final TypeComparator<T> comparator = 
compFactory.createComparator();
                                        oe = new OutputEmitter<T>(strategy, 
comparator, partitioner, dataDist);
                                }
 
-                               writers.add(new 
RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset
 + i), oe));
+                               final RecordWriter<SerializationDelegate<T>> 
recordWriter =
+                                               new 
RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset
 + i), oe);
+
+                               // setup live accumulator counters
+                               recordWriter.setReporter(reporter);
+
+                               writers.add(recordWriter);
                        }
                        if (eventualOutputs != null) {
                                eventualOutputs.addAll(writers);
@@ -1338,7 +1299,11 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
         */
        @SuppressWarnings("unchecked")
        public static <T> Collector<T> initOutputs(AbstractInvokable 
nepheleTask, ClassLoader cl, TaskConfig config,
-                                       List<ChainedDriver<?, ?>> 
chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig 
executionConfig)
+                                                                               
List<ChainedDriver<?, ?>> chainedTasksTarget,
+                                                                               
List<RecordWriter<?>> eventualOutputs,
+                                                                               
ExecutionConfig executionConfig,
+                                                                               
AccumulatorRegistry.Reporter reporter,
+                                                                               
Map<String, Accumulator<?,?>> accumulatorMap)
        throws Exception
        {
                final int numOutputs = config.getNumOutputs();
@@ -1370,12 +1335,12 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                                final TaskConfig chainedStubConf = 
config.getChainedStubConfig(i);
                                final String taskName = 
config.getChainedTaskName(i);
 
-                               if (i == numChained -1) {
+                               if (i == numChained - 1) {
                                        // last in chain, instantiate the 
output collector for this task
-                                       previous = 
getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, 
chainedStubConf.getNumOutputs());
+                                       previous = 
getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, 
chainedStubConf.getNumOutputs(), reporter);
                                }
 
-                               ct.setup(chainedStubConf, taskName, previous, 
nepheleTask, cl, executionConfig);
+                               ct.setup(chainedStubConf, taskName, previous, 
nepheleTask, cl, executionConfig, accumulatorMap);
                                chainedTasksTarget.add(0, ct);
 
                                previous = ct;
@@ -1386,7 +1351,7 @@ public class RegularPactTask<S extends Function, OT> 
extends AbstractInvokable i
                // else
 
                // instantiate the output collector the default way from this 
configuration
-               return getOutputCollector(nepheleTask , config, cl, 
eventualOutputs, 0, numOutputs);
+               return getOutputCollector(nepheleTask , config, cl, 
eventualOutputs, 0, numOutputs, reporter);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index b4cfa27..ea6cfe3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.chaining;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.execution.Environment;
@@ -28,6 +29,8 @@ import 
org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 
+import java.util.Map;
+
 /**
  * The interface to be implemented by drivers that do not run in an own pact 
task context, but are chained to other
  * tasks.
@@ -50,20 +53,23 @@ public abstract class ChainedDriver<IT, OT> implements 
Collector<IT> {
 
        
        public void setup(TaskConfig config, String taskName, Collector<OT> 
outputCollector,
-                       AbstractInvokable parent, ClassLoader 
userCodeClassLoader, ExecutionConfig executionConfig)
+                       AbstractInvokable parent, ClassLoader 
userCodeClassLoader, ExecutionConfig executionConfig,
+                       Map<String, Accumulator<?,?>> accumulatorMap)
        {
                this.config = config;
                this.taskName = taskName;
                this.outputCollector = outputCollector;
                this.userCodeClassLoader = userCodeClassLoader;
-               
+
+               Environment env = parent.getEnvironment();
+
                if (parent instanceof RegularPactTask) {
                        this.udfContext = ((RegularPactTask<?, ?>) 
parent).createRuntimeContext(taskName);
                } else {
-                       Environment env = parent.getEnvironment();
                        this.udfContext = new 
DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
                                        env.getIndexInSubtaskGroup(), 
userCodeClassLoader, parent.getExecutionConfig(),
-                                       env.getDistributedCacheEntries());
+                                       env.getDistributedCacheEntries(), 
accumulatorMap
+                       );
                }
 
                this.executionConfig = executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index f4cd354..4b480ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -41,12 +42,14 @@ public class DistributedRuntimeUDFContext extends 
AbstractRuntimeUDFContext {
        private final HashMap<String, BroadcastVariableMaterialization<?, ?>> 
broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
        
        
-       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig);
+       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+                                                                               
ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, accumulators);
        }
        
-       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, 
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
-               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks);
+       public DistributedRuntimeUDFContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+                                                                               
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, 
Accumulator<?,?>> accumulators) {
+               super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, accumulators, cpTasks);
        }
        
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5ab0150..f166c36 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.taskmanager;
 import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -34,12 +33,10 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -76,7 +73,9 @@ public class RuntimeEnvironment implements Environment {
        private final InputGate[] inputGates;
        
        private final ActorRef jobManagerActor;
-       
+
+       private final AccumulatorRegistry accumulatorRegistry;
+
        // 
------------------------------------------------------------------------
 
        public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, 
ExecutionAttemptID executionId,
@@ -86,6 +85,7 @@ public class RuntimeEnvironment implements Environment {
                                                                ClassLoader 
userCodeClassLoader,
                                                                MemoryManager 
memManager, IOManager ioManager,
                                                                
BroadcastVariableManager bcVarManager,
+                                                               
AccumulatorRegistry accumulatorRegistry,
                                                                
InputSplitProvider splitProvider,
                                                                Map<String, 
Future<Path>> distCacheEntries,
                                                                
ResultPartitionWriter[] writers,
@@ -93,7 +93,7 @@ public class RuntimeEnvironment implements Environment {
                                                                ActorRef 
jobManagerActor) {
                
                checkArgument(parallelism > 0 && subtaskIndex >= 0 && 
subtaskIndex < parallelism);
-               
+
                this.jobId = checkNotNull(jobId);
                this.jobVertexId = checkNotNull(jobVertexId);
                this.executionId = checkNotNull(executionId);
@@ -107,6 +107,7 @@ public class RuntimeEnvironment implements Environment {
                this.memManager = checkNotNull(memManager);
                this.ioManager = checkNotNull(ioManager);
                this.bcVarManager = checkNotNull(bcVarManager);
+               this.accumulatorRegistry = checkNotNull(accumulatorRegistry);
                this.splitProvider = checkNotNull(splitProvider);
                this.distCacheEntries = checkNotNull(distCacheEntries);
                this.writers = checkNotNull(writers);
@@ -183,6 +184,11 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
+       public AccumulatorRegistry getAccumulatorRegistry() {
+               return accumulatorRegistry;
+       }
+
+       @Override
        public InputSplitProvider getInputSplitProvider() {
                return splitProvider;
        }
@@ -213,20 +219,6 @@ public class RuntimeEnvironment implements Environment {
        }
 
        @Override
-       public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-               AccumulatorEvent evt;
-               try {
-                       evt = new AccumulatorEvent(getJobID(), accumulators);
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Cannot serialize 
accumulators to send them to JobManager", e);
-               }
-
-               ReportAccumulatorResult accResult = new 
ReportAccumulatorResult(jobId, executionId, evt);
-               jobManagerActor.tell(accResult, ActorRef.noSender());
-       }
-
-       @Override
        public void acknowledgeCheckpoint(long checkpointId) {
                acknowledgeCheckpoint(checkpointId, null);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 616998c..13a2ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -100,10 +101,10 @@ public class Task implements Runnable {
 
        /** The class logger. */
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-       
+
        /** The tread group that contains all task threads */
        private static final ThreadGroup TASK_THREADS_GROUP = new 
ThreadGroup("Flink Task Threads");
-       
+
        /** For atomic state updates */
        private static final AtomicReferenceFieldUpdater<Task, ExecutionState> 
STATE_UPDATER =
                        AtomicReferenceFieldUpdater.newUpdater(Task.class, 
ExecutionState.class, "executionState");
@@ -176,13 +177,16 @@ public class Task implements Runnable {
 
        /** The library cache, from which the task can request its required JAR 
files */
        private final LibraryCacheManager libraryCache;
-       
+
        /** The cache for user-defined files that the invokable requires */
        private final FileCache fileCache;
-       
+
        /** The gateway to the network stack, which handles inputs and produced 
results */
        private final NetworkEnvironment network;
 
+       /** The registry of this task which enables live reporting of 
accumulators */
+       private final AccumulatorRegistry accumulatorRegistry;
+
        /** The thread that executes the task */
        private final Thread executingThread;
 
@@ -194,10 +198,10 @@ public class Task implements Runnable {
 
        /** atomic flag that makes sure the invokable is canceled exactly once 
upon error */
        private final AtomicBoolean invokableHasBeenCanceled;
-       
+
        /** The invokable of this task, if initialized */
        private volatile AbstractInvokable invokable;
-       
+
        /** The current execution state of the task */
        private volatile ExecutionState executionState = ExecutionState.CREATED;
 
@@ -245,12 +249,13 @@ public class Task implements Runnable {
 
                this.memoryManager = checkNotNull(memManager);
                this.ioManager = checkNotNull(ioManager);
-               this.broadcastVariableManager =checkNotNull(bcVarManager);
+               this.broadcastVariableManager = checkNotNull(bcVarManager);
+               this.accumulatorRegistry = new AccumulatorRegistry(jobId, 
executionId);
 
                this.jobManager = checkNotNull(jobManagerActor);
                this.taskManager = checkNotNull(taskManagerActor);
                this.actorAskTimeout = new 
Timeout(checkNotNull(actorAskTimeout));
-               
+
                this.libraryCache = checkNotNull(libraryCache);
                this.fileCache = checkNotNull(fileCache);
                this.network = checkNotNull(networkEnvironment);
@@ -361,6 +366,10 @@ public class Task implements Runnable {
                return inputGatesById.get(id);
        }
 
+       public AccumulatorRegistry getAccumulatorRegistry() {
+               return accumulatorRegistry;
+       }
+
        public Thread getExecutingThread() {
                return executingThread;
        }
@@ -499,7 +508,8 @@ public class Task implements Runnable {
                        Environment env = new RuntimeEnvironment(jobId, 
vertexId, executionId,
                                        taskName, taskNameWithSubtask, 
subtaskIndex, parallelism,
                                        jobConfiguration, taskConfiguration,
-                                       userCodeClassLoader, memoryManager, 
ioManager, broadcastVariableManager,
+                                       userCodeClassLoader, memoryManager, 
ioManager,
+                                       broadcastVariableManager, 
accumulatorRegistry,
                                        splitProvider, distributedCacheEntries,
                                        writers, inputGates, jobManager);
 
@@ -518,7 +528,7 @@ public class Task implements Runnable {
 
                        // get our private reference onto the stack (be safe 
against concurrent changes) 
                        SerializedValue<StateHandle<?>> operatorState = 
this.operatorState;
-                       
+
                        if (operatorState != null) {
                                if (invokable instanceof OperatorStateCarrier) {
                                        try {
@@ -553,7 +563,7 @@ public class Task implements Runnable {
                        if (!STATE_UPDATER.compareAndSet(this, 
ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                                throw new CancelTaskException();
                        }
-                       
+
                        // notify everyone that we switched to running. 
especially the TaskManager needs
                        // to know this!
                        notifyObservers(ExecutionState.RUNNING, null);
@@ -653,7 +663,7 @@ public class Task implements Runnable {
                finally {
                        try {
                                LOG.info("Freeing task resources for " + 
taskNameWithSubtask);
-                               
+
                                // stop the async dispatcher.
                                // copy dispatcher reference to stack, against 
concurrent release
                                ExecutorService dispatcher = 
this.asyncCallDispatcher;
@@ -867,15 +877,15 @@ public class Task implements Runnable {
         */
        public void triggerCheckpointBarrier(final long checkpointID, final 
long checkpointTimestamp) {
                AbstractInvokable invokable = this.invokable;
-               
+
                if (executionState == ExecutionState.RUNNING && invokable != 
null) {
                        if (invokable instanceof CheckpointedOperator) {
-                               
+
                                // build a local closure 
                                final CheckpointedOperator checkpointer = 
(CheckpointedOperator) invokable;
                                final Logger logger = LOG;
                                final String taskName = taskNameWithSubtask;
-                               
+
                                Runnable runnable = new Runnable() {
                                        @Override
                                        public void run() {
@@ -1038,7 +1048,7 @@ public class Task implements Runnable {
        public String toString() {
                return getTaskNameWithSubtasks() + " [" + executionState + ']';
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Task Names
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 6c85ab5..0637017 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import java.util.Arrays;
 
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
@@ -52,8 +53,11 @@ public class TaskExecutionState implements 
java.io.Serializable {
        // class may not be part of the system class loader.
        private transient Throwable cachedError;
 
+       /** Serialized flink and user-defined accumulators */
+       private final AccumulatorSnapshot accumulators;
+
        /**
-        * Creates a new task execution state update, with no attached 
exception.
+        * Creates a new task execution state update, with no attached 
exception and no accumulators.
         *
         * @param jobID
         *        the ID of the job the task belongs to
@@ -63,13 +67,28 @@ public class TaskExecutionState implements 
java.io.Serializable {
         *        the execution state to be reported
         */
        public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, 
ExecutionState executionState) {
-               this(jobID, executionId, executionState, null);
+               this(jobID, executionId, executionState, null, null);
        }
-       
+
+       /**
+        * Creates a new task execution state update, with an attached 
exception but no accumulators.
+        *
+        * @param jobID
+        *        the ID of the job the task belongs to
+        * @param executionId
+        *        the ID of the task execution whose state is to be reported
+        * @param executionState
+        *        the execution state to be reported
+        */
+       public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
+                                                       ExecutionState 
executionState, Throwable error) {
+               this(jobID, executionId, executionState, error, null);
+       }
+
        /**
         * Creates a new task execution state update, with an attached 
exception.
         * This constructor may never throw an exception.
-        * 
+        *
         * @param jobID
         *        the ID of the job the task belongs to
         * @param executionId
@@ -78,11 +97,15 @@ public class TaskExecutionState implements 
java.io.Serializable {
         *        the execution state to be reported
         * @param error
         *        an optional error
+        * @param accumulators
+        *        The flink and user-defined accumulators which may be null.
         */
        public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
-                                                               ExecutionState 
executionState, Throwable error) {
+                       ExecutionState executionState, Throwable error,
+                       AccumulatorSnapshot accumulators) {
 
-               if (jobID == null || executionId == null || executionState == 
null) {
+
+                       if (jobID == null || executionId == null || 
executionState == null) {
                        throw new NullPointerException();
                }
 
@@ -90,6 +113,7 @@ public class TaskExecutionState implements 
java.io.Serializable {
                this.executionId = executionId;
                this.executionState = executionState;
                this.cachedError = error;
+               this.accumulators = accumulators;
 
                if (error != null) {
                        byte[] serializedError;
@@ -178,6 +202,13 @@ public class TaskExecutionState implements 
java.io.Serializable {
                return this.jobID;
        }
 
+       /**
+        * Gets flink and user-defined accumulators in serialized form.
+        */
+       public AccumulatorSnapshot getAccumulators() {
+               return accumulators;
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
index f5e897b..6a5468a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
  * special class loader, the deserialization fails with a {@code 
ClassNotFoundException}.
  *
  * To work around that issue, the SerializedValue serialized data immediately 
into a byte array.
- * When send through RPC or another service that uses serialization, the only 
the byte array is
+ * When send through RPC or another service that uses serialization, only the 
byte array is
  * transferred. The object is deserialized later (upon access) and requires 
the accessor to
  * provide the corresponding class loader.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3b4ce15..8823041 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,39 +29,39 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
-import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint}
-import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, 
ZooKeeperUtil}
-import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, 
StreamingMode}
+import org.apache.flink.runtime.util.ZooKeeperUtil
+import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, 
ActorLogMessages}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, 
Heartbeat}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
-import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the 
tasks, gathering the
@@ -97,7 +97,6 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val accumulatorManager: AccumulatorManager,
     protected val defaultExecutionRetries: Int,
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
@@ -221,7 +220,6 @@ class JobManager(
               originalSender ! result
             }(context.dispatcher)
 
-            sender ! true
           case None => log.error("Cannot find execution graph for ID " +
             s"${taskExecutionState.getJobID} to change state to " +
             s"${taskExecutionState.getExecutionState}.")
@@ -298,7 +296,7 @@ class JobManager(
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults: java.util.Map[String, 
SerializedValue[AnyRef]] = try {
-                  accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+                  executionGraph.getAccumulatorsSerialized
                 } catch {
                   case e: Exception =>
                     log.error(s"Cannot fetch serialized accumulators for job 
$jobID", e)
@@ -402,13 +400,22 @@ class JobManager(
       import scala.collection.JavaConverters._
       sender ! 
RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport) =>
-      try {
-        log.debug(s"Received hearbeat message from $instanceID.")
-        instanceManager.reportHeartBeat(instanceID, metricsReport)
-      } catch {
-        case t: Throwable => log.error(s"Could not report heart beat from 
${sender().path}.", t)
-      }
+    case Heartbeat(instanceID, metricsReport, accumulators) =>
+      log.debug(s"Received hearbeat message from $instanceID.")
+
+      Future {
+        accumulators foreach {
+          case accumulators =>
+              currentJobs.get(accumulators.getJobID) match {
+                case Some((jobGraph, jobInfo)) =>
+                  jobGraph.updateAccumulators(accumulators)
+                case None =>
+                  // ignore accumulator values for old job
+              }
+        }
+      }(context.dispatcher)
+
+      instanceManager.reportHeartBeat(instanceID, metricsReport)
 
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 
@@ -676,33 +683,18 @@ class JobManager(
    * @param message The accumulator message.
    */
   private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
-
     message match {
-      case ReportAccumulatorResult(jobId, _, accumulatorEvent) =>
-        val classLoader = try {
-          libraryCacheManager.getClassLoader(jobId)
-        } catch {
-          case e: Exception =>
-            log.error("Dropping accumulators. No class loader available for 
job " + jobId, e)
-            return
-        }
-
-        if (classLoader != null) {
-          try {
-            val accumulators = accumulatorEvent.deserializeValue(classLoader)
-            accumulatorManager.processIncomingAccumulators(jobId, accumulators)
-          }
-          catch {
-            case e: Exception => log.error("Cannot update accumulators for job 
" + jobId, e)
-          }
-        } else {
-          log.error("Dropping accumulators. No class loader available for job 
" + jobId)
-        }
 
       case RequestAccumulatorResults(jobID) =>
         try {
-          val accumulatorValues: java.util.Map[String, 
SerializedValue[Object]] =
-            accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+          val accumulatorValues: java.util.Map[String, 
SerializedValue[Object]] = {
+            currentJobs.get(jobID) match {
+              case Some((graph, jobInfo)) =>
+                graph.getAccumulatorsSerialized
+              case None =>
+                null // TODO check also archive
+            }
+          }
 
           sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
         }
@@ -714,8 +706,31 @@ class JobManager(
 
       case RequestAccumulatorResultsStringified(jobId) =>
         try {
-          val accumulatorValues: Array[StringifiedAccumulatorResult] =
-            accumulatorManager.getJobAccumulatorResultsStringified(jobId)
+          val accumulatorValues: Array[StringifiedAccumulatorResult] = {
+            currentJobs.get(jobId) match {
+              case Some((graph, jobInfo)) =>
+                val accumulators = graph.aggregateUserAccumulators()
+
+                val result: Array[StringifiedAccumulatorResult] = new
+                    Array[StringifiedAccumulatorResult](accumulators.size)
+
+                var i = 0
+                accumulators foreach {
+                  case (name, accumulator) =>
+                    val (typeString, valueString) =
+                      if (accumulator != null) {
+                        (accumulator.getClass.getSimpleName, 
accumulator.toString)
+                      } else {
+                        (null, null)
+                      }
+                    result(i) = new StringifiedAccumulatorResult(name, 
typeString, valueString)
+                    i += 1
+                }
+                result
+              case None =>
+                null // TODO check also archive
+            }
+          }
 
           sender() ! AccumulatorResultStringsFound(jobId, accumulatorValues)
         }
@@ -1058,7 +1073,7 @@ object JobManager {
    */
   def createJobManagerComponents(configuration: Configuration)
     : (ExecutionContext, InstanceManager, FlinkScheduler, 
BlobLibraryCacheManager,
-      Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
+      Props, Int, Long, FiniteDuration, Int) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -1091,8 +1106,6 @@ object JobManager {
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 
-    val accumulatorManager: AccumulatorManager = new 
AccumulatorManager(Math.min(1, archiveCount))
-
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
 
     var blobServer: BlobServer = null
@@ -1131,7 +1144,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -1179,7 +1191,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -1199,7 +1210,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiver,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b12f1b5..6cb571c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages
 
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.instance.InstanceID
 
 /**
@@ -52,8 +53,10 @@ object TaskManagerMessages {
    *
    * @param instanceID The instance ID of the reporting TaskManager.
    * @param metricsReport utf-8 encoded JSON metrics report from the 
metricRegistry.
+   * @param accumulators Accumulators of tasks serialized as Tuple2[internal, 
user-defined]
    */
-  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
+  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte],
+     accumulators: Seq[AccumulatorSnapshot])
 
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
index 82c4ab6..015c96e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.messages.accumulators
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.accumulators.{StringifiedAccumulatorResult, 
AccumulatorEvent}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
 import org.apache.flink.runtime.util.SerializedValue
 
 /**
@@ -38,18 +37,6 @@ sealed trait AccumulatorMessage {
 sealed trait AccumulatorResultsResponse extends AccumulatorMessage
 
 /**
- * Reports the accumulator results of the individual tasks to the job manager.
- *
- * @param jobID The ID of the job the accumulator belongs to
- * @param executionId The ID of the task execution that the accumulator 
belongs to.
- * @param accumulatorEvent The serialized accumulators
- */
-case class ReportAccumulatorResult(jobID: JobID,
-                                   executionId: ExecutionAttemptID,
-                                   accumulatorEvent: AccumulatorEvent)
-  extends AccumulatorMessage
-
-/**
  * Requests the accumulator results of the job identified by [[jobID]] from 
the job manager.
  * The result is sent back to the sender as a [[AccumulatorResultsResponse]] 
message.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1a35d01..f07fa0c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,6 +37,7 @@ import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{Configuration, ConfigConstants, 
GlobalConfiguration, IllegalConfigurationException}
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, 
TriggerCheckpoint, AbstractCheckpointMessage}
+import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, 
AccumulatorRegistry}
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, 
ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
@@ -68,6 +69,7 @@ import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success}
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 import scala.language.postfixOps
 
@@ -328,7 +330,7 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
               
network.getPartitionManager.releasePartitionsProducedBy(executionID)
             } catch {
               case t: Throwable => killTaskManagerFatal(
-                "Fatal leak: Unable to release intermediate result partition 
data", t)
+              "Fatal leak: Unable to release intermediate result partition 
data", t)
             }
           }
 
@@ -389,7 +391,7 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
           } else {
             log.debug(s"Cannot find task to cancel for execution 
${executionID})")
             sender ! new TaskOperationResult(executionID, false,
-              "No task with that execution ID was found.")
+            "No task with that execution ID was found.")
           }
 
         case PartitionState(taskExecutionId, taskResultId, partitionId, state) 
=>
@@ -400,7 +402,7 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
               log.debug(s"Cannot find task $taskExecutionId to respond with 
partition state.")
           }
       }
-    }
+      }
   }
 
   /**
@@ -793,12 +795,12 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
 
       // create the task. this does not grab any TaskManager resources or 
download
       // and libraries - the operation does not block
-      val execId = tdd.getExecutionId
       val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
                           self, jobManagerActor, config.timeout, libCache, 
fileCache)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
-      
+
+      val execId = tdd.getExecutionId
       // add the task to the map
       val prevTask = runningTasks.put(execId, task)
       if (prevTask != null) {
@@ -898,22 +900,28 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
 
     val task = runningTasks.remove(executionID)
     if (task != null) {
-      
-        // the task must be in a terminal state
-        if (!task.getExecutionState.isTerminal) {
-          try {
-            task.failExternally(new Exception("Task is being removed from 
TaskManager"))
-          } catch {
-            case e: Exception => log.error("Could not properly fail task", e)
-          }
+
+      // the task must be in a terminal state
+      if (!task.getExecutionState.isTerminal) {
+        try {
+          task.failExternally(new Exception("Task is being removed from 
TaskManager"))
+        } catch {
+          case e: Exception => log.error("Could not properly fail task", e)
         }
+      }
+
+      log.info(s"Unregistering task and sending final execution state " +
+        s"${task.getExecutionState} to JobManager for task ${task.getTaskName} 
" +
+        s"(${task.getExecutionId})")
 
-        log.info(s"Unregistering task and sending final execution state " +
-          s"${task.getExecutionState} to JobManager for task 
${task.getTaskName} " +
-          s"(${task.getExecutionId})")
+      val accumulators = {
+        val registry = task.getAccumulatorRegistry
+        registry.getSnapshot
+      }
 
-        self ! UpdateTaskExecutionState(new TaskExecutionState(
-          task.getJobID, task.getExecutionId, task.getExecutionState, 
task.getFailureCause))
+      self ! UpdateTaskExecutionState(new TaskExecutionState(
+        task.getJobID, task.getExecutionId, task.getExecutionState, 
task.getFailureCause,
+        accumulators))
     }
     else {
       log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -931,9 +939,20 @@ extends Actor with ActorLogMessages with 
ActorSynchronousLogging {
   private def sendHeartbeatToJobManager(): Unit = {
     try {
       log.debug("Sending heartbeat to JobManager")
-      val report: Array[Byte] = 
metricRegistryMapper.writeValueAsBytes(metricRegistry)
-      currentJobManager foreach {
-        jm => jm ! Heartbeat(instanceID, report)
+      val metricsReport: Array[Byte] = 
metricRegistryMapper.writeValueAsBytes(metricRegistry)
+
+      val accumulatorEvents =
+        scala.collection.mutable.Buffer[AccumulatorSnapshot]()
+
+      runningTasks foreach {
+        case (execID, task) =>
+          val registry = task.getAccumulatorRegistry
+          val accumulators = registry.getSnapshot
+          accumulatorEvents.append(accumulators)
+      }
+
+       currentJobManager foreach {
+        jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 4e5fb40..14bf022 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -183,5 +185,10 @@ public class AbstractReaderTest {
                protected MockReader(InputGate inputGate) {
                        super(inputGate);
                }
+
+               @Override
+               public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 9b9609b..0aab5fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -61,7 +61,7 @@ public class DriverTestBase<S extends Function> implements 
PactTaskContext<S, Re
        private final IOManager ioManager;
        
        private final MemoryManager memManager;
-       
+
        private final List<MutableObjectIterator<Record>> inputs;
        
        private final List<TypeComparator<Record>> comparators;
@@ -105,7 +105,7 @@ public class DriverTestBase<S extends Function> implements 
PactTaskContext<S, Re
                this.perSortFractionMem = (double)perSortMemory/totalMem;
                this.ioManager = new IOManagerAsync();
                this.memManager = totalMem > 0 ? new 
DefaultMemoryManager(totalMem,1) : null;
-               
+
                this.inputs = new ArrayList<MutableObjectIterator<Record>>();
                this.comparators = new ArrayList<TypeComparator<Record>>();
                this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
@@ -295,7 +295,7 @@ public class DriverTestBase<S extends Function> implements 
PactTaskContext<S, Re
        public IOManager getIOManager() {
                return this.ioManager;
        }
-       
+
        @Override
        public MemoryManager getMemoryManager() {
                return this.memManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0f62b27..b9cb416 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -79,6 +80,8 @@ public class MockEnvironment implements Environment {
 
        private final BroadcastVariableManager bcVarManager = new 
BroadcastVariableManager();
 
+       private final AccumulatorRegistry accumulatorRegistry;
+
        private final int bufferSize;
 
        public MockEnvironment(long memorySize, MockInputSplitProvider 
inputSplitProvider, int bufferSize) {
@@ -91,6 +94,8 @@ public class MockEnvironment implements Environment {
                this.ioManager = new IOManagerAsync();
                this.inputSplitProvider = inputSplitProvider;
                this.bufferSize = bufferSize;
+
+               this.accumulatorRegistry = new AccumulatorRegistry(jobID, 
getExecutionId());
        }
 
        public IteratorWrappingTestSingleInputGate<Record> 
addInput(MutableObjectIterator<Record> inputIterator) {
@@ -259,8 +264,8 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
-       public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-               // discard, this is only for testing
+       public AccumulatorRegistry getAccumulatorRegistry() {
+               return this.accumulatorRegistry;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index e9e761c..bd36dd4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 219e5ae..f2535fa 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -65,7 +65,6 @@ class TestingCluster(userConfiguration: Configuration,
       scheduler,
       libraryCacheManager,
       _,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -82,7 +81,6 @@ class TestingCluster(userConfiguration: Configuration,
         scheduler,
         libraryCacheManager,
         archive,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 5747b7e..6d316ca 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -147,6 +147,16 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
         case None => sender ! WorkingTaskManager(None)
       }
 
+    case RequestAccumulatorValues(jobID) =>
+
+      val (flinkAccumulators, userAccumulators) = currentJobs.get(jobID) match 
{
+        case Some((graph, jobInfo)) =>
+          (graph.getFlinkAccumulators, graph.aggregateUserAccumulators)
+        case None => null
+      }
+
+      sender ! RequestAccumulatorValuesResponse(jobID, flinkAccumulators, 
userAccumulators)
+
     case NotifyWhenJobStatus(jobID, state) =>
       val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
         scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 241c6c0..46e8486 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
 import org.apache.flink.runtime.instance.InstanceGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
+import java.util.Map
+import org.apache.flink.api.common.accumulators.Accumulator
 
 object TestingJobManagerMessages {
 
@@ -53,4 +56,9 @@ object TestingJobManagerMessages {
 
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
+
+  case class RequestAccumulatorValues(jobID: JobID)
+  case class RequestAccumulatorValuesResponse(jobID: JobID,
+    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, 
Accumulator[_,_]]],
+    userAccumulators: Map[String, Accumulator[_,_]])
 }

Reply via email to