This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f3bbb6b6a9b26a135e3e257e51be942d6d8bedf0 Author: Weijie Guo <[email protected]> AuthorDate: Fri Jul 29 15:50:45 2022 +0800 [hotfix] Simplify the logic related to release and fail reader in HsFileDataManager. --- .../partition/hybrid/HsFileDataManager.java | 32 +++++++--------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java index dab671aa723..dbcd204300b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java @@ -93,10 +93,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler { private final HybridShuffleConfiguration hybridShuffleConfiguration; - /** All failed subpartition readers to be released. */ - @GuardedBy("lock") - private final Set<HsSubpartitionFileReader> failedReaders = new HashSet<>(); - /** All readers waiting to read data of different subpartitions. */ @GuardedBy("lock") private final Set<HsSubpartitionFileReader> allReaders = new HashSet<>(); @@ -178,7 +174,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } isReleased = true; - failedReaders.addAll(allReaders); List<HsSubpartitionFileReader> pendingReaders = new ArrayList<>(allReaders); mayNotifyReleased(); failSubpartitionReaders( @@ -324,7 +319,7 @@ public class HsFileDataManager implements Runnable, BufferRecycler { private void failSubpartitionReaders( Collection<HsSubpartitionFileReader> readers, Throwable failureCause) { synchronized (lock) { - failedReaders.addAll(readers); + removeSubpartitionReaders(readers); } for (HsSubpartitionFileReader reader : readers) { @@ -332,9 +327,17 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } } + @GuardedBy("lock") + private void removeSubpartitionReaders(Collection<HsSubpartitionFileReader> readers) { + allReaders.removeAll(readers); + if (allReaders.isEmpty()) { + bufferPool.unregisterRequester(this); + closeFileChannel(); + } + } + private void endCurrentRoundOfReading(int numBuffersRead) { synchronized (lock) { - removeFailedReaders(); numRequestedBuffers += numBuffersRead; isRunning = false; mayTriggerReading(); @@ -342,21 +345,6 @@ public class HsFileDataManager implements Runnable, BufferRecycler { } } - @GuardedBy("lock") - private void removeFailedReaders() { - assert Thread.holdsLock(lock); - - for (HsSubpartitionFileReader reader : failedReaders) { - allReaders.remove(reader); - } - failedReaders.clear(); - - if (allReaders.isEmpty()) { - bufferPool.unregisterRequester(this); - closeFileChannel(); - } - } - @GuardedBy("lock") private void lazyInitialize() throws IOException { assert Thread.holdsLock(lock);
