[FLINK-4733] Reuse operator IO metrics for task
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f4f6f92 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f4f6f92 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f4f6f92 Branch: refs/heads/master Commit: 1f4f6f928cb71fcbcba8d5aca731e738352d1175 Parents: 99f1dc3 Author: zentol <[email protected]> Authored: Mon Oct 31 14:08:26 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Oct 31 15:12:02 2016 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/IOMetrics.java | 99 ++++++++++++++++++++ .../metrics/groups/OperatorIOMetricGroup.java | 17 ++++ .../metrics/groups/TaskIOMetricGroup.java | 67 ++++++++++++- .../flink/runtime/operators/BatchTask.java | 8 ++ .../flink/runtime/operators/DataSinkTask.java | 2 + .../flink/runtime/operators/DataSourceTask.java | 4 + .../operators/chaining/ChainedDriver.java | 4 + .../metrics/groups/TaskIOMetricGroupTest.java | 61 ++++++++++++ .../flink/streaming/api/graph/StreamConfig.java | 9 ++ .../api/graph/StreamingJobGraphGenerator.java | 3 + .../api/operators/AbstractStreamOperator.java | 6 ++ 11 files changed, 278 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java new file mode 100644 index 0000000..15c54b4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.Meter; + +import java.io.Serializable; + +/** + * An instance of this class represents a snapshot of the io-related metrics of a single task. + */ +public class IOMetrics implements Serializable { + private static final long serialVersionUID = -7208093607556457183L; + private final long numRecordsIn; + private final long numRecordsOut; + + private final double numRecordsInPerSecond; + private final double numRecordsOutPerSecond; + + private final long numBytesInLocal; + private final long numBytesInRemote; + private final long numBytesOut; + + private final double numBytesInLocalPerSecond; + private final double numBytesInRemotePerSecond; + private final double numBytesOutPerSecond; + + public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) { + this.numRecordsIn = recordsIn.getCount(); + this.numRecordsInPerSecond = recordsIn.getRate(); + this.numRecordsOut = recordsOut.getCount(); + this.numRecordsOutPerSecond = recordsOut.getRate(); + this.numBytesInLocal = bytesLocalIn.getCount(); + this.numBytesInLocalPerSecond = bytesLocalIn.getRate(); + this.numBytesInRemote = bytesRemoteIn.getCount(); + this.numBytesInRemotePerSecond = bytesRemoteIn.getRate(); + this.numBytesOut = bytesOut.getCount(); + this.numBytesOutPerSecond = bytesOut.getRate(); + } + + public long getNumRecordsIn() { + return numRecordsIn; + } + + public long getNumRecordsOut() { + return numRecordsOut; + } + + public long getNumBytesInLocal() { + return numBytesInLocal; + } + + public long getNumBytesInRemote() { + return numBytesInRemote; + } + + public long getNumBytesInTotal() { + return numBytesInLocal + numBytesInRemote; + } + + public long getNumBytesOut() { + return numBytesOut; + } + + public double getNumRecordsInPerSecond() { + return numRecordsInPerSecond; + } + + public double getNumRecordsOutPerSecond() { + return numRecordsOutPerSecond; + } + + public double getNumBytesInLocalPerSecond() { + return numBytesInLocalPerSecond; + } + + public double getNumBytesInRemotePerSecond() { + return numBytesInRemotePerSecond; + } + + public double getNumBytesOutPerSecond() { + return numBytesOutPerSecond; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java index 32611fd..2e321fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorIOMetricGroup.java @@ -56,4 +56,21 @@ public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> public Meter getNumRecordsOutRate() { return numRecordsOutRate; } + + /** + * Causes the containing task to use this operators input record counter. + */ + public void reuseInputMetricsForTask() { + TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup(); + taskIO.reuseRecordsInputCounter(this.numRecordsIn); + + } + + /** + * Causes the containing task to use this operators output record counter. + */ + public void reuseOutputMetricsForTask() { + TaskIOMetricGroup taskIO = parentMetricGroup.parent().getIOMetricGroup(); + taskIO.reuseRecordsOutputCounter(this.numRecordsOut); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index b2884ec..4e32563 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -23,9 +23,14 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.executiongraph.IOMetrics; + +import java.util.ArrayList; +import java.util.List; /** * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is @@ -36,10 +41,14 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { private final Counter numBytesOut; private final Counter numBytesInLocal; private final Counter numBytesInRemote; + private final SumCounter numRecordsIn; + private final SumCounter numRecordsOut; private final Meter numBytesInRateLocal; private final Meter numBytesInRateRemote; private final Meter numBytesOutRate; + private final Meter numRecordsInRate; + private final Meter numRecordsOutRate; private final MetricGroup buffers; @@ -52,10 +61,21 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { this.numBytesOutRate = meter("numBytesOutPerSecond", new MeterView(numBytesOut, 60)); this.numBytesInRateLocal = meter("numBytesInLocalPerSecond", new MeterView(numBytesInLocal, 60)); this.numBytesInRateRemote = meter("numBytesInRemotePerSecond", new MeterView(numBytesInRemote, 60)); + this.numRecordsIn = counter("numRecordsIn", new SumCounter()); + this.numRecordsOut = counter("numRecordsOut", new SumCounter()); + this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60)); + this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60)); this.buffers = addGroup("buffers"); } + public IOMetrics createSnapshot() { + return new IOMetrics(numRecordsInRate, numRecordsOutRate, numBytesInRateLocal, numBytesInRateRemote, numBytesOutRate); + } + + // ============================================================================================ + // Getters + // ============================================================================================ public Counter getNumBytesOutCounter() { return numBytesOut; } @@ -68,11 +88,19 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { return numBytesInRemote; } - public Meter getNumBytesInRateLocalMeter() { + public Counter getNumRecordsInCounter() { + return numRecordsIn; + } + + public Counter getNumRecordsOutCounter() { + return numRecordsOut; + } + + public Meter getNumBytesInLocalRateMeter() { return numBytesInRateLocal; } - public Meter getNumBytesInRateRemoteMeter() { + public Meter getNumBytesInRemoteRateMeter() { return numBytesInRateRemote; } @@ -134,6 +162,41 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { } } + // ============================================================================================ + // Metric Reuse + // ============================================================================================ + public void reuseRecordsInputCounter(Counter numRecordsInCounter) { + this.numRecordsIn.addCounter(numRecordsInCounter); + } + + public void reuseRecordsOutputCounter(Counter numRecordsOutCounter) { + this.numRecordsOut.addCounter(numRecordsOutCounter); + } + + /** + * A {@link SimpleCounter} that can contain other {@link Counter}s. A call to {@link SumCounter#getCount()} returns + * the sum of this counters and all contained counters. + */ + private static class SumCounter extends SimpleCounter { + private final List<Counter> internalCounters = new ArrayList<>(); + + SumCounter() { + } + + public void addCounter(Counter toAdd) { + internalCounters.add(toAdd); + } + + @Override + public long getCount() { + long sum = super.getCount(); + for (Counter counter : internalCounters) { + sum += counter.getCount(); + } + return sum; + } + } + /** * Input buffer pool usage gauge of a task */ http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index 354dbac..e896639 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -243,6 +243,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme String headName = getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim(); this.metrics = getEnvironment().getMetricGroup() .addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName); + this.metrics.getIOMetricGroup().reuseInputMetricsForTask(); + if (config.getNumberOfChainedStubs() == 0) { + this.metrics.getIOMetricGroup().reuseOutputMetricsForTask(); + } // initialize the readers. // this does not yet trigger any stream consuming or processing. @@ -1306,6 +1310,10 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme ct.setup(chainedStubConf, taskName, previous, containingTask, cl, executionConfig, accumulatorMap); chainedTasksTarget.add(0, ct); + if (i == numChained - 1) { + ct.getIOMetrics().reuseOutputMetricsForTask(); + } + previous = ct; } // the collector of the first in the chain is the collector for the task http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 4626b69..eb7999c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -109,6 +109,8 @@ public class DataSinkTask<IT> extends AbstractInvokable { RuntimeContext ctx = createRuntimeContext(); final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); + ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); + ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){ ((RichOutputFormat) this.format).setRuntimeContext(ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 4dc3ef5..e89559d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -103,7 +103,11 @@ public class DataSourceTask<OT> extends AbstractInvokable { RuntimeContext ctx = createRuntimeContext(); Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed"); + ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask(); Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter(); + if (this.config.getNumberOfChainedStubs() == 0) { + ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask(); + } if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index cf62dfa..442a53c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext; @@ -104,6 +105,9 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> { @Override public abstract void collect(IT record); + public OperatorIOMetricGroup getIOMetrics() { + return this.metrics.getIOMetricGroup(); + } protected RuntimeContext getUdfRuntimeContext() { return this.udfContext; http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java new file mode 100644 index 0000000..564a518 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TaskIOMetricGroupTest { + @Test + public void testTaskIOMetricGroup() { + TaskMetricGroup task = new UnregisteredTaskMetricsGroup(); + TaskIOMetricGroup taskIO = task.getIOMetricGroup(); + + // test counter forwarding + assertNotNull(taskIO.getNumRecordsInCounter()); + assertNotNull(taskIO.getNumRecordsOutCounter()); + + Counter c1 = new SimpleCounter(); + c1.inc(32L); + Counter c2 = new SimpleCounter(); + c2.inc(64L); + + taskIO.reuseRecordsInputCounter(c1); + taskIO.reuseRecordsOutputCounter(c2); + assertEquals(32L, taskIO.getNumRecordsInCounter().getCount()); + assertEquals(64L, taskIO.getNumRecordsOutCounter().getCount()); + + // test IOMetrics instantiation + taskIO.getNumBytesInLocalCounter().inc(100L); + taskIO.getNumBytesInRemoteCounter().inc(150L); + taskIO.getNumBytesOutCounter().inc(250L); + + IOMetrics io = taskIO.createSnapshot(); + assertEquals(32L, io.getNumRecordsIn()); + assertEquals(64L, io.getNumRecordsOut()); + assertEquals(100L, io.getNumBytesInLocal()); + assertEquals(150L, io.getNumBytesInRemote()); + assertEquals(250L, io.getNumBytesOut()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 2d38fb9..dd4c55c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -69,6 +69,7 @@ public class StreamConfig implements Serializable { private static final String OUT_STREAM_EDGES = "outStreamEdges"; private static final String IN_STREAM_EDGES = "inStreamEdges"; private static final String OPERATOR_NAME = "operatorName"; + private static final String CHAIN_END = "chainEnd"; private static final String CHECKPOINTING_ENABLED = "checkpointing"; private static final String CHECKPOINT_MODE = "checkpointMode"; @@ -478,6 +479,14 @@ public class StreamConfig implements Serializable { return config.getBoolean(IS_CHAINED_VERTEX, false); } + public void setChainEnd() { + config.setBoolean(CHAIN_END, true); + } + + public boolean isChainEnd() { + return config.getBoolean(CHAIN_END, false); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 87fd7eb..8f9da8a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -231,6 +231,9 @@ public class StreamingJobGraphGenerator { config.setChainIndex(chainIndex); config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName()); chainedConfigs.get(startNodeId).put(currentNodeId, config); + if (chainableOutputs.isEmpty()) { + config.setChainEnd(); + } } return transitiveOutEdges; http://git-wip-us.apache.org/repos/asf/flink/blob/1f4f6f92/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 5f0dd85..a659866 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -163,6 +163,12 @@ public abstract class AbstractStreamOperator<OUT> this.metrics = container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName()); this.output = new CountingOutput(output, ((OperatorMetricGroup) this.metrics).getIOMetricGroup().getNumRecordsOutCounter()); + if (config.isChainStart()) { + ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseInputMetricsForTask(); + } + if (config.isChainEnd()) { + ((OperatorMetricGroup) this.metrics).getIOMetricGroup().reuseOutputMetricsForTask(); + } Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration(); int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE); if (historySize <= 0) {
