http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index e89559d..39d3453 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -284,11 +283,8 @@ public class DataSourceTask<OT> extends AbstractInvokable { this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>(); this.eventualOutputs = new ArrayList<RecordWriter<?>>(); - final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); - final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); - this.output = BatchTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, - getExecutionConfig(), reporter, getEnvironment().getAccumulatorRegistry().getUserMap()); + getExecutionConfig(), getEnvironment().getAccumulatorRegistry().getUserMap()); } // ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 7ce9b0b..3a5c96f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -414,6 +414,10 @@ public class Task implements Runnable, TaskActions { return accumulatorRegistry; } + public TaskMetricGroup getMetricGroup() { + return metrics; + } + public Thread getExecutingThread() { return executingThread; } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 60aadf5..7bea5d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.util.SerializedThrowable; /** @@ -48,6 +49,7 @@ public class TaskExecutionState implements java.io.Serializable { /** Serialized flink and user-defined accumulators */ private final AccumulatorSnapshot accumulators; + private final IOMetrics ioMetrics; /** * Creates a new task execution state update, with no attached exception and no accumulators. @@ -60,7 +62,7 @@ public class TaskExecutionState implements java.io.Serializable { * the execution state to be reported */ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) { - this(jobID, executionId, executionState, null, null); + this(jobID, executionId, executionState, null, null, null); } /** @@ -75,7 +77,7 @@ public class TaskExecutionState implements java.io.Serializable { */ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState, Throwable error) { - this(jobID, executionId, executionState, error, null); + this(jobID, executionId, executionState, error, null, null); } /** @@ -95,7 +97,7 @@ public class TaskExecutionState implements java.io.Serializable { */ public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState, Throwable error, - AccumulatorSnapshot accumulators) { + AccumulatorSnapshot accumulators, IOMetrics ioMetrics) { if (jobID == null || executionId == null || executionState == null) { throw new NullPointerException(); @@ -110,6 +112,7 @@ public class TaskExecutionState implements java.io.Serializable { this.throwable = null; } this.accumulators = accumulators; + this.ioMetrics = ioMetrics; } // -------------------------------------------------------------------------------------------- @@ -164,6 +167,10 @@ public class TaskExecutionState implements java.io.Serializable { return accumulators; } + public IOMetrics getIOMetrics() { + return ioMetrics; + } + // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7608b87..ce763e7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -23,7 +23,6 @@ import java.lang.management.ManagementFactory import java.net.{InetAddress, InetSocketAddress} import java.util import java.util.UUID -import java.util.concurrent.TimeUnit import _root_.akka.actor._ import _root_.akka.pattern.ask @@ -1296,7 +1295,8 @@ class TaskManager( task.getExecutionId, task.getExecutionState, task.getFailureCause, - accumulators) + accumulators, + task.getMetricGroup.getIOMetricGroup.createSnapshot()) ) ) } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index aac2d00..ff50573 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; @@ -66,8 +65,6 @@ public class ArchivedExecutionGraphTest { private static JobVertexID v1ID = new JobVertexID(); private static JobVertexID v2ID = new JobVertexID(); - private static ExecutionAttemptID executionWithAccumulatorsID; - private static ExecutionGraph runtimeGraph; @BeforeClass @@ -119,15 +116,10 @@ public class ArchivedExecutionGraphTest { null, new TestCheckpointStatsTracker()); - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = new HashMap<>(); - flinkAccumulators.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(32)); - Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(); userAccumulators.put("userAcc", new LongCounter(64)); Execution executionWithAccumulators = runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt(); - executionWithAccumulators.setAccumulators(flinkAccumulators, userAccumulators); - executionWithAccumulatorsID = executionWithAccumulators.getAttemptId(); runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new RuntimeException("This exception was thrown on purpose.")); } @@ -200,7 +192,6 @@ public class ArchivedExecutionGraphTest { // ------------------------------------------------------------------------------------------------------------- compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), archivedGraph.getAccumulatorResultsStringified()); compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), archivedGraph.getAccumulatorsSerialized()); - compareFlinkAccumulators(runtimeGraph.getFlinkAccumulators().get(executionWithAccumulatorsID), archivedGraph.getFlinkAccumulators().get(executionWithAccumulatorsID)); // ------------------------------------------------------------------------------------------------------------- // JobVertices @@ -250,7 +241,6 @@ public class ArchivedExecutionGraphTest { compareOperatorCheckpointStats(runtimeJobVertex.getCheckpointStats().get(), archivedJobVertex.getCheckpointStats().get()); compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(), archivedJobVertex.getAggregatedUserAccumulatorsStringified()); - compareFlinkAccumulators(runtimeJobVertex.getAggregatedMetricAccumulators(), archivedJobVertex.getAggregatedMetricAccumulators()); AccessExecutionVertex[] runtimeExecutionVertices = runtimeJobVertex.getTaskVertices(); AccessExecutionVertex[] archivedExecutionVertices = archivedJobVertex.getTaskVertices(); @@ -294,7 +284,6 @@ public class ArchivedExecutionGraphTest { assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), archivedExecution.getStateTimestamp(ExecutionState.CANCELED)); assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FAILED), archivedExecution.getStateTimestamp(ExecutionState.FAILED)); compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(), archivedExecution.getUserAccumulatorsStringified()); - compareFlinkAccumulators(runtimeExecution.getFlinkAccumulators(), archivedExecution.getFlinkAccumulators()); assertEquals(runtimeExecution.getParallelSubtaskIndex(), archivedExecution.getParallelSubtaskIndex()); } @@ -321,18 +310,6 @@ public class ArchivedExecutionGraphTest { } } - private static void compareFlinkAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAccs, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> archivedAccs) { - assertEquals(runtimeAccs == null, archivedAccs == null); - if (runtimeAccs != null && archivedAccs != null) { - assertEquals(runtimeAccs.size(), archivedAccs.size()); - for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> runtimeAcc : runtimeAccs.entrySet()) { - Accumulator<?, ?> archivedAcc = archivedAccs.get(runtimeAcc.getKey()); - - assertEquals(runtimeAcc.getValue().getLocalValue(), archivedAcc.getLocalValue()); - } - } - } - private static void compareOperatorCheckpointStats(OperatorCheckpointStats runtimeStats, OperatorCheckpointStats archivedStats) { assertEquals(runtimeStats.getNumberOfSubTasks(), archivedStats.getNumberOfSubTasks()); assertEquals(runtimeStats.getCheckpointId(), archivedStats.getCheckpointId()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 2d0ae41..2a287af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -27,15 +27,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -50,7 +46,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -335,7 +330,7 @@ public class ExecutionGraphDeploymentTest { ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(); eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); - eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null, new AccumulatorSnapshot(jobId, attemptID, new HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>>(), new HashMap<String, Accumulator<?, ?>>()))); + eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null)); assertEquals(JobStatus.FAILED, eg.getState()); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index 6853722..c6c0645 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; @@ -184,10 +183,5 @@ public class AbstractReaderTest { protected MockReader(InputGate inputGate) { super(inputGate); } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index d775869..a1f7c2d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -226,10 +226,9 @@ trait TestingJobManagerLike extends FlinkActor { case (jobID, (updated, actors)) if updated => currentJobs.get(jobID) match { case Some((graph, jobInfo)) => - val flinkAccumulators = graph.getFlinkAccumulators val userAccumulators = graph.aggregateUserAccumulators actors foreach { - actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators) + actor => actor ! UpdatedAccumulators(jobID, userAccumulators) } case None => } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index 72cf58b..48c2bfb 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -23,7 +23,6 @@ import java.util.Map import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.api.common.accumulators.Accumulator -import org.apache.flink.runtime.accumulators.AccumulatorRegistry import org.apache.flink.runtime.checkpoint.savepoint.Savepoint import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph} import org.apache.flink.runtime.instance.ActorGateway @@ -73,7 +72,6 @@ object TestingJobManagerMessages { * Reports updated accumulators back to the listener. */ case class UpdatedAccumulators(jobID: JobID, - flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]], userAccumulators: Map[String, Accumulator[_,_]]) /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 5a27564..714317d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -27,7 +27,6 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -204,12 +203,6 @@ public class StreamInputProcessor<IN> { } } } - - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.setReporter(reporter); - } - } /** * Sets the metric group for this StreamInputProcessor. http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 075d9e0..5f7ffe4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -269,12 +268,6 @@ public class StreamTwoInputProcessor<IN1, IN2> { } } } - - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.setReporter(reporter); - } - } /** * Sets the metric group for this StreamTwoInputProcessor. http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 0f8f4a4..68d3064 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -48,9 +47,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); - inputProcessor.setReporter(reporter); inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 2a4a065..30bc377 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -28,7 +28,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -74,7 +73,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { private final OP headOperator; - public OperatorChain(StreamTask<OUT, OP> containingTask, AccumulatorRegistry.Reporter reporter) { + public OperatorChain(StreamTask<OUT, OP> containingTask) { final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader(); final StreamConfig configuration = containingTask.getConfiguration(); @@ -99,7 +98,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { RecordWriterOutput<?> streamOutput = createStreamOutput( outEdge, chainedConfigs.get(outEdge.getSourceId()), i, - containingTask.getEnvironment(), reporter, containingTask.getName()); + containingTask.getEnvironment(), containingTask.getName()); this.streamOutputs[i] = streamOutput; streamOutputMap.put(outEdge, streamOutput); @@ -306,7 +305,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { private static <T> RecordWriterOutput<T> createStreamOutput( StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, - AccumulatorRegistry.Reporter reporter, String taskName) + String taskName) { TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader()); @@ -319,7 +318,6 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> { StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); - output.setReporter(reporter); output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup()); return new RecordWriterOutput<>(output, outSerializer); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index cd63102..fa7d1b0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -227,7 +227,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory); } - operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter()); + operatorChain = new OperatorChain<>(this); headOperator = operatorChain.getHeadOperator(); getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() { http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index d695781..978c9f2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -73,9 +72,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS getEnvironment().getIOManager()); // make sure that stream tasks report their I/O statistics - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); - this.inputProcessor.setReporter(reporter); inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index 32e8ea9..c95a85e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -149,9 +148,7 @@ public class StreamOperatorChainingTest { StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, chainedVertex.getName()); - OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>( - mockTask, - mock(AccumulatorRegistry.Reporter.class)); + OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask); headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint()); @@ -291,9 +288,7 @@ public class StreamOperatorChainingTest { StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, chainedVertex.getName()); - OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>( - mockTask, - mock(AccumulatorRegistry.Reporter.class)); + OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask); headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 624bfff..c56fa91 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.IntCounter; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.DataSet; @@ -41,7 +40,6 @@ import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -203,32 +201,14 @@ public class AccumulatorLiveITCase { TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT); - Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = msg.flinkAccumulators(); Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators(); ExecutionAttemptID mapperTaskID = null; - // find out the first task's execution attempt id - for (Map.Entry<ExecutionAttemptID, ?> entry : flinkAccumulators.entrySet()) { - if (entry.getValue() != null) { - mapperTaskID = entry.getKey(); - break; - } - } - ExecutionAttemptID sinkTaskID = null; - // find the second's task id - for (ExecutionAttemptID key : flinkAccumulators.keySet()) { - if (key != mapperTaskID) { - sinkTaskID = key; - break; - } - } - /* Check for accumulator values */ - if(checkUserAccumulators(0, userAccumulators) && - checkFlinkAccumulators(mapperTaskID, 0, 0, 0, 0, flinkAccumulators)) { + if(checkUserAccumulators(0, userAccumulators)) { LOG.info("Passed initial check for map task."); } else { fail("Wrong accumulator results when map task begins execution."); @@ -242,17 +222,13 @@ public class AccumulatorLiveITCase { // receive message msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT); - flinkAccumulators = msg.flinkAccumulators(); userAccumulators = msg.userAccumulators(); - LOG.info("{}", flinkAccumulators); LOG.info("{}", userAccumulators); - if (checkUserAccumulators(expectedAccVal, userAccumulators) && - checkFlinkAccumulators(mapperTaskID, 0, i, 0, i * 4, flinkAccumulators)) { + if (checkUserAccumulators(expectedAccVal, userAccumulators)) { LOG.info("Passed round #" + i); - } else if (checkUserAccumulators(expectedAccVal, userAccumulators) && - checkFlinkAccumulators(sinkTaskID, 0, i, 0, i * 4, flinkAccumulators)) { + } else if (checkUserAccumulators(expectedAccVal, userAccumulators)) { // we determined the wrong task id and need to switch the two here ExecutionAttemptID temp = mapperTaskID; mapperTaskID = sinkTaskID; @@ -264,11 +240,9 @@ public class AccumulatorLiveITCase { } msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT); - flinkAccumulators = msg.flinkAccumulators(); userAccumulators = msg.userAccumulators(); - if(checkUserAccumulators(expectedAccVal, userAccumulators) && - checkFlinkAccumulators(sinkTaskID, 0, 0, 0, 0, flinkAccumulators)) { + if(checkUserAccumulators(expectedAccVal, userAccumulators)) { LOG.info("Passed initial check for sink task."); } else { fail("Wrong accumulator results when sink task begins execution."); @@ -280,14 +254,11 @@ public class AccumulatorLiveITCase { // receive message msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT); - flinkAccumulators = msg.flinkAccumulators(); userAccumulators = msg.userAccumulators(); - LOG.info("{}", flinkAccumulators); LOG.info("{}", userAccumulators); - if (checkUserAccumulators(expectedAccVal, userAccumulators) && - checkFlinkAccumulators(sinkTaskID, i, 0, i * 4, 0, flinkAccumulators)) { + if (checkUserAccumulators(expectedAccVal, userAccumulators)) { LOG.info("Passed round #" + i); } else { fail("Failed in round #" + i); @@ -305,49 +276,6 @@ public class AccumulatorLiveITCase { return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter)accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue(); } - private static boolean checkFlinkAccumulators(ExecutionAttemptID taskKey, int expectedRecordsIn, int expectedRecordsOut, int expectedBytesIn, int expectedBytesOut, - Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> accumulatorMap) { - LOG.info("checking flink accumulators"); - - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskMap = accumulatorMap.get(taskKey); - assertTrue(accumulatorMap.size() > 0); - - for (Map.Entry<AccumulatorRegistry.Metric, Accumulator<?, ?>> entry : taskMap.entrySet()) { - switch (entry.getKey()) { - /** - * The following two cases are for the DataSource and Map task - */ - case NUM_RECORDS_OUT: - if(((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsOut) { - return false; - } - break; - case NUM_BYTES_OUT: - if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesOut) { - return false; - } - break; - /** - * The following two cases are for the DataSink task - */ - case NUM_RECORDS_IN: - if (((LongCounter) entry.getValue()).getLocalValue() < expectedRecordsIn) { - return false; - } - break; - case NUM_BYTES_IN: - if (((LongCounter) entry.getValue()).getLocalValue() < expectedBytesIn) { - return false; - } - break; - default: - fail("Unknown accumulator found."); - } - } - return true; - } - - /** * UDF that notifies when it changes the accumulator values */
