This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 23e48db  [SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units
23e48db is described below

commit 23e48dbf77ba59a24c7c1eaf5bb2be003c6258d0
Author: Erik Krogen <[email protected]>
AuthorDate: Sat Jul 24 21:26:18 2021 +0800

    [SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units
    
    ### What changes were proposed in this pull request?
    `ExternalBlockHandler` exposes four metrics which are Dropwizard `Timer`s, each named with a `millis` suffix:
    ```
        private final Timer openBlockRequestLatencyMillis = new Timer();
        private final Timer registerExecutorRequestLatencyMillis = new Timer();
        private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
        private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
    ```
    However, these Dropwizard Timers use nanoseconds by default ([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)).
    
    This causes `YarnShuffleServiceMetrics` to expose confusingly named metrics like `openBlockRequestLatencyMillis_nanos_max` (the actual values are currently in nanoseconds).
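    To make the mismatch concrete, here is a minimal, hypothetical illustration (not part of this patch) of what a `millis`-named gauge actually reported before the fix:
    ```java
    import java.util.concurrent.TimeUnit;

    public class NanosSuffixDemo {
      public static void main(String[] args) {
        // Suppose an open-block request took 5 ms. A default Dropwizard Timer
        // snapshot stores and returns durations in nanoseconds, so a gauge
        // named like openBlockRequestLatencyMillis_nanos_max would read:
        long reportedNanos = TimeUnit.MILLISECONDS.toNanos(5);
        System.out.println(reportedNanos); // 5000000
      }
    }
    ```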
    
    This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which accepts a `TimeUnit` at creation time and exposes timing information in that unit when values are read. Internally, values are still stored with nanosecond precision. The `Timer` metrics within `ExternalBlockHandler` are updated to use the new class with milliseconds as the unit. The logic to include the `nanos` suffix in the metric name within `YarnShuffleServiceMetrics` has also been removed, with th [...]
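    As a sketch of the conversion behavior the new class relies on (class name here is illustrative; only `java.util.concurrent.TimeUnit` is used): integer `TimeUnit.convert` truncates sub-unit precision, which is why the double-valued snapshot methods divide by `timeUnit.toNanos(1)` instead:
    ```java
    import java.util.concurrent.TimeUnit;

    public class UnitConversionSketch {
      public static void main(String[] args) {
        long nanos = 5_500_000L; // 5.5 ms, stored internally as nanoseconds

        // Integer conversion (used for long-valued snapshot methods such as
        // getMax()) truncates toward zero:
        long truncated = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);

        // Floating-point division (used for double-valued methods such as
        // getMean()) preserves the fractional part:
        double precise = (double) nanos / TimeUnit.MILLISECONDS.toNanos(1);

        System.out.println(truncated); // 5
        System.out.println(precise);   // 5.5
      }
    }
    ```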
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, there are two changes.
    First, the names for metrics exposed by `ExternalBlockHandler` via `YarnShuffleServiceMetrics` such as `openBlockRequestLatencyMillis_nanos_max` and `openBlockRequestLatencyMillis_nanos_50thPercentile` have been changed to remove the `_nanos` suffix. This would be considered a breaking change, but these names were only exposed as part of #32388, which has not yet been released (slated for 3.2.0). New names are like `openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis [...]
    Second, the values of the metrics themselves have changed, to expose milliseconds instead of nanoseconds. Note that this does not affect metrics such as `openBlockRequestLatencyMillis_count` or `openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics (`max`, `median`, percentiles, etc.). For the YARN case, these metrics were also introduced by #32388, and thus also have not yet been released. It was possible for the nanosecond values to be consumed by some other metr [...]
    
    ### How was this patch tested?
    Unit tests have been updated.
    
    Closes #33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix.
    
    Authored-by: Erik Krogen <[email protected]>
    Signed-off-by: yi.wu <[email protected]>
    (cherry picked from commit 70a15868fc97e2b86c5ecc7bcf812bfdb05d98ea)
    Signed-off-by: yi.wu <[email protected]>
---
 .../network/util/TimerWithCustomTimeUnit.java      | 124 +++++++++++++++++++++
 .../network/util/TimerWithCustomUnitSuite.java     | 109 ++++++++++++++++++
 .../network/shuffle/ExternalBlockHandler.java      |  14 ++-
 .../network/yarn/YarnShuffleServiceMetrics.java    |  11 +-
 .../yarn/YarnShuffleServiceMetricsSuite.scala      |  18 ++-
 5 files changed, 255 insertions(+), 21 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java
new file mode 100644
index 0000000..86fc4f1
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+/**
+ * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to
+ * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the
+ * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot
+ * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer
+ * should use a different unit for its snapshot. Note that internally, all values are still stored
+ * with nanosecond-precision; it is only before being returned to the caller that the nanosecond
+ * value is converted to the custom time unit.
+ */
+public class TimerWithCustomTimeUnit extends Timer {
+
+  private final TimeUnit timeUnit;
+  private final double nanosPerUnit;
+
+  public TimerWithCustomTimeUnit(TimeUnit timeUnit) {
+    this(timeUnit, Clock.defaultClock());
+  }
+
+  TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) {
+    super(new ExponentiallyDecayingReservoir(), clock);
+    this.timeUnit = timeUnit;
+    this.nanosPerUnit = timeUnit.toNanos(1);
+  }
+
+  @Override
+  public Snapshot getSnapshot() {
+    return new SnapshotWithCustomTimeUnit(super.getSnapshot());
+  }
+
+  private double toUnit(double nanos) {
+    // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead
+    return nanos / nanosPerUnit;
+  }
+
+  private long toUnit(long nanos) {
+    return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
+  }
+
+  private class SnapshotWithCustomTimeUnit extends Snapshot {
+
+    private final Snapshot wrappedSnapshot;
+
+    SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) {
+      this.wrappedSnapshot = wrappedSnapshot;
+    }
+
+    @Override
+    public double getValue(double v) {
+      return toUnit(wrappedSnapshot.getValue(v));
+    }
+
+    @Override
+    public long[] getValues() {
+      long[] nanoValues = wrappedSnapshot.getValues();
+      long[] customUnitValues = new long[nanoValues.length];
+      for (int i = 0; i < nanoValues.length; i++) {
+        customUnitValues[i] = toUnit(nanoValues[i]);
+      }
+      return customUnitValues;
+    }
+
+    @Override
+    public int size() {
+      return wrappedSnapshot.size();
+    }
+
+    @Override
+    public long getMax() {
+      return toUnit(wrappedSnapshot.getMax());
+    }
+
+    @Override
+    public double getMean() {
+      return toUnit(wrappedSnapshot.getMean());
+    }
+
+    @Override
+    public long getMin() {
+      return toUnit(wrappedSnapshot.getMin());
+    }
+
+    @Override
+    public double getStdDev() {
+      return toUnit(wrappedSnapshot.getStdDev());
+    }
+
+    @Override
+    public void dump(OutputStream outputStream) {
+      try (PrintWriter writer = new PrintWriter(outputStream)) {
+        for (long value : getValues()) {
+          writer.println(value);
+        }
+      }
+    }
+  }
+}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java
new file mode 100644
index 0000000..1da0912
--- /dev/null
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link TimerWithCustomTimeUnit} */
+public class TimerWithCustomUnitSuite {
+
+  private static final double EPSILON = 1.0 / 1_000_000_000;
+
+  @Test
+  public void testTimerWithMillisecondTimeUnit() {
+    testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
+  }
+
+  @Test
+  public void testTimerWithNanosecondTimeUnit() {
+    testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS);
+  }
+
+  private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) {
+    Timer timer = new TimerWithCustomTimeUnit(timeUnit);
+    Duration[] durations = {
+        Duration.ofNanos(1),
+        Duration.ofMillis(1),
+        Duration.ofMillis(5),
+        Duration.ofMillis(100),
+        Duration.ofSeconds(10)
+    };
+    Arrays.stream(durations).forEach(timer::update);
+
+    Snapshot snapshot = timer.getSnapshot();
+    assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin());
+    assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON);
+    assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON);
+    assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON);
+    assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax());
+
+    assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(),
+        snapshot.getValues());
+    double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum();
+    assertEquals(total / durations.length, snapshot.getMean(), EPSILON);
+  }
+
+  @Test
+  public void testTimingViaContext() {
+    ManualClock clock = new ManualClock();
+    Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock);
+    Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) };
+    for (Duration d : durations) {
+      Timer.Context context = timer.time();
+      clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS));
+      context.stop();
+    }
+
+    Snapshot snapshot = timer.getSnapshot();
+    assertEquals(0, snapshot.getMin());
+    assertEquals(100, snapshot.getMedian(), EPSILON);
+    assertEquals(1000, snapshot.getMax(), EPSILON);
+  }
+
+  private static long toTimeUnit(Duration duration, TimeUnit timeUnit) {
+    return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
+  }
+
+  private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) {
+    return ((double) duration.toNanos()) / timeUnit.toNanos(1);
+  }
+
+  private static class ManualClock extends Clock {
+
+    private long currTick = 1;
+
+    void advance(long nanos) {
+      currTick += nanos;
+    }
+
+    @Override
+    public long getTick() {
+      return currTick;
+    }
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 922bb96..e0f2e95 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.codahale.metrics.Gauge;
@@ -49,6 +50,7 @@ import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TimerWithCustomTimeUnit;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
 
@@ -299,13 +301,17 @@ public class ExternalBlockHandler extends RpcHandler
   public class ShuffleMetrics implements MetricSet {
     private final Map<String, Metric> allMetrics;
     // Time latency for open block request in ms
-    private final Timer openBlockRequestLatencyMillis = new Timer();
+    private final Timer openBlockRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for executor registration latency in ms
-    private final Timer registerExecutorRequestLatencyMillis = new Timer();
+    private final Timer registerExecutorRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing fetch merged blocks meta request latency in ms
-    private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
+    private final Timer fetchMergedBlocksMetaLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing finalize shuffle merge request latency in ms
-    private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
+    private final Timer finalizeShuffleMergeLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Block transfer rate in blocks per second
     private final Meter blockTransferRate = new Meter();
     // Block fetch message rate per second. When using non-batch fetches
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
index 964d8f9..d843a67 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java
@@ -68,7 +68,6 @@ class YarnShuffleServiceMetrics implements MetricsSource {
       // Snapshot inside the Timer provides the information for the operation delay
       Timer t = (Timer) metric;
       Snapshot snapshot = t.getSnapshot();
-      String timingName = name + "_nanos";
       metricsRecordBuilder
         .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
           t.getCount())
@@ -84,13 +83,13 @@ class YarnShuffleServiceMetrics implements MetricsSource {
         .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
           t.getMeanRate())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax())
+          getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin())
+          getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean())
+          getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev());
       for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) {
         String percentileStr;
         switch (percentileThousands) {
@@ -105,7 +104,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
             break;
         }
         metricsRecordBuilder.addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr),
+          getShuffleServiceMetricsInfoForGenericValue(name, percentileStr),
           snapshot.getValue(percentileThousands / 1000.0));
       }
     } else if (metric instanceof Meter) {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
index eff2de7..f040594 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -75,23 +75,19 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
         metrics.getMetrics.get(testname))
 
       assert(counterNames === Seq(s"${testname}_count"))
+      val rates = Seq("rate1", "rate5", "rate15", "rateMean")
+      val percentiles =
+        "1stPercentile" +: Seq(5, 25, 50, 75, 95, 98, 99, 999).map(_ + "thPercentile")
       val (expectLong, expectDouble) =
         if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) {
           // blockTransfer(Message)?Rate(Bytes)? metrics are Meter so just have rate information
-          (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix"))
+          (Seq(), rates)
         } else {
           // other metrics are Timer so have rate and timing information
-          (
-              Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"),
-              Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev",
-                "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile",
-                "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile",
-                "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile")
-                  .map(suffix => s"${testname}_$suffix")
-          )
+          (Seq("max", "min"), rates ++ Seq("mean", "stdDev") ++ percentiles)
         }
-      assert(gaugeLongNames.sorted === expectLong.sorted)
-      assert(gaugeDoubleNames.sorted === expectDouble.sorted)
+      assert(gaugeLongNames.sorted === expectLong.map(testname + "_" + _).sorted)
+      assert(gaugeDoubleNames.sorted === expectDouble.map(testname + "_" + _).sorted)
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
