http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java index cfd27f7..a7139b6 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java @@ -22,6 +22,7 @@ import java.util.LinkedList; import java.util.concurrent.LinkedBlockingDeque; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader; import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; @@ -256,6 +257,16 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable } } + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) { + serializer.setReporter(reporter); + } + for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) { + serializer.setReporter(reporter); + } + } + private class CoBarrierBuffer extends BarrierBuffer { private CoBarrierBuffer otherBuffer;
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java index 491dc06..44f9a86 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io; import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.AbstractEvent; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; import org.apache.flink.runtime.io.network.api.reader.ReaderBase; @@ -57,6 +58,7 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable private final BarrierBuffer barrierBuffer; + @SuppressWarnings("unchecked") protected StreamingAbstractRecordReader(InputGate inputGate) { super(inputGate); @@ -80,7 +82,8 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable DeserializationResult result = currentRecordDeserializer.getNextRecord(target); if (result.isBufferConsumed()) { - currentRecordDeserializer.getCurrentBuffer().recycle(); + Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer(); + currentBuffer.recycle(); currentRecordDeserializer = null; } @@ -130,4 +133,12 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable public void cleanup() throws IOException { barrierBuffer.cleanup(); } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + deserializer.setReporter(reporter); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java index ad74004..1356af5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java @@ -40,4 +40,5 @@ public class StreamingMutableRecordReader<T extends IOReadableWritable> extends public void clearBuffers() { super.clearBuffers(); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 80239fd..a9ebf5b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import java.io.IOException; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -52,6 +53,11 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO InputGate inputGate = InputGateFactory.createInputGate(getEnvironment().getAllInputGates()); inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate); + AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); + AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); + + inputs.setReporter(reporter); + inputs.registerTaskEventListener(getSuperstepListener(), StreamingSuperstep.class); recordIterator = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java index 73f0a89..41ee388 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java @@ -24,7 +24,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -59,7 +62,14 @@ public class OutputHandler<OUT> { private Map<Integer, StreamConfig> chainedConfigs; private List<StreamEdge> outEdgesInOrder; - public OutputHandler(StreamTask<OUT, ?> vertex) { + /** + * Counters for the number of records emitted and bytes written. + */ + protected AccumulatorRegistry.Reporter reporter; + + + public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap, + AccumulatorRegistry.Reporter reporter) { // Initialize some fields this.vertex = vertex; @@ -75,6 +85,8 @@ public class OutputHandler<OUT> { this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl); + this.reporter = reporter; + // We iterate through all the out edges from this job vertex and create // a stream output for (StreamEdge outEdge : outEdgesInOrder) { @@ -82,13 +94,14 @@ public class OutputHandler<OUT> { outEdge, outEdge.getTargetId(), chainedConfigs.get(outEdge.getSourceId()), - outEdgesInOrder.indexOf(outEdge)); + outEdgesInOrder.indexOf(outEdge), + reporter); outputMap.put(outEdge, streamOutput); } // We create the outer output that will be passed to the first task // in the chain - this.outerOutput = createChainedCollector(configuration); + this.outerOutput = createChainedCollector(configuration, accumulatorMap); // Add the head operator to the end of the list this.chainedOperators.add(vertex.streamOperator); @@ -121,7 +134,8 @@ public class OutputHandler<OUT> { * config */ @SuppressWarnings({"unchecked", "rawtypes"}) - private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig) { + private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) { + Preconditions.checkNotNull(accumulatorMap); // We create a wrapper that will encapsulate the chained operators and @@ -141,7 +155,7 @@ public class OutputHandler<OUT> { for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) { Integer output = outputEdge.getTargetId(); - Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output)); + Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap); wrapper.addCollector(outCollector, outputEdge); } @@ -155,8 +169,8 @@ public class OutputHandler<OUT> { // operator which will be returned and set it up using the wrapper OneInputStreamOperator chainableOperator = chainedTaskConfig.getStreamOperator(vertex.getUserCodeClassLoader()); - - StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig); + + StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap); vertex.contexts.add(chainedContext); chainableOperator.setup(wrapper, chainedContext); @@ -188,7 +202,7 @@ public class OutputHandler<OUT> { * @return The created StreamOutput */ private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex, - StreamConfig upStreamConfig, int outputIndex) { + StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) { StreamRecordSerializer<T> outSerializer = upStreamConfig .getTypeSerializerOut1(vertex.userClassLoader); @@ -207,6 +221,8 @@ public class OutputHandler<OUT> { RecordWriter<SerializationDelegate<StreamRecord<T>>> output = RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); + output.setReporter(reporter); + StreamOutput<T> streamOutput = new StreamOutput<T>(output, outSerializationDelegate); if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 25fe83d..e5d58d3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -18,10 +18,13 @@ package org.apache.flink.streaming.runtime.tasks; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.streaming.api.collector.StreamOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -46,7 +49,11 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> { @Override public void registerInputOutput() { super.registerInputOutput(); - outputHandler = new OutputHandler<OUT>(this); + + final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); + Map<String, Accumulator<?, ?>> accumulatorMap = registry.getUserMap(); + + outputHandler = new OutputHandler<OUT>(this, accumulatorMap, outputHandler.reporter); String iterationId = configuration.getIterationId(); iterationWaitTime = configuration.getIterationWaitTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index f98ed2d..4ffc8f5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -25,10 +25,12 @@ import java.util.Map; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.NotNullPredicate; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -87,13 +89,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs streamOperator = configuration.getStreamOperator(userClassLoader); - outputHandler = new OutputHandler<OUT>(this); + // Create and register Accumulators + Environment env = getEnvironment(); + AccumulatorRegistry accumulatorRegistry = env.getAccumulatorRegistry(); + Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap(); + AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); + + outputHandler = new OutputHandler<OUT>(this, accumulatorMap, reporter); if (streamOperator != null) { // IterationHead and IterationTail don't have an Operator... //Create context of the head operator - headContext = createRuntimeContext(configuration); + headContext = createRuntimeContext(configuration, accumulatorMap); this.contexts.add(headContext); streamOperator.setup(outputHandler.getOutput(), headContext); } @@ -105,14 +113,14 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs return getEnvironment().getTaskName(); } - public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) { + public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) { Environment env = getEnvironment(); String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName(); KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader); return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(), - getExecutionConfig(), statePartitioner, getStateHandleProvider()); + getExecutionConfig(), statePartitioner, getStateHandleProvider(), accumulatorMap); } private StateHandleProvider<Serializable> getStateHandleProvider() { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java index 3efd619..7eff16e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.state.OperatorState; @@ -55,9 +56,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext { @SuppressWarnings("unchecked") public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner, - StateHandleProvider<?> provider) { + StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) { super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, - executionConfig, env.getDistributedCacheEntries()); + executionConfig, env.getDistributedCacheEntries(), accumulatorMap); this.env = env; this.statePartitioner = statePartitioner; this.states = new HashMap<String, StreamOperatorState>(); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index 8c7ffeb..eb49e26 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -25,11 +25,13 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.java.functions.KeySelector; @@ -138,7 +140,7 @@ public class StatefulOperatorTest { StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner, - new LocalStateHandleProvider<Serializable>()); + new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>()); StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper()); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java index 0afe8b5..89ec7dc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java @@ -24,7 +24,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; @@ -198,6 +200,10 @@ public class BarrierBufferTest { super(inputGate); } + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + + } } protected static BufferOrEvent createSuperstep(long id, int channel) { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 9864115..2092d83 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -82,6 +83,8 @@ public class StreamMockEnvironment implements Environment { private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); + private final AccumulatorRegistry accumulatorRegistry; + private final int bufferSize; public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { @@ -94,6 +97,8 @@ public class StreamMockEnvironment implements Environment { this.ioManager = new IOManagerAsync(); this.inputSplitProvider = inputSplitProvider; this.bufferSize = bufferSize; + + this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId()); } public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) { @@ -262,8 +267,8 @@ public class StreamMockEnvironment implements Environment { } @Override - public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - // discard, this is only for testing + public AccumulatorRegistry getAccumulatorRegistry() { + return accumulatorRegistry; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java index 344fc7d..0467b5f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java @@ -20,10 +20,12 @@ package org.apache.flink.streaming.util; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -157,7 +159,7 @@ public class MockCoContext<IN1, IN2, OUT> { MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2); StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("CoMockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, null); + new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>()); operator.setup(mockContext.collector, runtimeContext); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index fc5079a..0d09c14 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -20,10 +20,12 @@ package org.apache.flink.streaming.util; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -105,7 +107,7 @@ public class MockContext<IN, OUT> { MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs); StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, null); + new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>()); operator.setup(mockContext.output, runtimeContext); try { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index dafd9a3..764fe5f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -19,9 +19,11 @@ package org.apache.flink.streaming.util; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; @@ -38,7 +40,7 @@ public class SourceFunctionUtil<T> { List<T> outputs = new ArrayList<T>(); if (sourceFunction instanceof RichFunction) { RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>()); + new ExecutionConfig(), null, new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>()); ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext); ((RichFunction) sourceFunction).open(new Configuration()); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java index 6b39734..5bfa49b 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.tez.runtime; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.runtime.operators.PactDriver; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; @@ -68,7 +69,8 @@ public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOP getContext().getVertexParallelism(), getContext().getTaskIndex(), getClass().getClassLoader(), - new ExecutionConfig()); + new ExecutionConfig(), + new HashMap<String, Accumulator<?, ?>>()); this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java index 1b39dbd..d23469e 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java @@ -70,7 +70,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase { Plan p = getTestJob(); p.setExecutionConfig(new ExecutionConfig()); if (p == null) { - Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?"); + Assert.fail("Error: Cannot obtain Pact plan. Did the test forget to override either 'getPactPlan()' or 'getJobGraph()' ?"); } Optimizer pc = new Optimizer(new DataStatistics(), this.config); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 0534178..509b86f 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -85,7 +85,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, scheduler, libraryCacheManager, _, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, @@ -106,7 +105,6 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, scheduler, libraryCacheManager, archive, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java index 11d6b83..d7d45fe 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java @@ -127,8 +127,7 @@ public class AccumulatorITCase extends JavaProgramTestBase { // Add built-in accumulator without convenience function getRuntimeContext().addAccumulator("open-close-counter", this.openCloseCounter); - // Add custom counter. Didn't find a way to do this with - // getAccumulator() + // Add custom counter this.distinctWords = new SetAccumulator<StringValue>(); this.getRuntimeContext().addAccumulator("distinct-words", distinctWords); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java index 78bbe68..6221c08 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase.java @@ -72,7 +72,7 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase { protected void postSubmit() throws Exception { compareResultsByLinesInMemory(EXPECTED, resultPath); - Integer res = (Integer) getJobExecutionResult().getAccumulatorResult("test"); + Integer res = getJobExecutionResult().getAccumulatorResult("test"); Assert.assertEquals(Integer.valueOf(NUM_ITERATIONS * 6), res); } @@ -117,18 +117,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase { private static final long serialVersionUID = 1L; private IntCounter testCounter = new IntCounter(); - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - getRuntimeContext().addAccumulator("test", this.testCounter); - } - + @Override public void reduce(Iterator<Record> records, Collector<Record> out) { // Compute the sum int sum = 0; - + while (records.hasNext()) { Record r = records.next(); Integer value = Integer.parseInt(r.getField(0, StringValue.class).getValue()); @@ -137,6 +131,12 @@ public class AccumulatorIterativeITCase extends RecordAPITestBase { } out.collect(new Record(new StringValue(Integer.toString(sum)))); } + + @Override + public void close() throws Exception { + super.close(); + getRuntimeContext().addAccumulator("test", this.testCounter); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java new file mode 100644 index 0000000..84c50a9 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.accumulators; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Status; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.IntCounter; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.LocalEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.*; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.Collector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + + +/** + * Test the availability of accumulator results during runtime. + */ +@SuppressWarnings("serial") +public class AccumulatorLiveITCase { + + private static ActorSystem system; + private static ActorRef jobManager; + + // name of accumulator + private static String NAME = "test"; + // time to wait between changing the accumulator value + private static long WAIT_TIME = TaskManager.HEARTBEAT_INTERVAL().toMillis() + 500; + + // number of heartbeat intervals to check + private static int NUM_ITERATIONS = 3; + // numer of retries in case the expected value is not seen + private static int NUM_RETRIES = 10; + + private static List<String> inputData = new ArrayList<String>(NUM_ITERATIONS); + + + @Before + public void before() throws Exception { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + TestingCluster testingCluster = TestingUtils.startTestingCluster(1, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); + jobManager = testingCluster.getJobManager(); + + // generate test data + for (int i=0; i < NUM_ITERATIONS; i++) { + inputData.add(i, String.valueOf(i+1)); + } + } + + @After + public void after() throws Exception { + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public void testProgram() throws Exception { + + new JavaTestKit(system) {{ + + /** The program **/ + ExecutionEnvironment env = new PlanExtractor(); + DataSet<String> input = env.fromCollection(inputData); + input + .flatMap(new Tokenizer()) + .flatMap(new WaitingUDF()) + .output(new WaitingOutputFormat()); + env.execute(); + + /** Extract job graph **/ + JobGraph jobGraph = getOptimizedPlan(((PlanExtractor) env).plan); + + jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, false), getRef()); + expectMsgClass(Status.Success.class); + + /* Check for accumulator values */ + int i = 0, retries = 0; + int expectedAccVal = 0; + while(i <= NUM_ITERATIONS) { + if (retries > 0) { + // retry fast + Thread.sleep(WAIT_TIME / NUM_RETRIES); + } else { + // wait for heartbeat interval + Thread.sleep(WAIT_TIME); + } + + jobManager.tell(new RequestAccumulatorValues(jobGraph.getJobID()), getRef()); + RequestAccumulatorValuesResponse response = + expectMsgClass(RequestAccumulatorValuesResponse.class); + + Map<String, Accumulator<?, ?>> userAccumulators = response.userAccumulators(); + Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> flinkAccumulators = + response.flinkAccumulators(); + + if (checkUserAccumulators(expectedAccVal, userAccumulators) && checkFlinkAccumulators(i == NUM_ITERATIONS, i, i * 4, flinkAccumulators)) { +// System.out.println("Passed round " + i); + // We passed this round + i += 1; + expectedAccVal += i; + retries = 0; + } else { + if (retries < NUM_RETRIES) { +// System.out.println("retrying for the " + retries + " time."); + // try again + retries += 1; + } else { + fail("Failed in round #" + i + " after " + retries + " retries."); + } + } + } + +// expectMsgClass(new FiniteDuration(10, TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class); + }}; + } + + private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?,?>> accumulatorMap) { +// System.out.println("checking user accumulators"); + return accumulatorMap.containsKey(NAME) && expected == ((IntCounter)accumulatorMap.get(NAME)).getLocalValue(); + } + + private static boolean checkFlinkAccumulators(boolean lastRound, int expectedRecords, int expectedBytes, + Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> accumulatorMap) { +// System.out.println("checking flink accumulators"); + boolean returnValue = false; + + for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : accumulatorMap.values()) { + if (taskMap != null) { + for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : taskMap.entrySet()) { + switch (entry.getKey()) { + /** + * The following two cases are for the DataSource and Map task + */ + case NUM_RECORDS_OUT: + if (!lastRound) { + assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords); + returnValue = true; + } + break; + case NUM_BYTES_OUT: + if (!lastRound) { + assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes); + returnValue = true; + } + break; + /** + * The following two cases are for the DataSink task + */ + case NUM_RECORDS_IN: + // check if we are in last round and in current task accumulator map + if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) { + assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords); + returnValue = true; + } + break; + case NUM_BYTES_IN: + if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) { + assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes); + returnValue = true; + } + break; + default: + fail("Unknown accumulator found."); + } + } + } + } + return returnValue; + } + + + public static class Tokenizer implements FlatMapFunction<String, String> { + + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + for (String str : value.split("\n")) { + out.collect(str); + } + } + } + + /** + * UDF that waits for at least the heartbeat interval's duration. + */ + private static class WaitingUDF extends RichFlatMapFunction<String, Integer> { + + private IntCounter counter = new IntCounter(); + + @Override + public void open(Configuration parameters) throws Exception { + getRuntimeContext().addAccumulator(NAME, counter); + } + + @Override + public void flatMap(String value, Collector<Integer> out) throws Exception { + /* Wait here to check the accumulator value in the meantime */ + Thread.sleep(WAIT_TIME); + int val = Integer.valueOf(value); + counter.add(val); + out.collect(val); + } + } + + private static class WaitingOutputFormat implements OutputFormat<Integer> { + + @Override + public void configure(Configuration parameters) { + + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + + } + + @Override + public void writeRecord(Integer record) throws IOException { + } + + @Override + public void close() throws IOException { + try { +// System.out.println("starting output task"); + Thread.sleep(WAIT_TIME); + } catch (InterruptedException e) { + fail("Interrupted test."); + } + } + } + + /** + * Helpers to generate the JobGraph + */ + private static JobGraph getOptimizedPlan(Plan plan) { + Optimizer pc = new Optimizer(new DataStatistics(), new Configuration()); + JobGraphGenerator jgg = new JobGraphGenerator(); + OptimizedPlan op = pc.compile(plan); + return jgg.compileJobGraph(op); + } + + private static class PlanExtractor extends LocalEnvironment { + + private Plan plan = null; + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + plan = createProgramPlan(); + return new JobExecutionResult(new JobID(), -1, null); + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 8cfeead..c497a90 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -240,7 +240,6 @@ object ApplicationMaster { scheduler, libraryCacheManager, archiveProps, - accumulatorManager, executionRetries, delayBetweenRetries, timeout, @@ -257,7 +256,6 @@ object ApplicationMaster { scheduler, libraryCacheManager, archiver, - accumulatorManager, executionRetries, delayBetweenRetries, timeout,
