This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51b0df658864fc4caa0607fe82ce26f90c1a67ee
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Apr 25 15:38:11 2023 +0800

    [FLINK-32640][Network] Measure backpressure time for legacy hybrid shuffle.
---
 .../network/partition/hybrid/HsMemoryDataManager.java | 19 ++++++++++++++++++-
 .../io/network/partition/hybrid/HsOutputMetrics.java  | 11 ++++++++++-
 .../network/partition/hybrid/HsResultPartition.java   |  6 +++++-
 .../partition/hybrid/HybridShuffleTestUtils.java      |  3 ++-
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 387114dbf81..ac374ea27c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision;
+import org.apache.flink.runtime.metrics.TimerGauge;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -92,6 +93,12 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger poolSize;
 
+    /**
+     * If task thread blocked on request buffer from buffer pool, this metric should be updated.
+     * Pre-create it to avoid checkNotNull in hot-path for performance purpose.
+     */
+    private TimerGauge hardBackPressuredTimePerSecond = new TimerGauge();
+
     public HsMemoryDataManager(
             int numSubpartitions,
             int bufferSize,
@@ -200,6 +207,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
         for (int i = 0; i < numSubpartitions; i++) {
             getSubpartitionMemoryDataManager(i).setOutputMetrics(metrics);
         }
+        this.hardBackPressuredTimePerSecond = metrics.getHardBackpressureTimerGauge();
     }
 
     // ------------------------------------
@@ -263,7 +271,16 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData
 
     @Override
     public BufferBuilder requestBufferFromPool() throws InterruptedException {
-        MemorySegment segment = bufferPool.requestMemorySegmentBlocking();
+        MemorySegment segment = bufferPool.requestMemorySegment();
+
+        if (segment == null) {
+            // only when the buffer is not acquired immediately, it is requested in blocking mode,
+            // which will make the calculation of backpressure more accurate.
+            hardBackPressuredTimePerSecond.markStart();
+            segment = bufferPool.requestMemorySegmentBlocking();
+            hardBackPressuredTimePerSecond.markEnd();
+        }
+
         Optional<Decision> decisionOpt =
                 spillStrategy.onMemoryUsageChanged(
                         numRequestedBuffers.incrementAndGet(), getPoolSize());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java
index 6ffb1f03d01..ecedd1d2ebd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java
@@ -19,15 +19,20 @@
 package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.metrics.TimerGauge;
 
 /** All metrics that {@link HsResultPartition} needs to count, except numBytesProduced. */
 public class HsOutputMetrics {
     private final Counter numBytesOut;
     private final Counter numBuffersOut;
 
-    public HsOutputMetrics(Counter numBytesOut, Counter numBuffersOut) {
+    private final TimerGauge hardBackpressureTimerGauge;
+
+    public HsOutputMetrics(
+            Counter numBytesOut, Counter numBuffersOut, TimerGauge hardBackpressureTimerGauge) {
         this.numBytesOut = numBytesOut;
         this.numBuffersOut = numBuffersOut;
+        this.hardBackpressureTimerGauge = hardBackpressureTimerGauge;
     }
 
     public Counter getNumBytesOut() {
@@ -37,4 +42,8 @@ public class HsOutputMetrics {
     public Counter getNumBuffersOut() {
         return numBuffersOut;
     }
+
+    public TimerGauge getHardBackpressureTimerGauge() {
+        return hardBackpressureTimerGauge;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
index c0298325caf..e0dca965666 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java
@@ -154,7 +154,11 @@ public class HsResultPartition extends ResultPartition {
     public void setMetricGroup(TaskIOMetricGroup metrics) {
         super.setMetricGroup(metrics);
         checkNotNull(memoryDataManager)
-                .setOutputMetrics(new HsOutputMetrics(numBytesOut, numBuffersOut));
+                .setOutputMetrics(
+                        new HsOutputMetrics(
+                                numBytesOut,
+                                numBuffersOut,
+                                metrics.getHardBackPressuredTimePerSecond()));
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
index 5b0629bd1a5..d9abab0b139 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl.
 import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper;
 import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
+import org.apache.flink.runtime.metrics.TimerGauge;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -74,7 +75,7 @@ public class HybridShuffleTestUtils {
     }
 
     public static HsOutputMetrics createTestingOutputMetrics() {
-        return new HsOutputMetrics(new TestCounter(), new TestCounter());
+        return new HsOutputMetrics(new TestCounter(), new TestCounter(), new TimerGauge());
     }
 
     public static TestingFileDataIndexRegion createSingleTestRegion(

Reply via email to