This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eab99b6de820352490cce580fee2e7f31ef16617
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Jul 21 11:43:16 2023 +0800

    [FLINK-32640][Network] Measure backpressure time for tiered storage hybrid shuffle.
---
 .../tiered/shuffle/TieredResultPartition.java      |  1 +
 .../tiered/storage/TieredStorageMemoryManager.java |  8 ++++++
 .../storage/TieredStorageMemoryManagerImpl.java    | 30 ++++++++++++++++++----
 .../tiered/TestingTieredStorageMemoryManager.java  | 13 ++++++++++
 4 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
index f5601fbd0ae..d3aae631620 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java
@@ -122,6 +122,7 @@ public class TieredResultPartition extends ResultPartition {
     @Override
     public void setMetricGroup(TaskIOMetricGroup metrics) {
         super.setMetricGroup(metrics);
+        storageMemoryManager.setMetricGroup(metrics);
         tieredStorageProducerClient.setMetricStatisticsUpdater(
                 this::updateProducerMetricStatistics);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
index f3aa2a87eb5..6b44eba76ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.util.List;
 
@@ -54,6 +55,13 @@ public interface TieredStorageMemoryManager {
      */
     void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs);
 
+    /**
+     * Set the {@link TaskIOMetricGroup} for this memory manager.
+     *
+     * @param metricGroup the metric group to set
+     */
+    void setMetricGroup(TaskIOMetricGroup metricGroup);
+
     /**
      * Register a listener to listen the buffer reclaim request from the {@link
      * TieredStorageMemoryManager}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
index 7eadf0b0908..f11a413c194 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.LocalBufferPool;
+import org.apache.flink.runtime.metrics.TimerGauge;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FatalExitExceptionHandler;
 
@@ -86,6 +88,12 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
      */
     private final Map<Object, Integer> numOwnerRequestedBuffers;
 
+    /**
+     * Time gauge to measure that hard backpressure time. Pre-create it to avoid checkNotNull in
+     * hot-path for performance purpose.
+     */
+    private TimerGauge hardBackpressureTimerGauge = new TimerGauge();
+
     /**
      * This is for triggering buffer reclaiming while blocked on requesting new buffers.
      *
@@ -145,6 +153,12 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
         this.isInitialized = true;
     }
 
+    @Override
+    public void setMetricGroup(TaskIOMetricGroup metricGroup) {
+        this.hardBackpressureTimerGauge =
+                checkNotNull(metricGroup.getHardBackPressuredTimePerSecond());
+    }
+
     @Override
     public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
         bufferReclaimRequestListeners.add(onBufferReclaimRequest);
@@ -159,12 +173,18 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage
         CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>();
         scheduleCheckRequestBufferFuture(
                 requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS);
-        MemorySegment memorySegment = null;
-        try {
-            memorySegment = bufferPool.requestMemorySegmentBlocking();
-        } catch (InterruptedException e) {
-            ExceptionUtils.rethrow(e);
+        MemorySegment memorySegment = bufferPool.requestMemorySegment();
+
+        if (memorySegment == null) {
+            try {
+                hardBackpressureTimerGauge.markStart();
+                memorySegment = bufferPool.requestMemorySegmentBlocking();
+                hardBackpressureTimerGauge.markEnd();
+            } catch (InterruptedException e) {
+                ExceptionUtils.rethrow(e);
+            }
         }
+
         requestBufferFuture.complete(null);
 
         incNumRequestedBuffer(owner);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
index fbab5c87541..c99013b6a1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.TriConsumer;
 
 import java.util.List;
@@ -35,6 +36,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
 
     private final BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer;
 
+    private final Consumer<TaskIOMetricGroup> setMetricGroupConsumer;
+
     private final Consumer<Runnable> listenBufferReclaimRequestConsumer;
 
     private final Function<Object, BufferBuilder> requestBufferBlockingFunction;
@@ -49,6 +52,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
 
     private TestingTieredStorageMemoryManager(
             BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer,
+            Consumer<TaskIOMetricGroup> setMetricGroupConsumer,
             Consumer<Runnable> listenBufferReclaimRequestConsumer,
             Function<Object, BufferBuilder> requestBufferBlockingFunction,
             Function<Object, Integer> getMaxNonReclaimableBuffersFunction,
@@ -56,6 +60,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
             TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer,
             Runnable releaseRunnable) {
         this.setupConsumer = setupConsumer;
+        this.setMetricGroupConsumer = setMetricGroupConsumer;
         this.listenBufferReclaimRequestConsumer = listenBufferReclaimRequestConsumer;
         this.requestBufferBlockingFunction = requestBufferBlockingFunction;
         this.getMaxNonReclaimableBuffersFunction = getMaxNonReclaimableBuffersFunction;
@@ -69,6 +74,11 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
         setupConsumer.accept(bufferPool, storageMemorySpecs);
     }
 
+    @Override
+    public void setMetricGroup(TaskIOMetricGroup metricGroup) {
+        setMetricGroupConsumer.accept(metricGroup);
+    }
+
     @Override
     public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
         listenBufferReclaimRequestConsumer.accept(onBufferReclaimRequest);
@@ -105,6 +115,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
         private BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer =
                 (bufferPool, tieredStorageMemorySpecs) -> {};
 
+        private Consumer<TaskIOMetricGroup> setMetricGroupConsumer = (ignore) -> {};
+
         private Consumer<Runnable> listenBufferReclaimRequestConsumer = runnable -> {};
 
         private Function<Object, BufferBuilder> requestBufferBlockingFunction = owner -> null;
@@ -165,6 +177,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
         public TestingTieredStorageMemoryManager build() {
             return new TestingTieredStorageMemoryManager(
                     setupConsumer,
+                    setMetricGroupConsumer,
                     listenBufferReclaimRequestConsumer,
                     requestBufferBlockingFunction,
                     getMaxNonReclaimableBuffersFunction,

Reply via email to