This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit eab99b6de820352490cce580fee2e7f31ef16617 Author: Weijie Guo <[email protected]> AuthorDate: Fri Jul 21 11:43:16 2023 +0800 [FLINK-32640][Network] Measure backpressure time for tiered storage hybrid shuffle. --- .../tiered/shuffle/TieredResultPartition.java | 1 + .../tiered/storage/TieredStorageMemoryManager.java | 8 ++++++ .../storage/TieredStorageMemoryManagerImpl.java | 30 ++++++++++++++++++---- .../tiered/TestingTieredStorageMemoryManager.java | 13 ++++++++++ 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java index f5601fbd0ae..d3aae631620 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartition.java @@ -122,6 +122,7 @@ public class TieredResultPartition extends ResultPartition { @Override public void setMetricGroup(TaskIOMetricGroup metrics) { super.setMetricGroup(metrics); + storageMemoryManager.setMetricGroup(metrics); tieredStorageProducerClient.setMetricStatisticsUpdater( this::updateProducerMetricStatistics); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java index f3aa2a87eb5..6b44eba76ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java @@ -22,6 +22,7 @@ 
import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import java.util.List; @@ -54,6 +55,13 @@ public interface TieredStorageMemoryManager { */ void setup(BufferPool bufferPool, List<TieredStorageMemorySpec> storageMemorySpecs); + /** + * Set the {@link TaskIOMetricGroup} for this memory manager. + * + * @param metricGroup the metric group to set + */ + void setMetricGroup(TaskIOMetricGroup metricGroup); + /** * Register a listener to listen the buffer reclaim request from the {@link * TieredStorageMemoryManager}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java index 7eadf0b0908..f11a413c194 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.LocalBufferPool; +import org.apache.flink.runtime.metrics.TimerGauge; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FatalExitExceptionHandler; @@ -86,6 +88,12 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage */ private final Map<Object, 
Integer> numOwnerRequestedBuffers; + /** + * Time gauge to measure the hard backpressure time. Pre-create it to avoid checkNotNull in + * hot-path for performance purposes. + */ + private TimerGauge hardBackpressureTimerGauge = new TimerGauge(); + /** * This is for triggering buffer reclaiming while blocked on requesting new buffers. * @@ -145,6 +153,12 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage this.isInitialized = true; } + @Override + public void setMetricGroup(TaskIOMetricGroup metricGroup) { + this.hardBackpressureTimerGauge = + checkNotNull(metricGroup.getHardBackPressuredTimePerSecond()); + } + @Override public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) { bufferReclaimRequestListeners.add(onBufferReclaimRequest); @@ -159,12 +173,18 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage CompletableFuture<Void> requestBufferFuture = new CompletableFuture<>(); scheduleCheckRequestBufferFuture( requestBufferFuture, INITIAL_REQUEST_BUFFER_TIMEOUT_FOR_RECLAIMING_MS); - MemorySegment memorySegment = null; - try { - memorySegment = bufferPool.requestMemorySegmentBlocking(); - } catch (InterruptedException e) { - ExceptionUtils.rethrow(e); + MemorySegment memorySegment = bufferPool.requestMemorySegment(); + + if (memorySegment == null) { + try { + hardBackpressureTimerGauge.markStart(); + memorySegment = bufferPool.requestMemorySegmentBlocking(); + hardBackpressureTimerGauge.markEnd(); + } catch (InterruptedException e) { + ExceptionUtils.rethrow(e); + } } + requestBufferFuture.complete(null); incNumRequestedBuffer(owner); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java index fbab5c87541..c99013b6a1e 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.function.TriConsumer; import java.util.List; @@ -35,6 +36,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private final BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer; + private final Consumer<TaskIOMetricGroup> setMetricGroupConsumer; + private final Consumer<Runnable> listenBufferReclaimRequestConsumer; private final Function<Object, BufferBuilder> requestBufferBlockingFunction; @@ -49,6 +52,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private TestingTieredStorageMemoryManager( BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer, + Consumer<TaskIOMetricGroup> setMetricGroupConsumer, Consumer<Runnable> listenBufferReclaimRequestConsumer, Function<Object, BufferBuilder> requestBufferBlockingFunction, Function<Object, Integer> getMaxNonReclaimableBuffersFunction, @@ -56,6 +60,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan TriConsumer<Object, Object, Buffer> transferBufferOwnershipConsumer, Runnable releaseRunnable) { this.setupConsumer = setupConsumer; + this.setMetricGroupConsumer = setMetricGroupConsumer; this.listenBufferReclaimRequestConsumer = listenBufferReclaimRequestConsumer; this.requestBufferBlockingFunction = 
requestBufferBlockingFunction; this.getMaxNonReclaimableBuffersFunction = getMaxNonReclaimableBuffersFunction; @@ -69,6 +74,11 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan setupConsumer.accept(bufferPool, storageMemorySpecs); } + @Override + public void setMetricGroup(TaskIOMetricGroup metricGroup) { + setMetricGroupConsumer.accept(metricGroup); + } + @Override public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) { listenBufferReclaimRequestConsumer.accept(onBufferReclaimRequest); @@ -105,6 +115,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer = (bufferPool, tieredStorageMemorySpecs) -> {}; + private Consumer<TaskIOMetricGroup> setMetricGroupConsumer = (ignore) -> {}; + private Consumer<Runnable> listenBufferReclaimRequestConsumer = runnable -> {}; private Function<Object, BufferBuilder> requestBufferBlockingFunction = owner -> null; @@ -165,6 +177,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan public TestingTieredStorageMemoryManager build() { return new TestingTieredStorageMemoryManager( setupConsumer, + setMetricGroupConsumer, listenBufferReclaimRequestConsumer, requestBufferBlockingFunction, getMaxNonReclaimableBuffersFunction,
