This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 23e48db [SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer
variables to expose correct units
23e48db is described below
commit 23e48dbf77ba59a24c7c1eaf5bb2be003c6258d0
Author: Erik Krogen <[email protected]>
AuthorDate: Sat Jul 24 21:26:18 2021 +0800
[SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to
expose correct units
### What changes were proposed in this pull request?
`ExternalBlockHandler` exposes 4 metrics which are Dropwizard `Timer`
metrics, and are named with a `millis` suffix:
```
private final Timer openBlockRequestLatencyMillis = new Timer();
private final Timer registerExecutorRequestLatencyMillis = new Timer();
private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
```
However, these Dropwizard Timers by default use nanoseconds
([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)).
This causes `YarnShuffleServiceMetrics` to expose confusingly-named metrics
like `openBlockRequestLatencyMillis_nanos_max` (the actual values are currently
in nanos).
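To make the mismatch concrete, here is a small standalone sketch (not Spark code; the metric name is reused only for illustration) of why a sample recorded in milliseconds surfaces from an unconverted snapshot as a raw nanosecond count:

```java
import java.util.concurrent.TimeUnit;

// Standalone illustration of the mismatch described above: Dropwizard Timers
// store every sample with nanosecond precision, so a 25 ms sample read back
// from an unconverted snapshot appears as 25,000,000 -- even when the metric
// name ends in "Millis".
public class NanosMismatchSketch {
    public static void main(String[] args) {
        long sampleMillis = 25; // what update(25, TimeUnit.MILLISECONDS) logically records
        long storedNanos = TimeUnit.MILLISECONDS.toNanos(sampleMillis);
        // An unconverted snapshot would surface this raw nano value:
        System.out.println("openBlockRequestLatencyMillis_nanos_max = " + storedNanos);
    }
}
```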
This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which
accepts a `TimeUnit` at creation time and exposes timing information using this
time unit when values are read. Internally, values are still stored with
nanosecond-level precision. The `Timer` metrics within `ExternalBlockHandler`
are updated to use the new class with milliseconds as the unit. The logic to
include the `nanos` suffix in the metric name within
`YarnShuffleServiceMetrics` has also been removed, with th [...]
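As a simplified standalone sketch of the conversion approach described above (not the actual Spark class), the unit handling boils down to two helpers: a truncating long conversion via `TimeUnit.convert`, and a precision-preserving floating-point division:

```java
import java.util.concurrent.TimeUnit;

// Simplified sketch of the conversion logic used by TimerWithCustomTimeUnit:
// values are kept in nanoseconds internally and converted only when read.
public class UnitConversionSketch {
    private final TimeUnit timeUnit;
    private final double nanosPerUnit;

    public UnitConversionSketch(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
        this.nanosPerUnit = timeUnit.toNanos(1);
    }

    // Long path: TimeUnit.convert() truncates any sub-unit remainder.
    long toUnit(long nanos) {
        return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
    }

    // Double path: floating-point division keeps fractional precision,
    // which matters for snapshot values such as the mean and percentiles.
    double toUnit(double nanos) {
        return nanos / nanosPerUnit;
    }
}
```

With `TimeUnit.MILLISECONDS`, 1,500,000 ns converts to 1 on the long path but 1.5 on the double path, which is why the snapshot's percentile accessors use the floating-point variant.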
### Does this PR introduce _any_ user-facing change?
Yes, there are two changes.
First, the names for metrics exposed by `ExternalBlockHandler` via
`YarnShuffleServiceMetrics` such as `openBlockRequestLatencyMillis_nanos_max`
and `openBlockRequestLatencyMillis_nanos_50thPercentile` have been changed to
remove the `_nanos` suffix. This would be considered a breaking change, but
these names were only exposed as part of #32388, which has not yet been
released (slated for 3.2.0). New names are like
`openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis [...]
Second, the values of the metrics themselves have changed, to expose
milliseconds instead of nanoseconds. Note that this does not affect metrics
such as `openBlockRequestLatencyMillis_count` or
`openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics
(`max`, `median`, percentiles, etc.). For the YARN case, these metrics were
also introduced by #32388, and thus also have not yet been released. It was
possible for the nanosecond values to be consumed by some other metr [...]
### How was this patch tested?
Unit tests have been updated.
Closes #33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix.
Authored-by: Erik Krogen <[email protected]>
Signed-off-by: yi.wu <[email protected]>
(cherry picked from commit 70a15868fc97e2b86c5ecc7bcf812bfdb05d98ea)
Signed-off-by: yi.wu <[email protected]>
---
.../network/util/TimerWithCustomTimeUnit.java | 124 +++++++++++++++++++++
.../network/util/TimerWithCustomUnitSuite.java | 109 ++++++++++++++++++
.../network/shuffle/ExternalBlockHandler.java | 14 ++-
.../network/yarn/YarnShuffleServiceMetrics.java | 11 +-
.../yarn/YarnShuffleServiceMetricsSuite.scala | 18 ++-
5 files changed, 255 insertions(+), 21 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java
new file mode 100644
index 0000000..86fc4f1
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+/**
+ * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to
+ * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the
+ * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot
+ * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer
+ * should use a different unit for its snapshot. Note that internally, all values are still stored
+ * with nanosecond-precision; it is only before being returned to the caller that the nanosecond
+ * value is converted to the custom time unit.
+ */
+public class TimerWithCustomTimeUnit extends Timer {
+
+ private final TimeUnit timeUnit;
+ private final double nanosPerUnit;
+
+ public TimerWithCustomTimeUnit(TimeUnit timeUnit) {
+ this(timeUnit, Clock.defaultClock());
+ }
+
+ TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) {
+ super(new ExponentiallyDecayingReservoir(), clock);
+ this.timeUnit = timeUnit;
+ this.nanosPerUnit = timeUnit.toNanos(1);
+ }
+
+ @Override
+ public Snapshot getSnapshot() {
+ return new SnapshotWithCustomTimeUnit(super.getSnapshot());
+ }
+
+ private double toUnit(double nanos) {
+ // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead
+ return nanos / nanosPerUnit;
+ }
+
+ private long toUnit(long nanos) {
+ return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
+ }
+
+ private class SnapshotWithCustomTimeUnit extends Snapshot {
+
+ private final Snapshot wrappedSnapshot;
+
+ SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) {
+ this.wrappedSnapshot = wrappedSnapshot;
+ }
+
+ @Override
+ public double getValue(double v) {
+ return toUnit(wrappedSnapshot.getValue(v));
+ }
+
+ @Override
+ public long[] getValues() {
+ long[] nanoValues = wrappedSnapshot.getValues();
+ long[] customUnitValues = new long[nanoValues.length];
+ for (int i = 0; i < nanoValues.length; i++) {
+ customUnitValues[i] = toUnit(nanoValues[i]);
+ }
+ return customUnitValues;
+ }
+
+ @Override
+ public int size() {
+ return wrappedSnapshot.size();
+ }
+
+ @Override
+ public long getMax() {
+ return toUnit(wrappedSnapshot.getMax());
+ }
+
+ @Override
+ public double getMean() {
+ return toUnit(wrappedSnapshot.getMean());
+ }
+
+ @Override
+ public long getMin() {
+ return toUnit(wrappedSnapshot.getMin());
+ }
+
+ @Override
+ public double getStdDev() {
+ return toUnit(wrappedSnapshot.getStdDev());
+ }
+
+ @Override
+ public void dump(OutputStream outputStream) {
+ try (PrintWriter writer = new PrintWriter(outputStream)) {
+ for (long value : getValues()) {
+ writer.println(value);
+ }
+ }
+ }
+ }
+}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java
new file mode 100644
index 0000000..1da0912
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link TimerWithCustomTimeUnit} */
+public class TimerWithCustomUnitSuite {
+
+ private static final double EPSILON = 1.0 / 1_000_000_000;
+
+ @Test
+ public void testTimerWithMillisecondTimeUnit() {
+ testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
+ }
+
+ @Test
+ public void testTimerWithNanosecondTimeUnit() {
+ testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS);
+ }
+
+ private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) {
+ Timer timer = new TimerWithCustomTimeUnit(timeUnit);
+ Duration[] durations = {
+ Duration.ofNanos(1),
+ Duration.ofMillis(1),
+ Duration.ofMillis(5),
+ Duration.ofMillis(100),
+ Duration.ofSeconds(10)
+ };
+ Arrays.stream(durations).forEach(timer::update);
+
+ Snapshot snapshot = timer.getSnapshot();
+ assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin());
+ assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON);
+ assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON);
+ assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON);
+ assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax());
+
+ assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(),
+ snapshot.getValues());
+ double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum();
+ assertEquals(total / durations.length, snapshot.getMean(), EPSILON);
+ }
+
+ @Test
+ public void testTimingViaContext() {
+ ManualClock clock = new ManualClock();
+ Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock);
+ Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) };
+ for (Duration d : durations) {
+ Timer.Context context = timer.time();
+ clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS));
+ context.stop();
+ }
+
+ Snapshot snapshot = timer.getSnapshot();
+ assertEquals(0, snapshot.getMin());
+ assertEquals(100, snapshot.getMedian(), EPSILON);
+ assertEquals(1000, snapshot.getMax(), EPSILON);
+ }
+
+ private static long toTimeUnit(Duration duration, TimeUnit timeUnit) {
+ return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
+ }
+
+ private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) {
+ return ((double) duration.toNanos()) / timeUnit.toNanos(1);
+ }
+
+ private static class ManualClock extends Clock {
+
+ private long currTick = 1;
+
+ void advance(long nanos) {
+ currTick += nanos;
+ }
+
+ @Override
+ public long getTick() {
+ return currTick;
+ }
+ }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 922bb96..e0f2e95 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.codahale.metrics.Gauge;
@@ -49,6 +50,7 @@ import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TimerWithCustomTimeUnit;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
import org.apache.spark.network.util.TransportConf;
@@ -299,13 +301,17 @@ public class ExternalBlockHandler extends RpcHandler
public class ShuffleMetrics implements MetricSet {
private final Map<String, Metric> allMetrics;
// Time latency for open block request in ms
- private final Timer openBlockRequestLatencyMillis = new Timer();
+ private final Timer openBlockRequestLatencyMillis =
+ new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
// Time latency for executor registration latency in ms
- private final Timer registerExecutorRequestLatencyMillis = new Timer();
+ private final Timer registerExecutorRequestLatencyMillis =
+ new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
// Time latency for processing fetch merged blocks meta request latency in ms
- private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
+ private final Timer fetchMergedBlocksMetaLatencyMillis =
+ new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
// Time latency for processing finalize shuffle merge request latency in ms
- private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
+ private final Timer finalizeShuffleMergeLatencyMillis =
+ new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
// Block transfer rate in blocks per second
private final Meter blockTransferRate = new Meter();
// Block fetch message rate per second. When using non-batch fetches
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
index 964d8f9..d843a67 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -68,7 +68,6 @@ class YarnShuffleServiceMetrics implements MetricsSource {
// Snapshot inside the Timer provides the information for the operation delay
Timer t = (Timer) metric;
Snapshot snapshot = t.getSnapshot();
- String timingName = name + "_nanos";
metricsRecordBuilder
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
t.getCount())
@@ -84,13 +83,13 @@ class YarnShuffleServiceMetrics implements MetricsSource {
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
t.getMeanRate())
.addGauge(
- getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax())
+ getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax())
.addGauge(
- getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin())
+ getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin())
.addGauge(
- getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean())
+ getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean())
.addGauge(
- getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev());
+ getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev());
for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) {
String percentileStr;
switch (percentileThousands) {
@@ -105,7 +104,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
break;
}
metricsRecordBuilder.addGauge(
- getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr),
+ getShuffleServiceMetricsInfoForGenericValue(name, percentileStr),
snapshot.getValue(percentileThousands / 1000.0));
}
} else if (metric instanceof Meter) {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
index eff2de7..f040594 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -75,23 +75,19 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
metrics.getMetrics.get(testname))
assert(counterNames === Seq(s"${testname}_count"))
+ val rates = Seq("rate1", "rate5", "rate15", "rateMean")
+ val percentiles =
+ "1stPercentile" +: Seq(5, 25, 50, 75, 95, 98, 99, 999).map(_ + "thPercentile")
val (expectLong, expectDouble) =
if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) {
// blockTransfer(Message)?Rate(Bytes)? metrics are Meter so just have rate information
- (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix"))
+ (Seq(), rates)
} else {
// other metrics are Timer so have rate and timing information
- (
- Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"),
- Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev",
- "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile",
- "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile",
- "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile")
- .map(suffix => s"${testname}_$suffix")
- )
+ (Seq("max", "min"), rates ++ Seq("mean", "stdDev") ++ percentiles)
}
- assert(gaugeLongNames.sorted === expectLong.sorted)
- assert(gaugeDoubleNames.sorted === expectDouble.sorted)
+ assert(gaugeLongNames.sorted === expectLong.map(testname + "_" + _).sorted)
+ assert(gaugeDoubleNames.sorted === expectDouble.map(testname + "_" + _).sorted)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]