This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3bbb6b6a9b26a135e3e257e51be942d6d8bedf0
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Jul 29 15:50:45 2022 +0800

    [hotfix] Simplify the logic related to release and fail reader in HsFileDataManager.
---
 .../partition/hybrid/HsFileDataManager.java        | 32 +++++++---------------
 1 file changed, 10 insertions(+), 22 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index dab671aa723..dbcd204300b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -93,10 +93,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
 
     private final HybridShuffleConfiguration hybridShuffleConfiguration;
 
-    /** All failed subpartition readers to be released. */
-    @GuardedBy("lock")
-    private final Set<HsSubpartitionFileReader> failedReaders = new HashSet<>();
-
     /** All readers waiting to read data of different subpartitions. */
     @GuardedBy("lock")
     private final Set<HsSubpartitionFileReader> allReaders = new HashSet<>();
@@ -178,7 +174,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
             }
             isReleased = true;
 
-            failedReaders.addAll(allReaders);
             List<HsSubpartitionFileReader> pendingReaders = new ArrayList<>(allReaders);
             mayNotifyReleased();
             failSubpartitionReaders(
@@ -324,7 +319,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
     private void failSubpartitionReaders(
             Collection<HsSubpartitionFileReader> readers, Throwable failureCause) {
         synchronized (lock) {
-            failedReaders.addAll(readers);
+            removeSubpartitionReaders(readers);
         }
 
         for (HsSubpartitionFileReader reader : readers) {
@@ -332,9 +327,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         }
     }
 
+    @GuardedBy("lock")
+    private void removeSubpartitionReaders(Collection<HsSubpartitionFileReader> readers) {
+        allReaders.removeAll(readers);
+        if (allReaders.isEmpty()) {
+            bufferPool.unregisterRequester(this);
+            closeFileChannel();
+        }
+    }
+
     private void endCurrentRoundOfReading(int numBuffersRead) {
         synchronized (lock) {
-            removeFailedReaders();
             numRequestedBuffers += numBuffersRead;
             isRunning = false;
             mayTriggerReading();
@@ -342,21 +345,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler {
         }
     }
 
-    @GuardedBy("lock")
-    private void removeFailedReaders() {
-        assert Thread.holdsLock(lock);
-
-        for (HsSubpartitionFileReader reader : failedReaders) {
-            allReaders.remove(reader);
-        }
-        failedReaders.clear();
-
-        if (allReaders.isEmpty()) {
-            bufferPool.unregisterRequester(this);
-            closeFileChannel();
-        }
-    }
-
     @GuardedBy("lock")
     private void lazyInitialize() throws IOException {
         assert Thread.holdsLock(lock);

Reply via email to