Repository: flink Updated Branches: refs/heads/master 365cd987c -> 47db9cb1a
[FLINK-4906] [metrics] Introduce constants for IO metrics This closes #2980. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc5dd510 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc5dd510 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc5dd510 Branch: refs/heads/master Commit: dc5dd5106738e393761a62a56d9e684c722c516f Parents: 4befbb8 Author: zentol <[email protected]> Authored: Fri Dec 9 14:10:10 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Dec 12 12:45:26 2016 +0100 ---------------------------------------------------------------------- .../webmonitor/handlers/JobDetailsHandler.java | 9 ++--- .../handlers/JobVertexDetailsHandler.java | 9 ++--- .../handlers/JobVertexTaskManagersHandler.java | 9 ++--- .../SubtaskExecutionAttemptDetailsHandler.java | 9 ++--- .../flink/runtime/metrics/MetricNames.java | 38 ++++++++++++++++++++ .../metrics/groups/OperatorIOMetricGroup.java | 9 ++--- .../metrics/groups/TaskIOMetricGroup.java | 21 +++++------ 7 files changed, 74 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 6de6dc5..35e6ca7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricStore; @@ -161,10 +162,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { fetcher.update(); MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); - numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); - numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); - numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 14dcd39..32626ba 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; @@ -99,10 +100,10 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { fetcher.update(); MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); - numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); - numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); - numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index c1fabf8..f468d35 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; @@ -120,10 +121,10 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle fetcher.update(); MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex()); if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); - numBytesOut += Long.valueOf(metrics.getMetric("numBytesOut", "0")); - numRecordsIn += Long.valueOf(metrics.getMetric("numRecordsIn", "0")); - numRecordsOut += Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index ca9c7ad..da8db02 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; @@ -86,10 +87,10 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp fetcher.update(); MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex()); if (metrics != null) { - numBytesIn = Long.valueOf(metrics.getMetric("numBytesInLocal", "0")) + Long.valueOf(metrics.getMetric("numBytesInRemote", "0")); - numBytesOut = Long.valueOf(metrics.getMetric("numBytesOut", "0")); - numRecordsIn = Long.valueOf(metrics.getMetric("numRecordsIn", "0")); - numRecordsOut = Long.valueOf(metrics.getMetric("numRecordsOut", "0")); + numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java new file mode 100644 index 0000000..9202ca1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics; + +public class MetricNames { + private MetricNames() { + } + + private static final String SUFFIX_RATE = "PerSecond"; + + public static final String IO_NUM_RECORDS_IN = "numRecordsIn"; + public static final String IO_NUM_RECORDS_OUT = "numRecordsOut"; + public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE; + public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE; + + public static final String IO_NUM_BYTES_IN = "numBytesIn"; + public static final String IO_NUM_BYTES_IN_LOCAL = IO_NUM_BYTES_IN + "Local"; + public static final String IO_NUM_BYTES_IN_REMOTE = IO_NUM_BYTES_IN + "Remote"; + public static final String IO_NUM_BYTES_OUT = "numBytesOut"; + public static final String IO_NUM_BYTES_IN_LOCAL_RATE = IO_NUM_BYTES_IN_LOCAL + SUFFIX_RATE; + public static final String IO_NUM_BYTES_IN_REMOTE_RATE = IO_NUM_BYTES_IN_REMOTE + SUFFIX_RATE; + public static final String IO_NUM_BYTES_OUT_RATE = IO_NUM_BYTES_OUT + SUFFIX_RATE; +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java index 2e321fe..5bf7d1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.metrics.MetricNames; /** * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is @@ -35,10 +36,10 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> public OperatorIOMetricGroup(OperatorMetricGroup parentMetricGroup) { super(parentMetricGroup); - numRecordsIn = parentMetricGroup.counter("numRecordsIn"); - numRecordsOut = parentMetricGroup.counter("numRecordsOut"); - numRecordsInRate = parentMetricGroup.meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60)); - numRecordsOutRate = parentMetricGroup.meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60)); + numRecordsIn = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_IN); + numRecordsOut = parentMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT); + numRecordsInRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60)); + numRecordsOutRate = parentMetricGroup.meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60)); } public Counter getNumRecordsInCounter() { http://git-wip-us.apache.org/repos/asf/flink/blob/dc5dd510/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index c5296fb..fcea098 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.executiongraph.IOMetrics; @@ -53,16 +54,16 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); - this.numBytesOut = counter("numBytesOut"); - this.numBytesInLocal = counter("numBytesInLocal"); - this.numBytesInRemote = counter("numBytesInRemote"); - this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60)); - this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60)); - this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60)); - this.numRecordsIn = counter("numRecordsIn", new SumCounter()); - this.numRecordsOut = counter("numRecordsOut", new SumCounter()); - this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60)); - this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60)); + this.numBytesOut = counter(MetricNames.IO_NUM_BYTES_OUT); + this.numBytesInLocal = counter(MetricNames.IO_NUM_BYTES_IN_LOCAL); + this.numBytesInRemote = counter(MetricNames.IO_NUM_BYTES_IN_REMOTE); + this.numBytesOutRate = meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut, 60)); + this.numBytesInRateLocal = meter(MetricNames.IO_NUM_BYTES_IN_LOCAL_RATE, new MeterView(numBytesInLocal, 60)); + this.numBytesInRateRemote = meter(MetricNames.IO_NUM_BYTES_IN_REMOTE_RATE, new MeterView(numBytesInRemote, 60)); + this.numRecordsIn = counter(MetricNames.IO_NUM_RECORDS_IN, new SumCounter()); + this.numRecordsOut = counter(MetricNames.IO_NUM_RECORDS_OUT, new SumCounter()); + this.numRecordsInRate = meter(MetricNames.IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60)); + this.numRecordsOutRate = meter(MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60)); } public IOMetrics createSnapshot() {
