This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51b0df658864fc4caa0607fe82ce26f90c1a67ee Author: Weijie Guo <[email protected]> AuthorDate: Tue Apr 25 15:38:11 2023 +0800 [FLINK-32640][Network] Measure backpressure time for legacy hybrid shuffle. --- .../network/partition/hybrid/HsMemoryDataManager.java | 19 ++++++++++++++++++- .../io/network/partition/hybrid/HsOutputMetrics.java | 11 ++++++++++- .../network/partition/hybrid/HsResultPartition.java | 6 +++++- .../partition/hybrid/HybridShuffleTestUtils.java | 3 ++- 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index 387114dbf81..ac374ea27c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.Decision; +import org.apache.flink.runtime.metrics.TimerGauge; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; @@ -92,6 +93,12 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger poolSize; + /** + * If task thread blocked on request buffer from buffer pool, this metric should be updated. + * Pre-create it to avoid checkNotNull in hot-path for performance purpose. + */ + private TimerGauge hardBackPressuredTimePerSecond = new TimerGauge(); + public HsMemoryDataManager( int numSubpartitions, int bufferSize, @@ -200,6 +207,7 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData for (int i = 0; i < numSubpartitions; i++) { getSubpartitionMemoryDataManager(i).setOutputMetrics(metrics); } + this.hardBackPressuredTimePerSecond = metrics.getHardBackpressureTimerGauge(); } // ------------------------------------ @@ -263,7 +271,16 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData @Override public BufferBuilder requestBufferFromPool() throws InterruptedException { - MemorySegment segment = bufferPool.requestMemorySegmentBlocking(); + MemorySegment segment = bufferPool.requestMemorySegment(); + + if (segment == null) { + // only when the buffer is not acquired immediately, it is requested in blocking mode, + // which will make the calculation of backpressure more accurate. + hardBackPressuredTimePerSecond.markStart(); + segment = bufferPool.requestMemorySegmentBlocking(); + hardBackPressuredTimePerSecond.markEnd(); + } + Optional<Decision> decisionOpt = spillStrategy.onMemoryUsageChanged( numRequestedBuffers.incrementAndGet(), getPoolSize()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java index 6ffb1f03d01..ecedd1d2ebd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsOutputMetrics.java @@ -19,15 +19,20 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.TimerGauge; /** All metrics that {@link HsResultPartition} needs to count, except numBytesProduced. */ public class HsOutputMetrics { private final Counter numBytesOut; private final Counter numBuffersOut; - public HsOutputMetrics(Counter numBytesOut, Counter numBuffersOut) { + private final TimerGauge hardBackpressureTimerGauge; + + public HsOutputMetrics( + Counter numBytesOut, Counter numBuffersOut, TimerGauge hardBackpressureTimerGauge) { this.numBytesOut = numBytesOut; this.numBuffersOut = numBuffersOut; + this.hardBackpressureTimerGauge = hardBackpressureTimerGauge; } public Counter getNumBytesOut() { @@ -37,4 +42,8 @@ public class HsOutputMetrics { public Counter getNumBuffersOut() { return numBuffersOut; } + + public TimerGauge getHardBackpressureTimerGauge() { + return hardBackpressureTimerGauge; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java index c0298325caf..e0dca965666 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartition.java @@ -154,7 +154,11 @@ public class HsResultPartition extends ResultPartition { public void setMetricGroup(TaskIOMetricGroup metrics) { super.setMetricGroup(metrics); checkNotNull(memoryDataManager) - .setOutputMetrics(new HsOutputMetrics(numBytesOut, numBuffersOut)); + .setOutputMetrics( + new HsOutputMetrics( + numBytesOut, + numBuffersOut, + metrics.getHardBackPressuredTimePerSecond())); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java index 5b0629bd1a5..d9abab0b139 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HybridShuffleTestUtils.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndexImpl. import org.apache.flink.runtime.io.network.partition.hybrid.index.FileDataIndexRegionHelper; import org.apache.flink.runtime.io.network.partition.hybrid.index.TestingFileDataIndexRegion; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex; +import org.apache.flink.runtime.metrics.TimerGauge; import java.util.ArrayDeque; import java.util.ArrayList; @@ -74,7 +75,7 @@ public class HybridShuffleTestUtils { } public static HsOutputMetrics createTestingOutputMetrics() { - return new HsOutputMetrics(new TestCounter(), new TestCounter()); + return new HsOutputMetrics(new TestCounter(), new TestCounter(), new TimerGauge()); } public static TestingFileDataIndexRegion createSingleTestRegion(
