This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea97ae948 [mosaic] Clean up reader resources on construction failure (#8144)
2ea97ae948 is described below

commit 2ea97ae948564f4c9bb40e10f364f7423666f46c
Author: QuakeWang <[email protected]>
AuthorDate: Sun Jun 7 19:50:25 2026 +0800

    [mosaic] Clean up reader resources on construction failure (#8144)
---
 paimon-mosaic/pom.xml                              |   7 +
 .../paimon/format/mosaic/MosaicRecordsReader.java  | 116 ++++++++++---
 .../format/mosaic/MosaicRecordsReaderTest.java     | 192 +++++++++++++++++++++
 3 files changed, 294 insertions(+), 21 deletions(-)

diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml
index 6e0d1eae28..a265af539b 100644
--- a/paimon-mosaic/pom.xml
+++ b/paimon-mosaic/pom.xml
@@ -82,5 +82,12 @@ under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
index 24cdcbf05c..43f01c346c 100644
--- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java
@@ -73,38 +73,65 @@ public class MosaicRecordsReader implements FileRecordReader<InternalRow> {
             RowType projectedRowType,
             @Nullable List<Predicate> predicates,
             Path filePath) {
+        this(
+                inputFileAdapter,
+                fileSize,
+                dataSchemaRowType,
+                projectedRowType,
+                predicates,
+                filePath,
+                new RootAllocator(),
+                MosaicReader::open);
+    }
+
+    MosaicRecordsReader(
+            MosaicInputFileAdapter inputFileAdapter,
+            long fileSize,
+            RowType dataSchemaRowType,
+            RowType projectedRowType,
+            @Nullable List<Predicate> predicates,
+            Path filePath,
+            BufferAllocator allocator,
+            NativeReaderOpener nativeReaderOpener) {
         this.filePath = filePath;
         this.inputFileAdapter = inputFileAdapter;
         this.dataSchemaRowType = dataSchemaRowType;
         this.predicates = predicates;
-        this.allocator = new RootAllocator();
+        this.allocator = allocator;
 
+        MosaicReader createdReader = null;
+        int createdNumRowGroups;
+        ArrowBatchReader createdArrowBatchReader;
         try {
-            this.reader = MosaicReader.open(inputFileAdapter, fileSize, allocator);
-        } catch (Exception e) {
-            allocator.close();
-            throw e;
-        }
+            createdReader = nativeReaderOpener.open(inputFileAdapter, fileSize, allocator);
 
-        Schema fileSchema = reader.getSchema();
-        Set<String> fileColumnNames = new HashSet<>();
-        for (Field field : fileSchema.getFields()) {
-            fileColumnNames.add(field.getName());
-        }
-        List<String> projectedNames = projectedRowType.getFieldNames();
-        List<String> existingColumns = new ArrayList<>();
-        for (String name : projectedNames) {
-            if (fileColumnNames.contains(name)) {
-                existingColumns.add(name);
+            Schema fileSchema = createdReader.getSchema();
+            Set<String> fileColumnNames = new HashSet<>();
+            for (Field field : fileSchema.getFields()) {
+                fileColumnNames.add(field.getName());
             }
-        }
-        if (!existingColumns.isEmpty()) {
-            reader.project(existingColumns.toArray(new String[0]));
+            List<String> projectedNames = projectedRowType.getFieldNames();
+            List<String> existingColumns = new ArrayList<>();
+            for (String name : projectedNames) {
+                if (fileColumnNames.contains(name)) {
+                    existingColumns.add(name);
+                }
+            }
+            if (!existingColumns.isEmpty()) {
+                createdReader.project(existingColumns.toArray(new String[0]));
+            }
+
+            createdNumRowGroups = createdReader.numRowGroups();
+            createdArrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+        } catch (Throwable t) {
+            closeOnConstructionFailure(t, createdReader, allocator, inputFileAdapter);
+            throw rethrowUnchecked(t);
         }
 
-        this.numRowGroups = reader.numRowGroups();
+        this.reader = createdReader;
+        this.numRowGroups = createdNumRowGroups;
         this.currentRowGroup = 0;
-        this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true);
+        this.arrowBatchReader = createdArrowBatchReader;
     }
 
     @Nullable
@@ -211,4 +238,51 @@ public class MosaicRecordsReader implements FileRecordReader<InternalRow> {
         allocator.close();
         inputFileAdapter.close();
     }
+
+    private static void addSuppressed(Throwable throwable, Throwable suppressed) {
+        throwable.addSuppressed(suppressed);
+    }
+
+    private static RuntimeException rethrowUnchecked(Throwable throwable) {
+        if (throwable instanceof RuntimeException) {
+            return (RuntimeException) throwable;
+        }
+        if (throwable instanceof Error) {
+            throw (Error) throwable;
+        }
+        return new RuntimeException(throwable);
+    }
+
+    private static void closeOnConstructionFailure(
+            Throwable throwable,
+            @Nullable MosaicReader reader,
+            BufferAllocator allocator,
+            MosaicInputFileAdapter inputFileAdapter) {
+        try {
+            if (reader != null) {
+                reader.close();
+            }
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+
+        try {
+            allocator.close();
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+
+        try {
+            inputFileAdapter.close();
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+    }
+
+    @FunctionalInterface
+    interface NativeReaderOpener {
+
+        MosaicReader open(
+                MosaicInputFileAdapter inputFileAdapter, long fileSize, BufferAllocator allocator);
+    }
 }
diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
new file mode 100644
index 0000000000..e8bc3df8f9
--- /dev/null
+++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsReaderTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.mosaic.MosaicReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/** Test for {@link MosaicRecordsReader}. */
+class MosaicRecordsReaderTest {
+
+    @Test
+    void testConstructorRuntimeExceptionClosesCreatedResources() throws IOException {
+        CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        RuntimeException failure = new RuntimeException("native reader failed");
+
+        assertThatThrownBy(
+                        () ->
+                                new MosaicRecordsReader(
+                                        inputFileAdapter,
+                                        0,
+                                        rowType(),
+                                        rowType(),
+                                        null,
+                                        new Path("file:/tmp/mosaic-reader-test"),
+                                        allocator,
+                                        (inputFile, fileSize, bufferAllocator) -> {
+                                            throw failure;
+                                        }))
+                .isSameAs(failure);
+
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(inputStream.closeCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testConstructorErrorClosesCreatedResources() throws IOException {
+        CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        UnsatisfiedLinkError failure = new UnsatisfiedLinkError("native library failed");
+
+        assertThatThrownBy(
+                        () ->
+                                new MosaicRecordsReader(
+                                        inputFileAdapter,
+                                        0,
+                                        rowType(),
+                                        rowType(),
+                                        null,
+                                        new Path("file:/tmp/mosaic-reader-test"),
+                                        allocator,
+                                        (inputFile, fileSize, bufferAllocator) -> {
+                                            throw failure;
+                                        }))
+                .isSameAs(failure);
+
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(inputStream.closeCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testConstructorFailureAfterReaderCreatedClosesReaderAndOtherResources()
+            throws IOException {
+        CloseCountingSeekableInputStream inputStream = new CloseCountingSeekableInputStream();
+        MosaicInputFileAdapter inputFileAdapter = createInputFileAdapter(inputStream);
+        CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+        MosaicReader reader = mock(MosaicReader.class);
+        RuntimeException failure = new RuntimeException("schema failed");
+        doThrow(failure).when(reader).getSchema();
+
+        assertThatThrownBy(
+                        () ->
+                                new MosaicRecordsReader(
+                                        inputFileAdapter,
+                                        0,
+                                        rowType(),
+                                        rowType(),
+                                        null,
+                                        new Path("file:/tmp/mosaic-reader-test"),
+                                        allocator,
+                                        (inputFile, fileSize, bufferAllocator) -> reader))
+                .isSameAs(failure);
+
+        verify(reader).close();
+        assertThat(allocator.closeCount()).isEqualTo(1);
+        assertThat(inputStream.closeCount()).isEqualTo(1);
+    }
+
+    private static MosaicInputFileAdapter createInputFileAdapter(
+            CloseCountingSeekableInputStream inputStream) throws IOException {
+        return new MosaicInputFileAdapter(
+                new CloseCountingFileIO(inputStream), new Path("file:/tmp/mosaic-reader-test"));
+    }
+
+    private static RowType rowType() {
+        return DataTypes.ROW(DataTypes.INT());
+    }
+
+    private static class CloseCountingFileIO extends LocalFileIO {
+
+        private final CloseCountingSeekableInputStream inputStream;
+
+        private CloseCountingFileIO(CloseCountingSeekableInputStream inputStream) {
+            this.inputStream = inputStream;
+        }
+
+        @Override
+        public SeekableInputStream newInputStream(Path path) {
+            return inputStream;
+        }
+    }
+
+    private static class CloseCountingSeekableInputStream extends SeekableInputStream {
+
+        private int closeCount;
+
+        @Override
+        public void seek(long desired) {}
+
+        @Override
+        public long getPos() {
+            return 0;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return -1;
+        }
+
+        @Override
+        public int read() {
+            return -1;
+        }
+
+        @Override
+        public void close() {
+            closeCount++;
+        }
+
+        int closeCount() {
+            return closeCount;
+        }
+    }
+
+    private static class CloseCountingRootAllocator extends RootAllocator {
+
+        private int closeCount;
+
+        @Override
+        public void close() {
+            closeCount++;
+            super.close();
+        }
+
+        int closeCount() {
+            return closeCount;
+        }
+    }
+}

Reply via email to