http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
index cfd27f7..a7139b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
@@ -256,6 +257,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
                }
        }
 
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for (AdaptiveSpanningRecordDeserializer serializer : 
reader1RecordDeserializers) {
+                       serializer.setReporter(reporter);
+               }
+               for (AdaptiveSpanningRecordDeserializer serializer : 
reader2RecordDeserializers) {
+                       serializer.setReporter(reporter);
+               }
+       }
+
        private class CoBarrierBuffer extends BarrierBuffer {
 
                private CoBarrierBuffer otherBuffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
index 491dc06..44f9a86 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
@@ -57,6 +58,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 
        private final BarrierBuffer barrierBuffer;
 
+
        @SuppressWarnings("unchecked")
        protected StreamingAbstractRecordReader(InputGate inputGate) {
                super(inputGate);
@@ -80,7 +82,8 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
                                DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
 
                                if (result.isBufferConsumed()) {
-                                       
currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       Buffer currentBuffer = 
currentRecordDeserializer.getCurrentBuffer();
+                                       currentBuffer.recycle();
                                        currentRecordDeserializer = null;
                                }
 
@@ -130,4 +133,12 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
        public void cleanup() throws IOException {
                barrierBuffer.cleanup();
        }
+
+       @Override
+       public void setReporter(AccumulatorRegistry.Reporter reporter) {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       deserializer.setReporter(reporter);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
index ad74004..1356af5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
@@ -40,4 +40,5 @@ public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
        public void clearBuffers() {
                super.clearBuffers();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 80239fd..a9ebf5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -52,6 +53,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
                        InputGate inputGate = 
InputGateFactory.createInputGate(getEnvironment().getAllInputGates());
                        inputs = new 
IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
 
+                       AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
+                       AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
+
+                       inputs.setReporter(reporter);
+
                        
inputs.registerTaskEventListener(getSuperstepListener(), 
StreamingSuperstep.class);
 
                        recordIterator = new 
IndexedReaderIterator<StreamRecord<IN>>(inputs, inSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 73f0a89..41ee388 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,7 +24,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -59,7 +62,14 @@ public class OutputHandler<OUT> {
        private Map<Integer, StreamConfig> chainedConfigs;
        private List<StreamEdge> outEdgesInOrder;
 
-       public OutputHandler(StreamTask<OUT, ?> vertex) {
+       /**
+        * Counters for the number of records emitted and bytes written.
+        */
+       protected AccumulatorRegistry.Reporter reporter;
+
+
+       public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, 
Accumulator<?,?>> accumulatorMap,
+                                               AccumulatorRegistry.Reporter 
reporter) {
 
                // Initialize some fields
                this.vertex = vertex;
@@ -75,6 +85,8 @@ public class OutputHandler<OUT> {
 
                this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
 
+               this.reporter = reporter;
+
                // We iterate through all the out edges from this job vertex 
and create
                // a stream output
                for (StreamEdge outEdge : outEdgesInOrder) {
@@ -82,13 +94,14 @@ public class OutputHandler<OUT> {
                                        outEdge,
                                        outEdge.getTargetId(),
                                        
chainedConfigs.get(outEdge.getSourceId()),
-                                       outEdgesInOrder.indexOf(outEdge));
+                                       outEdgesInOrder.indexOf(outEdge),
+                                       reporter);
                        outputMap.put(outEdge, streamOutput);
                }
 
                // We create the outer output that will be passed to the first 
task
                // in the chain
-               this.outerOutput = createChainedCollector(configuration);
+               this.outerOutput = createChainedCollector(configuration, 
accumulatorMap);
                
                // Add the head operator to the end of the list
                this.chainedOperators.add(vertex.streamOperator);
@@ -121,7 +134,8 @@ public class OutputHandler<OUT> {
         * config
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
-       private <X> Output<X> createChainedCollector(StreamConfig 
chainedTaskConfig) {
+       private <X> Output<X> createChainedCollector(StreamConfig 
chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
+               Preconditions.checkNotNull(accumulatorMap);
 
 
                // We create a wrapper that will encapsulate the chained 
operators and
@@ -141,7 +155,7 @@ public class OutputHandler<OUT> {
                for (StreamEdge outputEdge : 
chainedTaskConfig.getChainedOutputs(cl)) {
                        Integer output = outputEdge.getTargetId();
 
-                       Collector<?> outCollector = 
createChainedCollector(chainedConfigs.get(output));
+                       Collector<?> outCollector = 
createChainedCollector(chainedConfigs.get(output), accumulatorMap);
 
                        wrapper.addCollector(outCollector, outputEdge);
                }
@@ -155,8 +169,8 @@ public class OutputHandler<OUT> {
                        // operator which will be returned and set it up using 
the wrapper
                        OneInputStreamOperator chainableOperator =
                                        
chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader());
-                       
-                       StreamingRuntimeContext chainedContext = 
vertex.createRuntimeContext(chainedTaskConfig);
+
+                       StreamingRuntimeContext chainedContext = 
vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
                        vertex.contexts.add(chainedContext);
                        
                        chainableOperator.setup(wrapper, chainedContext);
@@ -188,7 +202,7 @@ public class OutputHandler<OUT> {
         * @return The created StreamOutput
         */
        private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer 
outputVertex,
-                       StreamConfig upStreamConfig, int outputIndex) {
+                       StreamConfig upStreamConfig, int outputIndex, 
AccumulatorRegistry.Reporter reporter) {
 
                StreamRecordSerializer<T> outSerializer = upStreamConfig
                                .getTypeSerializerOut1(vertex.userClassLoader);
@@ -207,6 +221,8 @@ public class OutputHandler<OUT> {
                RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
                                
RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, 
upStreamConfig.getBufferTimeout());
 
+               output.setReporter(reporter);
+
                StreamOutput<T> streamOutput = new StreamOutput<T>(output, 
outSerializationDelegate);
 
                if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 25fe83d..e5d58d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -18,10 +18,13 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,7 +49,11 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
        @Override
        public void registerInputOutput() {
                super.registerInputOutput();
-               outputHandler = new OutputHandler<OUT>(this);
+
+               final AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
+               Map<String, Accumulator<?, ?>> accumulatorMap = 
registry.getUserMap();
+
+               outputHandler = new OutputHandler<OUT>(this, accumulatorMap, 
outputHandler.reporter);
 
                String iterationId = configuration.getIterationId();
                iterationWaitTime = configuration.getIterationWaitTime();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f98ed2d..4ffc8f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,10 +25,12 @@ import java.util.Map;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -87,13 +89,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
                streamOperator = 
configuration.getStreamOperator(userClassLoader);
 
-               outputHandler = new OutputHandler<OUT>(this);
+               // Create and register Accumulators
+               Environment env = getEnvironment();
+               AccumulatorRegistry accumulatorRegistry = 
env.getAccumulatorRegistry();
+               Map<String, Accumulator<?, ?>> accumulatorMap = 
accumulatorRegistry.getUserMap();
+               AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
+
+               outputHandler = new OutputHandler<OUT>(this, accumulatorMap, 
reporter);
 
                if (streamOperator != null) {
                        // IterationHead and IterationTail don't have an 
Operator...
 
                        //Create context of the head operator
-                       headContext = createRuntimeContext(configuration);
+                       headContext = createRuntimeContext(configuration, 
accumulatorMap);
                        this.contexts.add(headContext);
                        streamOperator.setup(outputHandler.getOutput(), 
headContext);
                }
@@ -105,14 +113,14 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
                return getEnvironment().getTaskName();
        }
 
-       public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
+       public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, 
Map<String, Accumulator<?,?>> accumulatorMap) {
                Environment env = getEnvironment();
                String operatorName = 
conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
 
                KeySelector<?,Serializable> statePartitioner = 
conf.getStatePartitioner(userClassLoader);
 
                return new StreamingRuntimeContext(operatorName, env, 
getUserCodeClassLoader(),
-                               getExecutionConfig(), statePartitioner, 
getStateHandleProvider());
+                               getExecutionConfig(), statePartitioner, 
getStateHandleProvider(), accumulatorMap);
        }
 
        private StateHandleProvider<Serializable> getStateHandleProvider() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 3efd619..7eff16e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.state.OperatorState;
@@ -55,9 +56,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
        @SuppressWarnings("unchecked")
        public StreamingRuntimeContext(String name, Environment env, 
ClassLoader userCodeClassLoader,
                        ExecutionConfig executionConfig, KeySelector<?, ?> 
statePartitioner,
-                       StateHandleProvider<?> provider) {
+                       StateHandleProvider<?> provider, Map<String, 
Accumulator<?, ?>> accumulatorMap) {
                super(name, env.getNumberOfSubtasks(), 
env.getIndexInSubtaskGroup(), userCodeClassLoader,
-                               executionConfig, 
env.getDistributedCacheEntries());
+                               executionConfig, 
env.getDistributedCacheEntries(), accumulatorMap);
                this.env = env;
                this.statePartitioner = statePartitioner;
                this.states = new HashMap<String, StreamOperatorState>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 8c7ffeb..eb49e26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -25,11 +25,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -138,7 +140,7 @@ public class StatefulOperatorTest {
 
                StreamingRuntimeContext context = new 
StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
                                new MockInputSplitProvider(), 1024), null, new 
ExecutionConfig(), partitioner,
-                               new LocalStateHandleProvider<Serializable>());
+                               new LocalStateHandleProvider<Serializable>(), 
new HashMap<String, Accumulator<?, ?>>());
 
                StreamMap<Integer, String> op = new StreamMap<Integer, 
String>(new StatefulMapper());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 0afe8b5..89ec7dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -24,7 +24,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
@@ -198,6 +200,10 @@ public class BarrierBufferTest {
                        super(inputGate);
                }
 
+               @Override
+               public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+               }
        }
 
        protected static BufferOrEvent createSuperstep(long id, int channel) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 9864115..2092d83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -82,6 +83,8 @@ public class StreamMockEnvironment implements Environment {
 
        private final BroadcastVariableManager bcVarManager = new 
BroadcastVariableManager();
 
+       private final AccumulatorRegistry accumulatorRegistry;
+
        private final int bufferSize;
 
        public StreamMockEnvironment(long memorySize, MockInputSplitProvider 
inputSplitProvider, int bufferSize) {
@@ -94,6 +97,8 @@ public class StreamMockEnvironment implements Environment {
                this.ioManager = new IOManagerAsync();
                this.inputSplitProvider = inputSplitProvider;
                this.bufferSize = bufferSize;
+
+               this.accumulatorRegistry = new AccumulatorRegistry(jobID, 
getExecutionId());
        }
 
        public IteratorWrappingTestSingleInputGate<Record> 
addInput(MutableObjectIterator<Record> inputIterator) {
@@ -262,8 +267,8 @@ public class StreamMockEnvironment implements Environment {
        }
 
        @Override
-       public void reportAccumulators(Map<String, Accumulator<?, ?>> 
accumulators) {
-               // discard, this is only for testing
+       public AccumulatorRegistry getAccumulatorRegistry() {
+               return accumulatorRegistry;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 344fc7d..0467b5f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -157,7 +159,7 @@ public class MockCoContext<IN1, IN2, OUT> {
                MockCoContext<IN1, IN2, OUT> mockContext = new 
MockCoContext<IN1, IN2, OUT>(input1, input2);
                StreamingRuntimeContext runtimeContext = new 
StreamingRuntimeContext("CoMockTask",
                                new MockEnvironment(3 * 1024 * 1024, new 
MockInputSplitProvider(), 1024), null,
-                               new ExecutionConfig(), null, null);
+                               new ExecutionConfig(), null, null, new 
HashMap<String, Accumulator<?, ?>>());
 
                operator.setup(mockContext.collector, runtimeContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index fc5079a..0d09c14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -105,7 +107,7 @@ public class MockContext<IN, OUT> {
                MockContext<IN, OUT> mockContext = new MockContext<IN, 
OUT>(inputs);
                StreamingRuntimeContext runtimeContext = new 
StreamingRuntimeContext("MockTask",
                                new MockEnvironment(3 * 1024 * 1024, new 
MockInputSplitProvider(), 1024), null,
-                               new ExecutionConfig(), null, null);
+                               new ExecutionConfig(), null, null, new 
HashMap<String, Accumulator<?, ?>>());
 
                operator.setup(mockContext.output, runtimeContext);
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index dafd9a3..764fe5f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.util;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
@@ -38,7 +40,7 @@ public class SourceFunctionUtil<T> {
                List<T> outputs = new ArrayList<T>();
                if (sourceFunction instanceof RichFunction) {
                        RuntimeContext runtimeContext =  new 
StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new 
MockInputSplitProvider(), 1024), null,
-                                       new ExecutionConfig(), null, new 
LocalStateHandleProvider<Serializable>());
+                                       new ExecutionConfig(), null, new 
LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, 
?>>());
                        ((RichFunction) 
sourceFunction).setRuntimeContext(runtimeContext);
 
                        ((RichFunction) sourceFunction).open(new 
Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index 6b39734..5bfa49b 100644
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.tez.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.operators.PactDriver;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -68,7 +69,8 @@ public class RegularProcessor<S extends Function, OT> extends 
AbstractLogicalIOP
                                getContext().getVertexParallelism(),
                                getContext().getTaskIndex(),
                                getClass().getClassLoader(),
-                               new ExecutionConfig());
+                               new ExecutionConfig(),
+                               new HashMap<String, Accumulator<?, ?>>());
 
                this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, 
this.getContext().getTotalMemoryAvailableToTask());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 1b39dbd..d23469e 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -70,7 +70,7 @@ public abstract class RecordAPITestBase extends 
AbstractTestBase {
                Plan p = getTestJob();
                p.setExecutionConfig(new ExecutionConfig());
                if (p == null) {
-                       Assert.fail("Error: Cannot obtain Pact plan. Did the 
thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
+                       Assert.fail("Error: Cannot obtain Pact plan. Did the 
test forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
                }
                
                Optimizer pc = new Optimizer(new DataStatistics(), this.config);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 0534178..509b86f 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -85,7 +85,6 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
       scheduler,
       libraryCacheManager,
       _,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -106,7 +105,6 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration,
         scheduler,
         libraryCacheManager,
         archive,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index 11d6b83..d7d45fe 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -127,8 +127,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
                        // Add built-in accumulator without convenience function
                        
getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter);
 
-                       // Add custom counter. Didn't find a way to do this with
-                       // getAccumulator()
+                       // Add custom counter
                        this.distinctWords = new SetAccumulator<StringValue>();
                        
this.getRuntimeContext().addAccumulator("distinct-words", distinctWords);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
index 78bbe68..6221c08 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java
@@ -72,7 +72,7 @@ public class AccumulatorIterativeITCase extends 
RecordAPITestBase {
        protected void postSubmit() throws Exception {
                compareResultsByLinesInMemory(EXPECTED, resultPath);
                
-               Integer res = (Integer) 
getJobExecutionResult().getAccumulatorResult("test");
+               Integer res = 
getJobExecutionResult().getAccumulatorResult("test");
                Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), res);
        }
 
@@ -117,18 +117,12 @@ public class AccumulatorIterativeITCase extends 
RecordAPITestBase {
                private static final long serialVersionUID = 1L;
                
                private IntCounter testCounter = new IntCounter();
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       getRuntimeContext().addAccumulator("test", 
this.testCounter);
-               }
-               
+
                @Override
                public void reduce(Iterator<Record> records, Collector<Record> 
out) {
                        // Compute the sum
                        int sum = 0;
-                       
+
                        while (records.hasNext()) {
                                Record r = records.next();
                                Integer value = Integer.parseInt(r.getField(0, 
StringValue.class).getValue());
@@ -137,6 +131,12 @@ public class AccumulatorIterativeITCase extends 
RecordAPITestBase {
                        }
                        out.collect(new Record(new 
StringValue(Integer.toString(sum))));
                }
+
+               @Override
+               public void close() throws Exception {
+                       super.close();
+                       getRuntimeContext().addAccumulator("test", 
this.testCounter);
+               }
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
new file mode 100644
index 0000000..84c50a9
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.*;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test the availability of accumulator results during runtime.
+ */
+@SuppressWarnings("serial")
+public class AccumulatorLiveITCase {
+
+       private static ActorSystem system;
+       private static ActorRef jobManager;
+
+       // name of accumulator
+       private static String NAME = "test";
+       // time to wait between changing the accumulator value
+       private static long WAIT_TIME = 
TaskManager.HEARTBEAT_INTERVAL().toMillis() + 500;
+
+       // number of heartbeat intervals to check
+       private static int NUM_ITERATIONS = 3;
+       // number of retries in case the expected value is not seen
+       private static int NUM_RETRIES = 10;
+
+       private static List<String> inputData = new 
ArrayList<String>(NUM_ITERATIONS);
+
+
+       @Before
+       public void before() throws Exception {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+               TestingCluster testingCluster = 
TestingUtils.startTestingCluster(1, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+               jobManager = testingCluster.getJobManager();
+
+               // generate test data
+               for (int i=0; i < NUM_ITERATIONS; i++) {
+                       inputData.add(i, String.valueOf(i+1));
+               }
+       }
+
+       @After
+       public void after() throws Exception {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       @Test
+       public void testProgram() throws Exception {
+
+               new JavaTestKit(system) {{
+
+                       /** The program **/
+                       ExecutionEnvironment env = new PlanExtractor();
+                       DataSet<String> input = env.fromCollection(inputData);
+                       input
+                                       .flatMap(new Tokenizer())
+                                       .flatMap(new WaitingUDF())
+                                       .output(new WaitingOutputFormat());
+                       env.execute();
+
+                       /** Extract job graph **/
+                       JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) 
env).plan);
+
+                       jobManager.tell(new 
JobManagerMessages.SubmitJob(jobGraph, false), getRef());
+                       expectMsgClass(Status.Success.class);
+
+                       /* Check for accumulator values */
+                       int i = 0, retries = 0;
+                       int expectedAccVal = 0;
+                       while(i <= NUM_ITERATIONS) {
+                               if (retries > 0) {
+                                       // retry fast
+                                       Thread.sleep(WAIT_TIME / NUM_RETRIES);
+                               } else {
+                                       // wait for heartbeat interval
+                                       Thread.sleep(WAIT_TIME);
+                               }
+
+                               jobManager.tell(new 
RequestAccumulatorValues(jobGraph.getJobID()), getRef());
+                               RequestAccumulatorValuesResponse response =
+                                               
expectMsgClass(RequestAccumulatorValuesResponse.class);
+
+                               Map<String, Accumulator<?, ?>> userAccumulators 
= response.userAccumulators();
+                               Map<ExecutionAttemptID, 
Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> flinkAccumulators =
+                                               response.flinkAccumulators();
+
+                               if (checkUserAccumulators(expectedAccVal, 
userAccumulators) && checkFlinkAccumulators(i == NUM_ITERATIONS, i, i * 4, 
flinkAccumulators)) {
+//                                     System.out.println("Passed round " + i);
+                                       // We passed this round
+                                       i += 1;
+                                       expectedAccVal += i;
+                                       retries = 0;
+                               } else {
+                                       if (retries < NUM_RETRIES) {
+//                                             System.out.println("retrying 
for the " + retries + " time.");
+                                               // try again
+                                               retries += 1;
+                                       } else {
+                                               fail("Failed in round #" + i + 
" after " + retries + " retries.");
+                                       }
+                               }
+                       }
+
+//                     expectMsgClass(new FiniteDuration(10, 
TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class);
+               }};
+       }
+
+       private static boolean checkUserAccumulators(int expected, Map<String, 
Accumulator<?,?>> accumulatorMap) {
+//             System.out.println("checking user accumulators");
+               return accumulatorMap.containsKey(NAME) && expected == 
((IntCounter)accumulatorMap.get(NAME)).getLocalValue();
+       }
+
+       private static boolean checkFlinkAccumulators(boolean lastRound, int 
expectedRecords, int expectedBytes,
+                                                                               
                  Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> accumulatorMap) {
+//             System.out.println("checking flink accumulators");
+               boolean returnValue = false;
+
+               for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : 
accumulatorMap.values()) {
+                       if (taskMap != null) {
+                               for (Map.Entry<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> entry : taskMap.entrySet()) {
+                                       switch (entry.getKey()) {
+                                               /**
+                                                * The following two cases are 
for the DataSource and Map task
+                                                */
+                                               case NUM_RECORDS_OUT:
+                                                       if (!lastRound) {
+                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+                                                               returnValue = 
true;
+                                                       }
+                                                       break;
+                                               case NUM_BYTES_OUT:
+                                                       if (!lastRound) {
+                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+                                                               returnValue = 
true;
+                                                       }
+                                                       break;
+                                               /**
+                                                * The following two cases are 
for the DataSink task
+                                                */
+                                               case NUM_RECORDS_IN:
+                                                       // check if we are in 
last round and in current task accumulator map
+                                                       if (lastRound && 
((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue()
 == 0) {
+                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
+                                                               returnValue = 
true;
+                                                       }
+                                                       break;
+                                               case NUM_BYTES_IN:
+                                                       if (lastRound && 
((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue()
 == 0) {
+                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
+                                                               returnValue = 
true;
+                                                       }
+                                                       break;
+                                               default:
+                                                       fail("Unknown 
accumulator found.");
+                                       }
+                               }
+                       }
+               }
+               return returnValue;
+       }
+
+
+       public static class Tokenizer implements FlatMapFunction<String, 
String> {
+
+               @Override
+               public void flatMap(String value, Collector<String> out) throws 
Exception {
+                       for (String str : value.split("\n")) {
+                               out.collect(str);
+                       }
+               }
+       }
+
+       /**
+        * UDF that waits for at least the heartbeat interval's duration.
+        */
+       private static class WaitingUDF extends RichFlatMapFunction<String, 
Integer> {
+
+               private IntCounter counter = new IntCounter();
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       getRuntimeContext().addAccumulator(NAME, counter);
+               }
+
+               @Override
+               public void flatMap(String value, Collector<Integer> out) 
throws Exception {
+                       /* Wait here to check the accumulator value in the 
meantime */
+                       Thread.sleep(WAIT_TIME);
+                       int val = Integer.valueOf(value);
+                       counter.add(val);
+                       out.collect(val);
+               }
+       }
+
+       private static class WaitingOutputFormat implements 
OutputFormat<Integer> {
+
+               @Override
+               public void configure(Configuration parameters) {
+
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) throws 
IOException {
+
+               }
+
+               @Override
+               public void writeRecord(Integer record) throws IOException {
+               }
+
+               @Override
+               public void close() throws IOException {
+                       try {
+//                             System.out.println("starting output task");
+                               Thread.sleep(WAIT_TIME);
+                       } catch (InterruptedException e) {
+                               fail("Interrupted test.");
+                       }
+               }
+       }
+
+       /**
+        * Helpers to generate the JobGraph
+        */
+       private static JobGraph getOptimizedPlan(Plan plan) {
+               Optimizer pc = new Optimizer(new DataStatistics(), new 
Configuration());
+               JobGraphGenerator jgg = new JobGraphGenerator();
+               OptimizedPlan op = pc.compile(plan);
+               return jgg.compileJobGraph(op);
+       }
+
+       private static class PlanExtractor extends LocalEnvironment {
+
+               private Plan plan = null;
+
+               @Override
+               public JobExecutionResult execute(String jobName) throws 
Exception {
+                       plan = createProgramPlan();
+                       return new JobExecutionResult(new JobID(), -1, null);
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 8cfeead..c497a90 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -240,7 +240,6 @@ object ApplicationMaster {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -257,7 +256,6 @@ object ApplicationMaster {
         scheduler,
         libraryCacheManager,
         archiver,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,

Reply via email to