This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66121c8486cf9c421130020fb548c7c7192d6079
Author: Wencong Liu <[email protected]>
AuthorDate: Mon Aug 14 11:32:32 2023 +0800

    [hotfix] Fix the inaccurate backlog number of Hybrid Shuffle in legacy mode
---
 .../io/network/partition/hybrid/HsDataView.java    |  2 +-
 .../HsSubpartitionConsumerMemoryDataManager.java   | 28 ++++++++++++++++---
 .../hybrid/HsSubpartitionFileReaderImpl.java       | 31 +++++++++++++++++-----
 3 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
index 30ad6693fba..49d88df23c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsDataView.java
@@ -55,7 +55,7 @@ public interface HsDataView {
             int nextBufferToConsume, Collection<Buffer> buffersToRecycle);
 
     /**
-     * Get the number of buffers backlog.
+     * Get the number of buffers whose {@link Buffer.DataType} is buffer.
      *
      * @return backlog of this view's corresponding subpartition.
      */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
index 30cf2e91898..eb597cddbf8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionConsumerMemoryDataManager.java
@@ -53,6 +53,9 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
 
     private final HsMemoryDataManagerOperation memoryDataManagerOperation;
 
+    @GuardedBy("consumerLock")
+    private int backlog = 0;
+
     public HsSubpartitionConsumerMemoryDataManager(
             Lock resultPartitionLock,
             Lock consumerLock,
@@ -69,12 +72,16 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
     @GuardedBy("consumerLock")
     // this method only called from subpartitionMemoryDataManager with write lock.
     public void addInitialBuffers(Deque<HsBufferContext> buffers) {
+        for (HsBufferContext bufferContext : buffers) {
+            tryIncreaseBacklog(bufferContext.getBuffer());
+        }
         unConsumedBuffers.addAll(buffers);
     }
 
     @GuardedBy("consumerLock")
     // this method only called from subpartitionMemoryDataManager with write lock.
     public boolean addBuffer(HsBufferContext bufferContext) {
+        tryIncreaseBacklog(bufferContext.getBuffer());
         unConsumedBuffers.add(bufferContext);
         trimHeadingReleasedBuffers();
         return unConsumedBuffers.size() <= 1;
@@ -104,6 +111,7 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
 
                             HsBufferContext bufferContext =
                                     checkNotNull(unConsumedBuffers.pollFirst());
+                            tryDecreaseBacklog(bufferContext.getBuffer());
                             bufferContext.consumed(consumerId);
                             Buffer.DataType nextDataType =
                                     peekNextToConsumeDataTypeInternal(toConsumeIndex + 1);
@@ -157,12 +165,12 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
     }
 
     @SuppressWarnings("FieldAccessNotGuarded")
-    // Un-synchronized get unConsumedBuffers size to provide memory data backlog,this will make the
+    // Un-synchronized get the backlog to provide memory data backlog, this will make the
     // result greater than or equal to the actual backlog, but obtaining an accurate backlog will
     // bring too much extra overhead.
     @Override
     public int getBacklog() {
-        return unConsumedBuffers.size();
+        return backlog;
     }
 
     @Override
@@ -173,7 +181,21 @@ public class HsSubpartitionConsumerMemoryDataManager implements HsDataView {
     @GuardedBy("consumerLock")
     private void trimHeadingReleasedBuffers() {
         while (!unConsumedBuffers.isEmpty() && unConsumedBuffers.peekFirst().isReleased()) {
-            unConsumedBuffers.removeFirst();
+            tryDecreaseBacklog(unConsumedBuffers.removeFirst().getBuffer());
+        }
+    }
+
+    @GuardedBy("consumerLock")
+    private void tryIncreaseBacklog(Buffer buffer) {
+        if (buffer.isBuffer()) {
+            ++backlog;
+        }
+    }
+
+    @GuardedBy("consumerLock")
+    private void tryDecreaseBacklog(Buffer buffer) {
+        if (buffer.isBuffer()) {
+            --backlog;
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index 003cd951678..09938385fea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -39,6 +39,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.positionToNextBuffer;
@@ -72,6 +73,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
     private final Consumer<HsSubpartitionFileReader> fileReaderReleaser;
 
+    private final AtomicInteger backlog = new AtomicInteger(0);
+
     private final Object lock = new Object();
 
     @GuardedBy("lock")
@@ -172,7 +175,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                     buffers.add(segment);
                     throw throwable;
                 }
-
+                tryIncreaseBacklog(buffer);
                 loadedBuffers.add(BufferIndexOrError.newBuffer(buffer, indexToLoad));
                 bufferIndexManager.updateLastLoaded(indexToLoad);
                 cachedRegionManager.advance(
@@ -198,7 +201,8 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
             // order
             while ((bufferIndexOrError = loadedBuffers.pollLast()) != null) {
                 if (bufferIndexOrError.getBuffer().isPresent()) {
-                    checkNotNull(bufferIndexOrError.buffer).recycleBuffer();
+                    bufferIndexOrError.getBuffer().get().recycleBuffer();
+                    tryDecreaseBacklog(bufferIndexOrError.getBuffer().get());
                 }
             }
 
@@ -243,7 +247,6 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         BufferIndexOrError next = loadedBuffers.peek();
 
         Buffer.DataType nextDataType = next == null ? Buffer.DataType.NONE : next.getDataType();
-        int backlog = loadedBuffers.size();
         int bufferIndex = current.getIndex();
         Buffer buffer =
                 current.getBuffer()
@@ -251,9 +254,10 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                                 () ->
                                         new NullPointerException(
                                                 "Get a non-throwable and non-buffer bufferIndexOrError, which is not allowed"));
+        tryDecreaseBacklog(buffer);
         return Optional.of(
                 ResultSubpartition.BufferAndBacklog.fromBufferAndLookahead(
-                        buffer, nextDataType, backlog, bufferIndex));
+                        buffer, nextDataType, backlog.get(), bufferIndex));
     }
 
     @Override
@@ -280,6 +284,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
             while (!loadedBuffers.isEmpty()) {
                 BufferIndexOrError bufferIndexOrError = loadedBuffers.poll();
                 if (bufferIndexOrError.getBuffer().isPresent()) {
+                    tryDecreaseBacklog(bufferIndexOrError.getBuffer().get());
                     bufferToRecycle.add(bufferIndexOrError.getBuffer().get());
                 }
             }
@@ -291,7 +296,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
 
     @Override
     public int getBacklog() {
-        return loadedBuffers.size();
+        return backlog.get();
     }
 
     // ------------------------------------------------------------------------
@@ -312,7 +317,9 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
                 // Because the update of consumption progress may be delayed, there is a
                 // very small probability to load the buffer that has been consumed from memory.
                 // Skip these buffers directly to avoid repeated consumption.
-                buffersToRecycle.add(checkNotNull(loadedBuffers.poll()).buffer);
+                Buffer buffer = checkNotNull(loadedBuffers.poll()).buffer;
+                tryDecreaseBacklog(checkNotNull(buffer));
+                buffersToRecycle.add(buffer);
                 peek = loadedBuffers.peek();
             }
         }
@@ -320,6 +327,18 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader {
         return Optional.ofNullable(peek);
     }
 
+    private void tryIncreaseBacklog(Buffer buffer) {
+        if (buffer.isBuffer()) {
+            backlog.getAndIncrement();
+        }
+    }
+
+    private void tryDecreaseBacklog(Buffer buffer) {
+        if (buffer.isBuffer()) {
+            backlog.getAndDecrement();
+        }
+    }
+
     private void moveFileOffsetToBuffer(int bufferIndex) throws IOException {
         Tuple2<Integer, Long> indexAndOffset =
                 cachedRegionManager.getNumSkipAndFileOffset(bufferIndex);

Reply via email to