This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f32b98443d7a920debdd259f8a04fc8b5acfb540 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Apr 14 20:13:10 2025 +0800 [core] Should not close channelManager in BinaryExternalSortBuffer (#5466) --- .../apache/paimon/sort/BinaryExternalSortBuffer.java | 5 ++--- .../org/apache/paimon/sort/SpillChannelManager.java | 20 ++------------------ .../paimon/sort/BinaryExternalSortBufferTest.java | 9 ++++++++- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java index 4bfbcd5ec7..c6495811de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java @@ -50,7 +50,7 @@ public class BinaryExternalSortBuffer implements SortBuffer { private final BinaryRowSerializer serializer; private final BinaryInMemorySortBuffer inMemorySortBuffer; private final IOManager ioManager; - private SpillChannelManager channelManager; + private final SpillChannelManager channelManager; private final int maxNumFileHandles; private final BlockCompressionFactory compressionCodecFactory; private final int compressionBlockSize; @@ -154,8 +154,7 @@ public class BinaryExternalSortBuffer implements SortBuffer { inMemorySortBuffer.clear(); spillChannelIDs.clear(); // delete files - channelManager.close(); - channelManager = new SpillChannelManager(); + channelManager.reset(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java index ee21427cac..5118eb2aac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java @@ -20,22 +20,17 @@ package org.apache.paimon.sort; import org.apache.paimon.disk.FileIOChannel; -import java.io.Closeable; import java.io.File; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import static org.apache.paimon.utils.Preconditions.checkArgument; - /** Channel manager to manage the life cycle of spill channels. */ -public class SpillChannelManager implements Closeable { +public class SpillChannelManager { private final HashSet<FileIOChannel.ID> channels; private final HashSet<FileIOChannel> openChannels; - private volatile boolean closed; - public SpillChannelManager() { this.channels = new HashSet<>(64); this.openChannels = new HashSet<>(64); @@ -43,13 +38,11 @@ public class SpillChannelManager implements Closeable { /** Add a new File channel. */ public synchronized void addChannel(FileIOChannel.ID id) { - checkArgument(!closed); channels.add(id); } /** Open File channels. */ public synchronized void addOpenChannels(List<FileIOChannel> toOpen) { - checkArgument(!closed); for (FileIOChannel channel : toOpen) { openChannels.add(channel); channels.remove(channel.getChannelID()); @@ -57,19 +50,10 @@ public class SpillChannelManager implements Closeable { } public synchronized void removeChannel(FileIOChannel.ID id) { - checkArgument(!closed); channels.remove(id); } - @Override - public synchronized void close() { - - if (this.closed) { - return; - } - - this.closed = true; - + public synchronized void reset() { for (Iterator<FileIOChannel> channels = this.openChannels.iterator(); channels.hasNext(); ) { final FileIOChannel channel = channels.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java index 59ef802a1c..0ff35a0606 100644 --- a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java @@ -188,8 +188,15 @@ public class BinaryExternalSortBufferTest { innerTestSpilling(createBuffer()); } + @Test + public void testSpillingAndClearWithMaxFanIn() throws Exception { + BinaryExternalSortBuffer buffer = createBuffer(2); + innerTestSpilling(buffer); + innerTestSpilling(buffer); + } + private void innerTestSpilling(BinaryExternalSortBuffer sorter) throws Exception { - int size = 1000_000; + int size = 2000_000; MockBinaryRowReader reader = new MockBinaryRowReader(size); sorter.write(reader);
