This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 086c47739e [mosaic] Clean up reader resources on close (#8152)
086c47739e is described below

commit 086c47739eefc9c3a95c81abe176b40a5fd8ff6f
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 08:35:31 2026 +0800

    [mosaic] Clean up reader resources on close (#8152)
---
 .../paimon/format/mosaic/MosaicRecordsReader.java  | 52 +++++++++++++--
 .../format/mosaic/MosaicRecordsReaderTest.java     | 77 ++++++++++++++++++++++
 2 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
index 43f01c346c..6a81d8aad8 100644
--- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -233,14 +233,56 @@ public class MosaicRecordsReader implements FileRecordReader<InternalRow> {
 
     @Override
     public void close() throws IOException {
-        releaseCurrentVsr();
-        reader.close();
-        allocator.close();
-        inputFileAdapter.close();
+        Throwable throwable = null;
+
+        try {
+            releaseCurrentVsr();
+        } catch (Throwable t) {
+            throwable = t;
+        }
+
+        try {
+            reader.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        try {
+            allocator.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        try {
+            inputFileAdapter.close();
+        } catch (Throwable t) {
+            throwable = addSuppressed(throwable, t);
+        }
+
+        if (throwable != null) {
+            rethrow(throwable);
+        }
     }
 
-    private static void addSuppressed(Throwable throwable, Throwable suppressed) {
+    private static Throwable addSuppressed(Throwable throwable, Throwable suppressed) {
+        if (throwable == null) {
+            return suppressed;
+        }
         throwable.addSuppressed(suppressed);
+        return throwable;
+    }
+
+    private static void rethrow(Throwable throwable) throws IOException {
+        if (throwable instanceof IOException) {
+            throw (IOException) throwable;
+        }
+        if (throwable instanceof RuntimeException) {
+            throw (RuntimeException) throwable;
+        }
+        if (throwable instanceof Error) {
+            throw (Error) throwable;
+        }
+        throw new IOException(throwable);
     }
 
     private static RuntimeException rethrowUnchecked(Throwable throwable) {
diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
index e8bc3df8f9..f2725dc675 100644
--- a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
+++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
@@ -26,15 +26,18 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /** Test for {@link MosaicRecordsReader}. */
 class MosaicRecordsReaderTest {
@@ -119,12 +122,74 @@ class MosaicRecordsReaderTest {
         assertThat(inputStream.closeCount()).isEqualTo(1);
     }
 
+    @Test
+    void testCloseContinuesWhenReaderCloseThrows() throws IOException {
+        CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        MosaicReader reader = createReader();
+        RuntimeException failure = new RuntimeException("reader close failed");
+        doThrow(failure).when(reader).close();
+
+        MosaicRecordsReader recordsReader =
+                createRecordsReader(inputFileAdapter, allocator, reader);
+
+        assertThatThrownBy(recordsReader::close).isSameAs(failure);
+
+        verify(reader).close();
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(inputStream.closeCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testCloseAddsSuppressedExceptionsFromLaterResources() throws IOException {
+        CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream);
+        RuntimeException allocatorFailure = new RuntimeException("allocator close failed");
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator(allocatorFailure);
+        MosaicReader reader = createReader();
+        RuntimeException readerFailure = new RuntimeException("reader close failed");
+        doThrow(readerFailure).when(reader).close();
+
+        MosaicRecordsReader recordsReader =
+                createRecordsReader(inputFileAdapter, allocator, reader);
+
+        assertThatThrownBy(recordsReader::close)
+                .isSameAs(readerFailure)
+                .satisfies(t -> assertThat(t.getSuppressed()).containsExactly(allocatorFailure));
+
+        verify(reader).close();
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(inputStream.closeCount()).isEqualTo(1);
+    }
+
     private static MosaicInputFileAdapter createInputFileAdapter(
             CloseCountingSeekableInputStream inputStream) throws IOException {
         return new MosaicInputFileAdapter(
                 new CloseCountingFileIO(inputStream), new Path("file:/tmp/mosaic-reader-test"));
     }
 
+    private static MosaicRecordsReader createRecordsReader(
+            MosaicInputFileAdapter inputFileAdapter,
+            CloseCountingRootAllocator allocator,
+            MosaicReader reader) {
+        return new MosaicRecordsReader(
+                inputFileAdapter,
+                0,
+                rowType(),
+                rowType(),
+                null,
+                new Path("file:/tmp/mosaic-reader-test"),
+                allocator,
+                (inputFile, fileSize, bufferAllocator) -> reader);
+    }
+
+    private static MosaicReader createReader() {
+        MosaicReader reader = mock(MosaicReader.class);
+        when(reader.getSchema()).thenReturn(new Schema(Collections.emptyList()));
+        return reader;
+    }
+
     private static RowType rowType() {
         return DataTypes.ROW(DataTypes.INT());
     }
@@ -177,11 +242,23 @@ class MosaicRecordsReaderTest {
 
     private static class CloseCountingRootAllocator extends RootAllocator {
 
+        private final RuntimeException closeFailure;
         private int closeCount;
 
+        private CloseCountingRootAllocator() {
+            this(null);
+        }
+
+        private CloseCountingRootAllocator(RuntimeException closeFailure) {
+            this.closeFailure = closeFailure;
+        }
+
         @Override
         public void close() {
             closeCount++;
+            if (closeFailure != null) {
+                throw closeFailure;
+            }
             super.close();
         }
 

Reply via email to