[FLINK-4733] Port Task IO metrics
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba2d007e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba2d007e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba2d007e Branch: refs/heads/master Commit: ba2d007e5ad270b9a403d037d186de61cdaac742 Parents: cf4f364 Author: zentol <[email protected]> Authored: Mon Oct 31 14:17:05 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Oct 31 15:12:04 2016 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 12 +-- .../webmonitor/handlers/JobDetailsHandler.java | 53 +++++++++---- .../handlers/JobVertexDetailsHandler.java | 63 ++++++++------- .../handlers/JobVertexTaskManagersHandler.java | 57 +++++++------- .../SubtaskCurrentAttemptDetailsHandler.java | 5 +- .../SubtaskExecutionAttemptDetailsHandler.java | 63 ++++++++------- .../accumulators/AccumulatorRegistry.java | 80 +------------------ .../accumulators/AccumulatorSnapshot.java | 15 ---- .../runtime/executiongraph/AccessExecution.java | 15 +--- .../executiongraph/AccessExecutionGraph.java | 11 --- .../AccessExecutionJobVertex.java | 13 ---- .../executiongraph/ArchivedExecution.java | 18 ++--- .../executiongraph/ArchivedExecutionGraph.java | 21 ----- .../ArchivedExecutionJobVertex.java | 8 -- .../flink/runtime/executiongraph/Execution.java | 24 +++--- .../runtime/executiongraph/ExecutionGraph.java | 28 +------ .../executiongraph/ExecutionJobVertex.java | 33 -------- .../api/reader/AbstractRecordReader.java | 8 -- .../io/network/api/reader/BufferReader.java | 6 -- .../io/network/api/reader/ReaderBase.java | 6 -- .../AdaptiveSpanningRecordDeserializer.java | 31 -------- .../api/serialization/RecordDeserializer.java | 6 -- .../api/serialization/RecordSerializer.java | 6 -- .../serialization/SpanningRecordSerializer.java | 13 ---- ...llingAdaptiveSpanningRecordDeserializer.java | 31 -------- .../io/network/api/writer/RecordWriter.java | 10 --- .../iterative/task/IterationHeadTask.java | 4 +- .../flink/runtime/operators/BatchTask.java | 22 ++---- .../flink/runtime/operators/DataSinkTask.java | 6 -- .../flink/runtime/operators/DataSourceTask.java | 6 +- .../apache/flink/runtime/taskmanager/Task.java | 4 + .../runtime/taskmanager/TaskExecutionState.java | 13 +++- .../flink/runtime/taskmanager/TaskManager.scala | 4 +- .../ArchivedExecutionGraphTest.java | 23 ------ .../ExecutionGraphDeploymentTest.java | 7 +- .../network/api/reader/AbstractReaderTest.java | 6 -- .../testingUtils/TestingJobManagerLike.scala | 3 +- .../TestingJobManagerMessages.scala | 2 - .../runtime/io/StreamInputProcessor.java | 7 -- .../runtime/io/StreamTwoInputProcessor.java | 7 -- .../runtime/tasks/OneInputStreamTask.java | 4 - .../streaming/runtime/tasks/OperatorChain.java | 8 +- .../streaming/runtime/tasks/StreamTask.java | 2 +- .../runtime/tasks/TwoInputStreamTask.java | 4 - .../operators/StreamOperatorChainingTest.java | 9 +-- .../accumulators/AccumulatorLiveITCase.java | 82 ++------------------ 46 files changed, 207 insertions(+), 652 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index a0afba2..7d2b5b6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -262,12 +262,12 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs", handler(new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT))) - .GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs))) + .GET("/jobs/:jobid", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) + .GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(currentGraphs, metricFetcher))) - .GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs))) + .GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs, metricFetcher))) .GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs))) + .GET("/jobs/:jobid/vertices/:vertexid/taskmanagers", handler(new JobVertexTaskManagersHandler(currentGraphs, metricFetcher))) .GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler( @@ -276,8 +276,8 @@ public class WebRuntimeMonitor implements WebMonitor { refreshInterval))) .GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs))) - .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs))) + .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher))) + .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/plan", handler(new JobPlanHandler(currentGraphs))) http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index e7a2a8c..6de6dc5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -20,16 +20,16 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import java.io.StringWriter; import java.util.Map; @@ -45,9 +45,12 @@ import java.util.Map; * </ul> */ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { - - public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder) { + + private final MetricFetcher fetcher; + + public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder); + this.fetcher = fetcher; } @Override @@ -124,13 +127,6 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { ExecutionState jobVertexState = ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism()); jobVerticesPerState[jobVertexState.ordinal()]++; - - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = ejv.getAggregatedMetricAccumulators(); - - LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN); - LongCounter writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT); - LongCounter readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN); - LongCounter writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT); gen.writeStartObject(); gen.writeStringField("id", ejv.getJobVertexId().toString()); @@ -148,11 +144,36 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } gen.writeEndObject(); + long numBytesIn = 0; + long numBytesOut = 0; + long numRecordsIn = 0; + long numRecordsOut = 0; + + for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + numBytesOut += ioMetrics.getNumBytesOut(); + numRecordsIn += ioMetrics.getNumRecordsIn(); + numRecordsOut += ioMetrics.getNumRecordsOut(); + } else { // execAttempt is still running, use MetricQueryService instead + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); + if (metrics != null) { + numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); + numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); + numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); + numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + } + } + } + gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L); + gen.writeNumberField("read-bytes", numBytesIn); + gen.writeNumberField("write-bytes", numBytesOut); + gen.writeNumberField("read-records", numRecordsIn); + gen.writeNumberField("write-records", numRecordsOut); gen.writeEndObject(); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index fbdd86b..14dcd39 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -20,14 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import java.io.StringWriter; import java.util.Map; @@ -37,9 +37,12 @@ import java.util.Map; * and the runtime and metrics of all its subtasks. */ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { - - public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder) { + + private final MetricFetcher fetcher; + + public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder); + this.fetcher = fetcher; } @Override @@ -71,25 +74,6 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1; long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators(); - LongCounter readBytes; - LongCounter writeBytes; - LongCounter readRecords; - LongCounter writeRecords; - - if (metrics != null) { - readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN); - writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT); - readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN); - writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT); - } - else { - readBytes = null; - writeBytes = null; - readRecords = null; - writeRecords = null; - } - gen.writeStartObject(); gen.writeNumberField("subtask", num); gen.writeStringField("status", status.name()); @@ -99,11 +83,34 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + long numBytesIn = 0; + long numBytesOut = 0; + long numRecordsIn = 0; + long numRecordsOut = 0; + + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + numBytesOut = ioMetrics.getNumBytesOut(); + numRecordsIn = ioMetrics.getNumRecordsIn(); + numRecordsOut = ioMetrics.getNumRecordsOut(); + } else { // execAttempt is still running, use MetricQueryService instead + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); + if (metrics != null) { + numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); + numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); + numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); + numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + } + } + gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L); + gen.writeNumberField("read-bytes", numBytesIn); + gen.writeNumberField("write-bytes", numBytesOut); + gen.writeNumberField("read-records", numRecordsIn); + gen.writeNumberField("write-records", numRecordsOut); gen.writeEndObject(); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index 0e94334..c1fabf8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -19,15 +19,15 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import java.io.StringWriter; import java.util.ArrayList; @@ -42,8 +42,11 @@ import java.util.Map.Entry; */ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler { - public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder) { + private final MetricFetcher fetcher; + + public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder); + this.fetcher = fetcher; } @Override @@ -88,10 +91,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle long endTime = 0; boolean allFinished = true; - LongCounter tmReadBytes = new LongCounter(); - LongCounter tmWriteBytes = new LongCounter(); - LongCounter tmReadRecords = new LongCounter(); - LongCounter tmWriteRecords = new LongCounter(); + long numBytesIn = 0; + long numBytesOut = 0; + long numRecordsIn = 0; + long numRecordsOut = 0; for (AccessExecutionVertex vertex : taskVertices) { final ExecutionState state = vertex.getExecutionState(); @@ -106,20 +109,22 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle allFinished &= state.isTerminal(); endTime = Math.max(endTime, vertex.getStateTimestamp(state)); - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = vertex.getCurrentExecutionAttempt().getFlinkAccumulators(); - - if (metrics != null) { - LongCounter readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN); - tmReadBytes.merge(readBytes); - - LongCounter writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT); - tmWriteBytes.merge(writeBytes); - - LongCounter readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN); - tmReadRecords.merge(readRecords); - - LongCounter writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT); - tmWriteRecords.merge(writeRecords); + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + numBytesOut += ioMetrics.getNumBytesOut(); + numRecordsIn += ioMetrics.getNumRecordsIn(); + numRecordsOut += ioMetrics.getNumRecordsOut(); + } else { // execAttempt is still running, use MetricQueryService instead + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex()); + if (metrics != null) { + numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); + numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); + numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); + numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + } } } @@ -152,10 +157,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle gen.writeNumberField("duration", duration); gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", tmReadBytes.getLocalValuePrimitive()); - gen.writeNumberField("write-bytes", tmWriteBytes.getLocalValuePrimitive()); - gen.writeNumberField("read-records", tmReadRecords.getLocalValuePrimitive()); - gen.writeNumberField("write-records", tmWriteRecords.getLocalValuePrimitive()); + gen.writeNumberField("read-bytes", numBytesIn); + gen.writeNumberField("write-bytes", numBytesOut); + gen.writeNumberField("read-records", numRecordsIn); + gen.writeNumberField("write-records", numRecordsOut); gen.writeEndObject(); gen.writeObjectFieldStart("status-counts"); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java index 811bea6..6d09513 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import java.util.Map; @@ -28,8 +29,8 @@ import java.util.Map; */ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler { - public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) { - super(executionGraphHolder); + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { + super(executionGraphHolder, fetcher); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index 3cc7376..ca9c7ad 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -20,13 +20,13 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import java.io.StringWriter; import java.util.Map; @@ -35,9 +35,12 @@ import java.util.Map; * Request handler providing details about a single task execution attempt. */ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptRequestHandler { - - public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder) { + + private final MetricFetcher fetcher; + + public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) { super(executionGraphHolder); + this.fetcher = fetcher; } @Override @@ -55,25 +58,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1; long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = execAttempt.getFlinkAccumulators(); - LongCounter readBytes; - LongCounter writeBytes; - LongCounter readRecords; - LongCounter writeRecords; - - if (metrics != null) { - readBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN); - writeBytes = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT); - readRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN); - writeRecords = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT); - } - else { - readBytes = null; - writeBytes = null; - readRecords = null; - writeRecords = null; - } - StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -86,11 +70,34 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); + IOMetrics ioMetrics = execAttempt.getIOMetrics(); + + long numBytesIn = 0; + long numBytesOut = 0; + long numRecordsIn = 0; + long numRecordsOut = 0; + + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + numBytesOut = ioMetrics.getNumBytesOut(); + numRecordsIn = ioMetrics.getNumRecordsIn(); + numRecordsOut = ioMetrics.getNumRecordsOut(); + } else { // execAttempt is still running, use MetricQueryService instead + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex()); + if (metrics != null) { + numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); + numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0")); + numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0")); + numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + } + } + gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", readBytes != null ? readBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-bytes", writeBytes != null ? writeBytes.getLocalValuePrimitive() : -1L); - gen.writeNumberField("read-records", readRecords != null ? readRecords.getLocalValuePrimitive() : -1L); - gen.writeNumberField("write-records",writeRecords != null ? writeRecords.getLocalValuePrimitive() : -1L); + gen.writeNumberField("read-bytes", numBytesIn); + gen.writeNumberField("write-bytes", numBytesOut); + gen.writeNumberField("read-records", numRecordsIn); + gen.writeNumberField("write-records", numRecordsOut); gen.writeEndObject(); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java index 44714e7..ce6cb1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java @@ -20,18 +20,16 @@ package org.apache.flink.runtime.accumulators; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * Main accumulator registry which encapsulates internal and user-defined accumulators. + * Main accumulator registry which encapsulates user-defined accumulators. */ public class AccumulatorRegistry { @@ -40,32 +38,13 @@ public class AccumulatorRegistry { protected final JobID jobID; protected final ExecutionAttemptID taskID; - /* Flink's internal Accumulator values stored for the executing task. */ - private final Map<Metric, Accumulator<?, ?>> flinkAccumulators = - new HashMap<Metric, Accumulator<?, ?>>(); - /* User-defined Accumulator values stored for the executing task. */ private final Map<String, Accumulator<?, ?>> userAccumulators = new ConcurrentHashMap<>(4); - /* The reporter reference that is handed to the reporting tasks. */ - private final ReadWriteReporter reporter; - - /** - * Flink metrics supported - */ - public enum Metric { - NUM_RECORDS_IN, - NUM_RECORDS_OUT, - NUM_BYTES_IN, - NUM_BYTES_OUT - } - - public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) { this.jobID = jobID; this.taskID = taskID; - this.reporter = new ReadWriteReporter(flinkAccumulators); } /** @@ -74,7 +53,7 @@ public class AccumulatorRegistry { */ public AccumulatorSnapshot getSnapshot() { try { - return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators); + return new AccumulatorSnapshot(jobID, taskID, userAccumulators); } catch (Throwable e) { LOG.warn("Failed to serialize accumulators for task.", e); return null; @@ -88,59 +67,4 @@ public class AccumulatorRegistry { return userAccumulators; } - /** - * Gets the reporter for flink internal metrics. - */ - public Reporter getReadWriteReporter() { - return reporter; - } - - /** - * Interface for Flink's internal accumulators. - */ - public interface Reporter { - void reportNumRecordsIn(long value); - void reportNumRecordsOut(long value); - void reportNumBytesIn(long value); - void reportNumBytesOut(long value); - } - - /** - * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out) - */ - private static class ReadWriteReporter implements Reporter { - - private LongCounter numRecordsIn = new LongCounter(); - private LongCounter numRecordsOut = new LongCounter(); - private LongCounter numBytesIn = new LongCounter(); - private LongCounter numBytesOut = new LongCounter(); - - private ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) { - accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn); - accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut); - accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn); - accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut); - } - - @Override - public void reportNumRecordsIn(long value) { - numRecordsIn.add(value); - } - - @Override - public void reportNumRecordsOut(long value) { - numRecordsOut.add(value); - } - - @Override - public void reportNumBytesIn(long value) { - numBytesIn.add(value); - } - - @Override - public void reportNumBytesOut(long value) { - numBytesOut.add(value); - } - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java index d0f4bad..0bfb1ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java @@ -40,21 +40,14 @@ public class AccumulatorSnapshot implements Serializable { private final ExecutionAttemptID executionAttemptID; /** - * Flink internal accumulators which can be deserialized using the system class loader. - */ - private final SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators; - - /** * Serialized user accumulators which may require the custom user class loader. */ private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators; public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID, - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) throws IOException { this.jobID = jobID; this.executionAttemptID = executionAttemptID; - this.flinkAccumulators = new SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(flinkAccumulators); this.userAccumulators = new SerializedValue<Map<String, Accumulator<?, ?>>>(userAccumulators); } @@ -67,14 +60,6 @@ public class AccumulatorSnapshot implements Serializable { } /** - * Gets the Flink (internal) accumulators values. - * @return the serialized map - */ - public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> deserializeFlinkAccumulators() throws IOException, ClassNotFoundException { - return flinkAccumulators.deserializeValue(getClass().getClassLoader()); - } - - /** * Gets the user-defined accumulators values. * @return the serialized map */ http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java index aefc17d..df558c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java @@ -17,14 +17,10 @@ */ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import java.util.Map; - /** * Common interface for the runtime {@link Execution and {@link ArchivedExecution}. */ @@ -88,18 +84,11 @@ public interface AccessExecution { StringifiedAccumulatorResult[] getUserAccumulatorsStringified(); /** - * Returns the system-defined accumulators. - * - * @return system-defined accumulators. - * @deprecated Will be removed in FLINK-4527 - */ - @Deprecated - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators(); - - /** * Returns the subtask index of this execution. * * @return subtask index of this execution. */ int getParallelSubtaskIndex(); + + IOMetrics getIOMetrics(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 0fd97da..e7fe1b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; @@ -153,15 +151,6 @@ public interface AccessExecutionGraph { Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException; /** - * Returns the aggregated system-defined accumulators. - * - * @return aggregated system-defined accumulators. - * @deprecated Will be removed in FLINK-4527 - */ - @Deprecated - Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators(); - - /** * Returns whether this execution graph was archived. * * @return true, if the execution graph was archived, false otherwise http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java index c9bf604..92af0c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java @@ -17,16 +17,12 @@ */ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; import scala.Option; -import java.util.Map; - /** * Common interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}. */ @@ -81,15 +77,6 @@ public interface AccessExecutionJobVertex { Option<OperatorCheckpointStats> getCheckpointStats(); /** - * Returns the aggregated system-defined accumulators. - * - * @return aggregated system-defined accumulators. - * @deprecated Will be removed in FLINK-4527 - */ - @Deprecated - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators(); - - /** * Returns the aggregated user-defined accumulators as strings. * * @return aggregated user-defined accumulators as strings. http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java index 0b2992f..c189d42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java @@ -17,15 +17,12 @@ */ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import java.io.Serializable; -import java.util.Map; public class ArchivedExecution implements AccessExecution, Serializable { private static final long serialVersionUID = 4817108757483345173L; @@ -46,13 +43,12 @@ public class ArchivedExecution implements AccessExecution, Serializable { /* Continuously updated map of user-defined accumulators */ private final StringifiedAccumulatorResult[] userAccumulators; - /* Continuously updated map of internal accumulators */ - private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators; private final int parallelSubtaskIndex; + private final IOMetrics ioMetrics; + public ArchivedExecution(Execution execution) { this.userAccumulators = execution.getUserAccumulatorsStringified(); - this.flinkAccumulators = execution.getFlinkAccumulators(); this.attemptId = execution.getAttemptId(); this.attemptNumber = execution.getAttemptNumber(); this.stateTimestamps = execution.getStateTimestamps(); @@ -60,6 +56,7 @@ public class ArchivedExecution implements AccessExecution, Serializable { this.state = execution.getState(); this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause()); this.assignedResourceLocation = execution.getAssignedResourceLocation(); + this.ioMetrics = execution.getIOMetrics(); } // -------------------------------------------------------------------------------------------- @@ -106,13 +103,14 @@ public class ArchivedExecution implements AccessExecution, Serializable { return userAccumulators; } - @Override - public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() { - return flinkAccumulators; - } @Override public int getParallelSubtaskIndex() { return parallelSubtaskIndex; } + + @Override + public IOMetrics getIOMetrics() { + return ioMetrics; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index d8c58c8..0bd5319 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -19,8 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -31,7 +29,6 @@ import org.apache.flink.util.SerializedValue; import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -208,24 +205,6 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl return tracker; } - /** - * Gets the internal flink accumulator map of maps which contains some metrics. - * - * @return A map of accumulators for every executed task. - */ - @Override - public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() { - Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = - new HashMap<>(); - - for (AccessExecutionVertex vertex : getAllExecutionVertices()) { - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators(); - flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs); - } - - return flinkAccumulators; - } - @Override public boolean isArchived() { return true; http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java index 4857bf5..8ae6bbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; @@ -46,7 +45,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser private final int maxParallelism; - private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators; private final Option<OperatorCheckpointStats> checkpointStats; private final StringifiedAccumulatorResult[] archivedUserAccumulators; @@ -56,8 +54,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser taskVertices[x] = jobVertex.getTaskVertices()[x].archive(); } - aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators(); - Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = new HashMap<>(); for (ExecutionVertex vertex : jobVertex.getTaskVertices()) { Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); @@ -116,10 +112,6 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser return getAggregateJobVertexState(num, parallelism); } - public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() { - return this.aggregatedMetricAccumulators; - } - // -------------------------------------------------------------------------------------------- // Static / pre-assigned input splits // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 17e0df1..788dee4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -23,7 +23,6 @@ import akka.dispatch.OnFailure; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.BiFunction; @@ -143,9 +142,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution /* Continuously updated map of user-defined accumulators */ private volatile Map<String, Accumulator<?, ?>> userAccumulators; - - /* Continuously updated map of internal accumulators */ - private volatile Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators; + private IOMetrics ioMetrics; // -------------------------------------------------------------------------------------------- @@ -651,7 +648,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution markFinished(null, null); } - void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) { + void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { // this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!) while (true) { @@ -673,9 +670,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } synchronized (accumulatorLock) { - this.flinkAccumulators = flinkAccumulators; this.userAccumulators = userAccumulators; } + this.ioMetrics = metrics; assignedResource.releaseSlot(); vertex.getExecutionGraph().deregisterExecution(this); @@ -1010,14 +1007,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution /** * Update accumulators (discarded when the Execution has already been terminated). - * @param flinkAccumulators the flink internal accumulators * @param userAccumulators the user accumulators */ - public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, - Map<String, Accumulator<?, ?>> userAccumulators) { + public void setAccumulators(Map<String, Accumulator<?, ?>> userAccumulators) { synchronized (accumulatorLock) { if (!state.isTerminal()) { - this.flinkAccumulators = flinkAccumulators; this.userAccumulators = userAccumulators; } } @@ -1033,14 +1027,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } @Override - public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() { - return flinkAccumulators; - } - - @Override public int getParallelSubtaskIndex() { return getVertex().getParallelSubtaskIndex(); } + + @Override + public IOMetrics getIOMetrics() { + return ioMetrics; + } // ------------------------------------------------------------------------ // Standard utilities http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 0a79cf2..074a04d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -29,7 +29,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.blob.BlobKey; @@ -600,23 +599,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } /** - * Gets the internal flink accumulator map of maps which contains some metrics. - * @return A map of accumulators for every executed task. - */ - @Override - public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() { - Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators = - new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(); - - for (ExecutionVertex vertex : getAllExecutionVertices()) { - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators(); - flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs); - } - - return flinkAccumulators; - } - - /** * Merges all accumulator results from the tasks previously executed in the Executions. * @return The accumulator map */ @@ -1075,7 +1057,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** * Updates the state of one of the ExecutionVertex's Execution attempts. - * If the new status if "FINISHED", this also updates the + * If the new status if "FINISHED", this also updates the accumulators. * * @param state The state update. * @return True, if the task update was properly applied, false, if the execution attempt was not found. @@ -1090,11 +1072,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive case FINISHED: try { AccumulatorSnapshot accumulators = state.getAccumulators(); - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = - accumulators.deserializeFlinkAccumulators(); Map<String, Accumulator<?, ?>> userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader); - attempt.markFinished(flinkAccumulators, userAccumulators); + attempt.markFinished(userAccumulators, state.getIOMetrics()); } catch (Exception e) { LOG.error("Failed to deserialize final accumulator results.", e); @@ -1160,16 +1140,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * @param accumulatorSnapshot The serialized flink and user-defined accumulators */ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators; Map<String, Accumulator<?, ?>> userAccumulators; try { - flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators(); userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader); ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID(); Execution execution = currentExecutions.get(execID); if (execution != null) { - execution.setAccumulators(flinkAccumulators, userAccumulators); + execution.setAccumulators(userAccumulators); } else { LOG.warn("Received accumulator result for unknown execution {}.", execID); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index e7f16a2..2d9ec88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -20,13 +20,11 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; import org.apache.flink.core.io.LocatableInputSplit; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats; @@ -443,37 +441,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable // -------------------------------------------------------------------------------------------- // Accumulators / Metrics // -------------------------------------------------------------------------------------------- - - public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() { - // some specialized code to speed things up - long bytesRead = 0; - long bytesWritten = 0; - long recordsRead = 0; - long recordsWritten = 0; - - for (ExecutionVertex v : getTaskVertices()) { - Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators(); - - if (metrics != null) { - LongCounter br = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN); - LongCounter bw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT); - LongCounter rr = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN); - LongCounter rw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT); - - bytesRead += br != null ? br.getLocalValuePrimitive() : 0; - bytesWritten += bw != null ? bw.getLocalValuePrimitive() : 0; - recordsRead += rr != null ? rr.getLocalValuePrimitive() : 0; - recordsWritten += rw != null ? rw.getLocalValuePrimitive() : 0; - } - } - - HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = new HashMap<>(); - agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(bytesRead)); - agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new LongCounter(bytesWritten)); - agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new LongCounter(recordsRead)); - agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new LongCounter(recordsWritten)); - return agg; - } public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() { Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index e0fe355..c5aeef7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; @@ -123,11 +122,4 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra } } } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for (RecordDeserializer<?> deserializer : recordDeserializers) { - deserializer.setReporter(reporter); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index debb352..ca59609 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.api.reader; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -48,9 +47,4 @@ public final class BufferReader extends AbstractReader { } } } - - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - - } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java index a1d705f..0cc77f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.reader; import java.io.IOException; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; @@ -52,9 +51,4 @@ public interface ReaderBase { boolean hasReachedEndOfSuperstep(); - /** - * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. - */ - void setReporter(AccumulatorRegistry.Reporter reporter); - } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index cdd8731..8f2c8fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; @@ -46,8 +45,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im private Buffer currentBuffer; - private AccumulatorRegistry.Reporter reporter; - public AdaptiveSpanningRecordDeserializer() { this.nonSpanningWrapper = new NonSpanningWrapper(); this.spanningWrapper = new SpanningWrapper(); @@ -93,18 +90,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im if (nonSpanningRemaining >= 4) { int len = this.nonSpanningWrapper.readInt(); - if (reporter != null) { - reporter.reportNumBytesIn(len); - } - if (len <= nonSpanningRemaining - 4) { // we can get a full record from here target.read(this.nonSpanningWrapper); - if (reporter != null) { - reporter.reportNumRecordsIn(1); - } - return (this.nonSpanningWrapper.remaining() == 0) ? DeserializationResult.LAST_RECORD_FROM_BUFFER : DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; @@ -128,10 +117,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im // get the full record target.read(this.spanningWrapper); - if (reporter != null) { - reporter.reportNumRecordsIn(1); - } - // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper); @@ -159,12 +144,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0; } - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - this.reporter = reporter; - this.spanningWrapper.setReporter(reporter); - } - // ----------------------------------------------------------------------------------------------------------------- private static final class NonSpanningWrapper implements DataInputView { @@ -447,8 +426,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im private int recordLimit; - private AccumulatorRegistry.Reporter reporter; - public SpanningWrapper() { this.lengthBuffer = ByteBuffer.allocate(4); this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); @@ -486,10 +463,6 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im } else { this.recordLength = this.lengthBuffer.getInt(0); - if (reporter != null) { - reporter.reportNumBytesIn(this.recordLength); - } - this.lengthBuffer.clear(); segmentPosition = toPut; } @@ -634,9 +607,5 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im public int read(byte[] b) throws IOException { return this.serializationReadBuffer.read(b); } - - public void setReporter(AccumulatorRegistry.Reporter reporter) { - this.reporter = reporter; - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java index e4c7890..dd8ea06 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; /** @@ -65,9 +64,4 @@ public interface RecordDeserializer<T extends IOReadableWritable> { void clear(); boolean hasUnfinishedData(); - - /** - * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. - */ - void setReporter(AccumulatorRegistry.Reporter reporter); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index c76dd00..e8179dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; /** @@ -67,11 +66,6 @@ public interface RecordSerializer<T extends IOReadableWritable> { boolean hasData(); /** - * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. - */ - void setReporter(AccumulatorRegistry.Reporter reporter); - - /** * Insantiates all metrics. * * @param metrics metric group http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 7c4d937..e36a16f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -26,7 +26,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataOutputSerializer; @@ -53,8 +52,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R /** Limit of current {@link MemorySegment} of target buffer */ private int limit; - private AccumulatorRegistry.Reporter reporter; - private transient Counter numBytesOut; public SpanningRecordSerializer() { @@ -84,11 +81,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R int len = this.serializationBuffer.length(); this.lengthBuffer.putInt(0, len); - - if (reporter != null) { - reporter.reportNumBytesOut(len); - reporter.reportNumRecordsOut(1); - } if (numBytesOut != null) { numBytesOut.inc(len); @@ -192,11 +184,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R } @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - this.reporter = reporter; - } - - @Override public void instantiateMetrics(TaskIOMetricGroup metrics) { numBytesOut = metrics.getNumBytesOutCounter(); } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index eab8e7c..7c213b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -22,7 +22,6 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.util.StringUtils; @@ -59,8 +58,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit private Buffer currentBuffer; - private AccumulatorRegistry.Reporter reporter; - public SpillingAdaptiveSpanningRecordDeserializer(String[] tmpDirectories) { this.nonSpanningWrapper = new NonSpanningWrapper(); this.spanningWrapper = new SpanningWrapper(tmpDirectories); @@ -106,19 +103,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (nonSpanningRemaining >= 4) { int len = this.nonSpanningWrapper.readInt(); - if (reporter != null) { - reporter.reportNumBytesIn(len); - } - if (len <= nonSpanningRemaining - 4) { // we can get a full record from here try { target.read(this.nonSpanningWrapper); - if (reporter != null) { - reporter.reportNumRecordsIn(1); - } - int remaining = this.nonSpanningWrapper.remaining(); if (remaining > 0) { return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; @@ -153,10 +142,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit if (this.spanningWrapper.hasFullRecord()) { // get the full record target.read(this.spanningWrapper.getInputView()); - - if (reporter != null) { - reporter.reportNumRecordsIn(1); - } // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment @@ -182,12 +167,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0; } - @Override - public void setReporter(AccumulatorRegistry.Reporter reporter) { - this.reporter = reporter; - this.spanningWrapper.setReporter(reporter); - } - // ----------------------------------------------------------------------------------------------------------------- @@ -483,8 +462,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit private DataInputViewStreamWrapper spillFileReader; - private AccumulatorRegistry.Reporter reporter; - public SpanningWrapper(String[] tempDirs) { this.tempDirs = tempDirs; @@ -538,10 +515,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit } else { this.recordLength = this.lengthBuffer.getInt(0); - if (reporter != null) { - reporter.reportNumBytesIn(recordLength); - } - this.lengthBuffer.clear(); segmentPosition = toPut; @@ -672,9 +645,5 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit random.nextBytes(bytes); return StringUtils.byteToHexString(bytes); } - - public void setReporter(AccumulatorRegistry.Reporter reporter) { - this.reporter = reporter; - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 96eea23..1e224c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; @@ -201,15 +200,6 @@ public class RecordWriter<T extends IOReadableWritable> { } /** - * Counter for the number of records emitted and the records processed. - */ - public void setReporter(AccumulatorRegistry.Reporter reporter) { - for(RecordSerializer<?> serializer : serializers) { - serializer.setReporter(reporter); - } - } - - /** * Sets the metric group for this RecordWriter. * @param metrics */ http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java index 4bc4532..2e3285c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.operators.Driver; @@ -114,9 +113,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>(); final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig(); final ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter(); this.finalOutputCollector = BatchTask.getOutputCollector(this, finalOutConfig, - userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter); + userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs()); // sanity check the setup final int writersIntoStepFunction = this.eventualOutputs.size(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index e896639..f748079 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -31,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -654,9 +653,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme int currentReaderOffset = 0; - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); - for (int i = 0; i < numInputs; i++) { // ---------------- create the input readers --------------------- // in case where a logical input unions multiple physical inputs, create a union reader @@ -680,8 +676,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme throw new Exception("Illegal input group size in task configuration: " + groupSize); } - inputReaders[i].setReporter(reporter); - currentReaderOffset += groupSize; } this.inputReaders = inputReaders; @@ -1015,13 +1009,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme ClassLoader userCodeClassLoader = getUserCodeClassLoader(); - AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); - - this.accumulatorMap = accumulatorRegistry.getUserMap(); + this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap(); this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, - this.getExecutionConfig(), reporter, this.accumulatorMap); + this.getExecutionConfig(), this.accumulatorMap); } public DistributedRuntimeUDFContext createRuntimeContext(MetricGroup metrics) { @@ -1215,7 +1206,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme * @return The OutputCollector that data produced in this task is submitted to. */ public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, - List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception + List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception { if (numOutputs == 0) { return null; @@ -1248,8 +1239,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme final RecordWriter<SerializationDelegate<T>> recordWriter = new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe); - // setup live accumulator counters - recordWriter.setReporter(reporter); recordWriter.setMetricGroup(task.getEnvironment().getMetricGroup().getIOMetricGroup()); writers.add(recordWriter); @@ -1269,7 +1258,6 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig, - AccumulatorRegistry.Reporter reporter, Map<String, Accumulator<?,?>> accumulatorMap) throws Exception { @@ -1304,7 +1292,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme if (i == numChained - 1) { // last in chain, instantiate the output collector for this task - previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter); + previous = getOutputCollector(containingTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs()); } ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap); @@ -1322,7 +1310,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme // else // instantiate the output collector the default way from this configuration - return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs, reporter); + return getOutputCollector(containingTask , config, cl, eventualOutputs, 0, numOutputs); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ba2d007e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index eb7999c..bd052f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.MutableReader; @@ -354,11 +353,6 @@ public class DataSinkTask<IT> extends AbstractInvokable { } else { throw new Exception("Illegal input group size in task configuration: " + groupSize); } - - final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); - final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); - - inputReader.setReporter(reporter); this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader()); @SuppressWarnings({ "rawtypes" })
