This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 66121c8486cf9c421130020fb548c7c7192d6079 Author: Wencong Liu <[email protected]> AuthorDate: Mon Aug 14 11:32:32 2023 +0800 [hotfix] Fix the inaccurate backlog number of Hybrid Shuffle in legacy mode --- .../io/network/partition/hybrid/HsDataView.java | 2 +- .../HsSubpartitionConsumerMemoryDataManager.java | 28 ++++++++++++++++--- .../hybrid/HsSubpartitionFileReaderImpl.java | 31 +++++++++++++++++----- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java index 30ad6693fba..49d88df23c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java @@ -55,7 +55,7 @@ public interface HsDataView { int nextBufferToConsume, Collection<Buffer> buffersToRecycle); /** - * Get the number of buffers backlog. + * Get the number of buffers whose {@link Buffer.DataType} is buffer. * * @return backlog of this view's corresponding subpartition. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java index 30cf2e91898..eb597cddbf8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java @@ -53,6 +53,9 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { private final HsMemoryDataManagerOperation memoryDataManagerOperation; + @GuardedBy("consumerLock") + private int backlog = 0; + public HsSubpartitionConsumerMemoryDataManager( Lock resultPartitionLock, Lock consumerLock, @@ -69,12 +72,16 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { @GuardedBy("consumerLock") // this method only called from subpartitionMemoryDataManager with write lock. public void addInitialBuffers(Deque<HsBufferContext> buffers) { + for (HsBufferContext bufferContext : buffers) { + tryIncreaseBacklog(bufferContext.getBuffer()); + } unConsumedBuffers.addAll(buffers); } @GuardedBy("consumerLock") // this method only called from subpartitionMemoryDataManager with write lock. public boolean addBuffer(HsBufferContext bufferContext) { + tryIncreaseBacklog(bufferContext.getBuffer()); unConsumedBuffers.add(bufferContext); trimHeadingReleasedBuffers(); return unConsumedBuffers.size() <= 1; @@ -104,6 +111,7 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { HsBufferContext bufferContext = checkNotNull(unConsumedBuffers.pollFirst()); + tryDecreaseBacklog(bufferContext.getBuffer()); bufferContext.consumed(consumerId); Buffer.DataType nextDataType = peekNextToConsumeDataTypeInternal(toConsumeIndex + 1); @@ -157,12 +165,12 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { } @SuppressWarnings("FieldAccessNotGuarded") - // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the + // Un-synchronized get the backlog to provide memory data backlog, this will make the // result greater than or equal to the actual backlog, but obtaining an accurate backlog will // bring too much extra overhead. @Override public int getBacklog() { - return unConsumedBuffers.size(); + return backlog; } @Override @@ -173,7 +181,21 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView { @GuardedBy("consumerLock") private void trimHeadingReleasedBuffers() { while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) { - unConsumedBuffers.removeFirst(); + tryDecreaseBacklog(unConsumedBuffers.removeFirst().getBuffer()); + } + } + + @GuardedBy("consumerLock") + private void tryIncreaseBacklog(Buffer buffer) { + if (buffer.isBuffer()) { + ++backlog; + } + } + + @GuardedBy("consumerLock") + private void tryDecreaseBacklog(Buffer buffer) { + if (buffer.isBuffer()) { + --backlog; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java index 003cd951678..09938385fea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java @@ -39,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer; @@ -72,6 +73,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { private final Consumer<HsSubpartitionFileReader> fileReaderReleaser; + private final AtomicInteger backlog = new AtomicInteger(0); + private final Object lock = new Object(); @GuardedBy("lock") @@ -172,7 +175,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { buffers.add(segment); throw throwable; } - + tryIncreaseBacklog(buffer); loadedBuffers.add(BufferIndexOrError.newBuffer(buffer, indexToLoad)); bufferIndexManager.updateLastLoaded(indexToLoad); cachedRegionManager.advance( @@ -198,7 +201,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { // order while ((bufferIndexOrError = loadedBuffers.pollLast()) != null) { if (bufferIndexOrError.getBuffer().isPresent()) { - checkNotNull(bufferIndexOrError.buffer).recycleBuffer(); + bufferIndexOrError.getBuffer().get().recycleBuffer(); + tryDecreaseBacklog(bufferIndexOrError.getBuffer().get()); } } @@ -243,7 +247,6 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { BufferIndexOrError next = loadedBuffers.peek(); Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType(); - int backlog = loadedBuffers.size(); int bufferIndex = current.getIndex(); Buffer buffer = current.getBuffer() @@ -251,9 +254,10 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { () -> new NullPointerException( "Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed")); + tryDecreaseBacklog(buffer); return Optional.of( ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead( - buffer, nextDataType, backlog, bufferIndex)); + buffer, nextDataType, backlog.get(), bufferIndex)); } @Override @@ -280,6 +284,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { while (!loadedBuffers.isEmpty()) { BufferIndexOrError bufferIndexOrError = loadedBuffers.poll(); if (bufferIndexOrError.getBuffer().isPresent()) { + tryDecreaseBacklog(bufferIndexOrError.getBuffer().get()); bufferToRecycle.add(bufferIndexOrError.getBuffer().get()); } } @@ -291,7 +296,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { @Override public int getBacklog() { - return loadedBuffers.size(); + return backlog.get(); } // ------------------------------------------------------------------------ @@ -312,7 +317,9 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { // Because the update of consumption progress may be delayed, there is a // very small probability to load the buffer that has been consumed from memory. // Skip these buffers directly to avoid repeated consumption. - buffersToRecycle.add(checkNotNull(loadedBuffers.poll()).buffer); + Buffer buffer = checkNotNull(loadedBuffers.poll()).buffer; + tryDecreaseBacklog(checkNotNull(buffer)); + buffersToRecycle.add(buffer); peek = loadedBuffers.peek(); } } @@ -320,6 +327,18 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { return Optional.ofNullable(peek); } + private void tryIncreaseBacklog(Buffer buffer) { + if (buffer.isBuffer()) { + backlog.getAndIncrement(); + } + } + + private void tryDecreaseBacklog(Buffer buffer) { + if (buffer.isBuffer()) { + backlog.getAndDecrement(); + } + } + private void moveFileOffsetToBuffer(int bufferIndex) throws IOException { Tuple2<Integer, Long> indexAndOffset = cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);
