This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 086c47739e [mosaic] Clean up reader resources on close (#8152)
086c47739e is described below
commit 086c47739eefc9c3a95c81abe176b40a5fd8ff6f
Author: QuakeWang <[email protected]>
AuthorDate: Mon Jun 8 08:35:31 2026 +0800
[mosaic] Clean up reader resources on close (#8152)
---
.../paimon/format/mosaic/MosaicRecordsReader.java | 52 +++++++++++++--
.../format/mosaic/MosaicRecordsReaderTest.java | 77 ++++++++++++++++++++++
2 files changed, 124 insertions(+), 5 deletions(-)
diff --git
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
index 43f01c346c..6a81d8aad8 100644
---
a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
+++
b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -233,14 +233,56 @@ public class MosaicRecordsReader implements
FileRecordReader<InternalRow> {
@Override
public void close() throws IOException {
- releaseCurrentVsr();
- reader.close();
- allocator.close();
- inputFileAdapter.close();
+ Throwable throwable = null;
+
+ try {
+ releaseCurrentVsr();
+ } catch (Throwable t) {
+ throwable = t;
+ }
+
+ try {
+ reader.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ try {
+ allocator.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ try {
+ inputFileAdapter.close();
+ } catch (Throwable t) {
+ throwable = addSuppressed(throwable, t);
+ }
+
+ if (throwable != null) {
+ rethrow(throwable);
+ }
}
- private static void addSuppressed(Throwable throwable, Throwable
suppressed) {
+ private static Throwable addSuppressed(Throwable throwable, Throwable
suppressed) {
+ if (throwable == null) {
+ return suppressed;
+ }
throwable.addSuppressed(suppressed);
+ return throwable;
+ }
+
+ private static void rethrow(Throwable throwable) throws IOException {
+ if (throwable instanceof IOException) {
+ throw (IOException) throwable;
+ }
+ if (throwable instanceof RuntimeException) {
+ throw (RuntimeException) throwable;
+ }
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ throw new IOException(throwable);
}
private static RuntimeException rethrowUnchecked(Throwable throwable) {
diff --git
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
index e8bc3df8f9..f2725dc675 100644
---
a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
+++
b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
@@ -26,15 +26,18 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/** Test for {@link MosaicRecordsReader}. */
class MosaicRecordsReaderTest {
@@ -119,12 +122,74 @@ class MosaicRecordsReaderTest {
assertThat(inputStream.closeCount()).isEqualTo(1);
}
+ @Test
+ void testCloseContinuesWhenReaderCloseThrows() throws IOException {
+ CloseCountingSeekableInputStream inputStream = new
CloseCountingSeekableInputStream();
+ MosaicInputFileAdapter inputFileAdapter =
createInputFileAdapter(inputStream);
+ CloseCountingRootAllocator allocator = new
CloseCountingRootAllocator();
+ MosaicReader reader = createReader();
+ RuntimeException failure = new RuntimeException("reader close failed");
+ doThrow(failure).when(reader).close();
+
+ MosaicRecordsReader recordsReader =
+ createRecordsReader(inputFileAdapter, allocator, reader);
+
+ assertThatThrownBy(recordsReader::close).isSameAs(failure);
+
+ verify(reader).close();
+ assertThat(allocator.closeCount()).isEqualTo(1);
+ assertThat(inputStream.closeCount()).isEqualTo(1);
+ }
+
+ @Test
+ void testCloseAddsSuppressedExceptionsFromLaterResources() throws
IOException {
+ CloseCountingSeekableInputStream inputStream = new
CloseCountingSeekableInputStream();
+ MosaicInputFileAdapter inputFileAdapter =
createInputFileAdapter(inputStream);
+ RuntimeException allocatorFailure = new RuntimeException("allocator
close failed");
+ CloseCountingRootAllocator allocator = new
CloseCountingRootAllocator(allocatorFailure);
+ MosaicReader reader = createReader();
+ RuntimeException readerFailure = new RuntimeException("reader close
failed");
+ doThrow(readerFailure).when(reader).close();
+
+ MosaicRecordsReader recordsReader =
+ createRecordsReader(inputFileAdapter, allocator, reader);
+
+ assertThatThrownBy(recordsReader::close)
+ .isSameAs(readerFailure)
+ .satisfies(t ->
assertThat(t.getSuppressed()).containsExactly(allocatorFailure));
+
+ verify(reader).close();
+ assertThat(allocator.closeCount()).isEqualTo(1);
+ assertThat(inputStream.closeCount()).isEqualTo(1);
+ }
+
private static MosaicInputFileAdapter createInputFileAdapter(
CloseCountingSeekableInputStream inputStream) throws IOException {
return new MosaicInputFileAdapter(
new CloseCountingFileIO(inputStream), new
Path("file:/tmp/mosaic-reader-test"));
}
+ private static MosaicRecordsReader createRecordsReader(
+ MosaicInputFileAdapter inputFileAdapter,
+ CloseCountingRootAllocator allocator,
+ MosaicReader reader) {
+ return new MosaicRecordsReader(
+ inputFileAdapter,
+ 0,
+ rowType(),
+ rowType(),
+ null,
+ new Path("file:/tmp/mosaic-reader-test"),
+ allocator,
+ (inputFile, fileSize, bufferAllocator) -> reader);
+ }
+
+ private static MosaicReader createReader() {
+ MosaicReader reader = mock(MosaicReader.class);
+ when(reader.getSchema()).thenReturn(new
Schema(Collections.emptyList()));
+ return reader;
+ }
+
private static RowType rowType() {
return DataTypes.ROW(DataTypes.INT());
}
@@ -177,11 +242,23 @@ class MosaicRecordsReaderTest {
private static class CloseCountingRootAllocator extends RootAllocator {
+ private final RuntimeException closeFailure;
private int closeCount;
+ private CloseCountingRootAllocator() {
+ this(null);
+ }
+
+ private CloseCountingRootAllocator(RuntimeException closeFailure) {
+ this.closeFailure = closeFailure;
+ }
+
@Override
public void close() {
closeCount++;
+ if (closeFailure != null) {
+ throw closeFailure;
+ }
super.close();
}