This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f2376a8682 [mosaic] Clean up writer resources on construction failure (#8129)
f2376a8682 is described below

commit f2376a8682ce4fd24694e75c98254c69dc56582b
Author: QuakeWang <[email protected]>
AuthorDate: Sat Jun 6 16:50:23 2026 +0800

    [mosaic] Clean up writer resources on construction failure (#8129)
---
 .../paimon/arrow/vector/ArrowFormatWriter.java     | 75 ++++++++++++++++--
 .../paimon/arrow/vector/ArrowFormatWriterTest.java | 44 +++++++++++
 .../paimon/format/mosaic/MosaicRecordsWriter.java  | 89 ++++++++++++++++++++--
 .../format/mosaic/MosaicRecordsWriterTest.java     | 77 +++++++++++++++++++
 4 files changed, 273 insertions(+), 12 deletions(-)

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index e06710760d..6133bfb550 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -54,10 +54,11 @@ public class ArrowFormatWriter implements AutoCloseable {
     private final int batchSize;
     private final BufferAllocator allocator;
     @Nullable private final Long memoryUsedMaxInBytes;
+    private final boolean closeAllocatorOnClose;
     private int rowId;
 
     public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
null, null);
+        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
null, null, true);
     }
 
     public ArrowFormatWriter(
@@ -71,7 +72,8 @@ public class ArrowFormatWriter implements AutoCloseable {
                 caseSensitive,
                 new RootAllocator(),
                 memoryUsedMaxInBytes,
-                null);
+                null,
+                true);
     }
 
     public ArrowFormatWriter(
@@ -80,7 +82,7 @@ public class ArrowFormatWriter implements AutoCloseable {
             boolean caseSensitive,
             BufferAllocator allocator,
             @Nullable Long memoryUsedMaxInBytes) {
-        this(rowType, writeBatchSize, caseSensitive, allocator, 
memoryUsedMaxInBytes, null);
+        this(rowType, writeBatchSize, caseSensitive, allocator, 
memoryUsedMaxInBytes, null, true);
     }
 
     public ArrowFormatWriter(
@@ -95,7 +97,8 @@ public class ArrowFormatWriter implements AutoCloseable {
                 caseSensitive,
                 new RootAllocator(),
                 memoryUsedMaxInBytes,
-                shreddingSchemas);
+                shreddingSchemas,
+                true);
     }
 
     public ArrowFormatWriter(
@@ -105,6 +108,24 @@ public class ArrowFormatWriter implements AutoCloseable {
             BufferAllocator allocator,
             @Nullable Long memoryUsedMaxInBytes,
             @Nullable RowType shreddingSchemas) {
+        this(
+                rowType,
+                writeBatchSize,
+                caseSensitive,
+                allocator,
+                memoryUsedMaxInBytes,
+                shreddingSchemas,
+                true);
+    }
+
+    private ArrowFormatWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            BufferAllocator allocator,
+            @Nullable Long memoryUsedMaxInBytes,
+            @Nullable RowType shreddingSchemas,
+            boolean closeAllocatorOnClose) {
         this(
                 rowType,
                 writeBatchSize,
@@ -113,7 +134,8 @@ public class ArrowFormatWriter implements AutoCloseable {
                 memoryUsedMaxInBytes,
                 shreddingSchemas,
                 ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR,
-                ArrowFieldWriterFactoryVisitor.INSTANCE);
+                ArrowFieldWriterFactoryVisitor.INSTANCE,
+                closeAllocatorOnClose);
     }
 
     public ArrowFormatWriter(
@@ -125,7 +147,30 @@ public class ArrowFormatWriter implements AutoCloseable {
             @Nullable RowType shreddingSchemas,
             ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
             ArrowFieldWriterFactoryVisitor fieldWriterFactory) {
+        this(
+                rowType,
+                writeBatchSize,
+                caseSensitive,
+                allocator,
+                memoryUsedMaxInBytes,
+                shreddingSchemas,
+                fieldTypeVisitor,
+                fieldWriterFactory,
+                true);
+    }
+
+    private ArrowFormatWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            BufferAllocator allocator,
+            @Nullable Long memoryUsedMaxInBytes,
+            @Nullable RowType shreddingSchemas,
+            ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
+            ArrowFieldWriterFactoryVisitor fieldWriterFactory,
+            boolean closeAllocatorOnClose) {
         this.allocator = allocator;
+        this.closeAllocatorOnClose = closeAllocatorOnClose;
 
         RowType outputRowType = replaceWithShreddingType(rowType, 
shreddingSchemas);
         vectorSchemaRoot =
@@ -156,6 +201,22 @@ public class ArrowFormatWriter implements AutoCloseable {
         this.memoryUsedMaxInBytes = memoryUsedMaxInBytes;
     }
 
+    public static ArrowFormatWriter forBorrowedAllocator(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            BufferAllocator allocator,
+            @Nullable Long memoryUsedMaxInBytes) {
+        return new ArrowFormatWriter(
+                rowType,
+                writeBatchSize,
+                caseSensitive,
+                allocator,
+                memoryUsedMaxInBytes,
+                null,
+                false);
+    }
+
     public void flush() {
         vectorSchemaRoot.setRowCount(rowId);
     }
@@ -221,7 +282,9 @@ public class ArrowFormatWriter implements AutoCloseable {
     @Override
     public void close() {
         vectorSchemaRoot.close();
-        allocator.close();
+        if (closeAllocatorOnClose) {
+            allocator.close();
+        }
     }
 
     public int getBatchSize() {
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index 9d102c9ce2..b445423178 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -463,6 +463,35 @@ public class ArrowFormatWriterTest {
         }
     }
 
+    @Test
+    public void testWriterClosesExternalAllocatorByDefault() {
+        CloseCountingRootAllocator allocator = new 
CloseCountingRootAllocator();
+        try {
+            ArrowFormatWriter writer =
+                    new ArrowFormatWriter(PRIMITIVE_TYPE, 4096, true, 
allocator, null);
+            writer.close();
+            assertThat(allocator.closeCount()).isEqualTo(1);
+        } finally {
+            if (allocator.closeCount() == 0) {
+                allocator.close();
+            }
+        }
+    }
+
+    @Test
+    public void testWriterWithBorrowedAllocatorDoesNotCloseAllocator() {
+        CloseCountingRootAllocator allocator = new 
CloseCountingRootAllocator();
+        try {
+            ArrowFormatWriter writer =
+                    ArrowFormatWriter.forBorrowedAllocator(
+                            PRIMITIVE_TYPE, 4096, true, allocator, null);
+            writer.close();
+            assertThat(allocator.closeCount()).isZero();
+        } finally {
+            allocator.close();
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testWriteWithExternalAllocator(boolean allocationFailed) {
@@ -769,4 +798,19 @@ public class ArrowFormatWriterTest {
             assertThat(tsType.getTimezone()).isNull();
         }
     }
+
+    private static class CloseCountingRootAllocator extends RootAllocator {
+
+        private int closeCount;
+
+        @Override
+        public void close() {
+            closeCount++;
+            super.close();
+        }
+
+        int closeCount() {
+            return closeCount;
+        }
+    }
 }
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
index fdef0eb365..944f78c4b1 100644
--- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
@@ -58,15 +58,30 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
             FileFormatFactory.FormatContext formatContext,
             List<String> statsColumnNames,
             @Nullable Integer numBuckets) {
+        this(
+                outputStream,
+                rowType,
+                formatContext,
+                statsColumnNames,
+                numBuckets,
+                new RootAllocator(),
+                MosaicWriter::new);
+    }
+
+    MosaicRecordsWriter(
+            OutputStream outputStream,
+            RowType rowType,
+            FileFormatFactory.FormatContext formatContext,
+            List<String> statsColumnNames,
+            @Nullable Integer numBuckets,
+            BufferAllocator allocator,
+            NativeWriterFactory nativeWriterFactory) {
         this.statsColumnNames = statsColumnNames;
-        this.allocator = new RootAllocator();
+        this.allocator = allocator;
 
         int writeBatchSize = formatContext.writeBatchSize();
         long writeBatchMemory = formatContext.writeBatchMemory().getBytes();
 
-        this.arrowFormatWriter =
-                new ArrowFormatWriter(rowType, writeBatchSize, true, 
allocator, writeBatchMemory);
-
         WriterOptions options = new 
WriterOptions().zstdLevel(formatContext.zstdLevel());
         if (numBuckets != null) {
             options = options.numBuckets(numBuckets);
@@ -79,8 +94,22 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
             options.statsColumns(statsColumnNames.toArray(new String[0]));
         }
 
-        Schema arrowSchema = 
arrowFormatWriter.getVectorSchemaRoot().getSchema();
-        this.nativeWriter = new MosaicWriter(outputStream, arrowSchema, 
options, allocator);
+        ArrowFormatWriter createdArrowWriter = null;
+        MosaicWriter createdNativeWriter = null;
+        try {
+            createdArrowWriter =
+                    ArrowFormatWriter.forBorrowedAllocator(
+                            rowType, writeBatchSize, true, allocator, 
writeBatchMemory);
+            Schema arrowSchema = 
createdArrowWriter.getVectorSchemaRoot().getSchema();
+            createdNativeWriter =
+                    nativeWriterFactory.create(outputStream, arrowSchema, 
options, allocator);
+        } catch (Throwable t) {
+            closeOnConstructionFailure(t, createdNativeWriter, 
createdArrowWriter, allocator);
+            throw rethrowUnchecked(t);
+        }
+
+        this.arrowFormatWriter = createdArrowWriter;
+        this.nativeWriter = createdNativeWriter;
     }
 
     @Override
@@ -196,4 +225,52 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
         }
         throw new IOException(throwable);
     }
+
+    private static RuntimeException rethrowUnchecked(Throwable throwable) {
+        if (throwable instanceof RuntimeException) {
+            return (RuntimeException) throwable;
+        }
+        if (throwable instanceof Error) {
+            throw (Error) throwable;
+        }
+        return new RuntimeException(throwable);
+    }
+
+    private static void closeOnConstructionFailure(
+            Throwable throwable,
+            @Nullable MosaicWriter nativeWriter,
+            @Nullable ArrowFormatWriter arrowFormatWriter,
+            BufferAllocator allocator) {
+        try {
+            if (nativeWriter != null) {
+                nativeWriter.close();
+            }
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+
+        try {
+            if (arrowFormatWriter != null) {
+                arrowFormatWriter.close();
+            }
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+
+        try {
+            allocator.close();
+        } catch (Throwable t) {
+            addSuppressed(throwable, t);
+        }
+    }
+
+    @FunctionalInterface
+    interface NativeWriterFactory {
+
+        MosaicWriter create(
+                OutputStream outputStream,
+                Schema arrowSchema,
+                WriterOptions options,
+                BufferAllocator allocator);
+    }
 }
diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java
new file mode 100644
index 0000000000..58a8dc5252
--- /dev/null
+++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link MosaicRecordsWriter}. */
+class MosaicRecordsWriterTest {
+
+    @Test
+    void testConstructorFailureClosesCreatedResources() {
+        RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+        FileFormatFactory.FormatContext formatContext =
+                new FileFormatFactory.FormatContext(new Options(), 1024, 1024);
+        CloseCountingRootAllocator allocator = new 
CloseCountingRootAllocator();
+        RuntimeException failure = new RuntimeException("native writer 
failed");
+
+        assertThatThrownBy(
+                        () ->
+                                new MosaicRecordsWriter(
+                                        new ByteArrayOutputStream(),
+                                        rowType,
+                                        formatContext,
+                                        Collections.emptyList(),
+                                        null,
+                                        allocator,
+                                        (outputStream, arrowSchema, options, 
bufferAllocator) -> {
+                                            throw failure;
+                                        }))
+                .isSameAs(failure);
+
+        assertThat(allocator.closeCount()).isEqualTo(1);
+    }
+
+    private static class CloseCountingRootAllocator extends RootAllocator {
+
+        private int closeCount;
+
+        @Override
+        public void close() {
+            closeCount++;
+            super.close();
+        }
+
+        int closeCount() {
+            return closeCount;
+        }
+    }
+}

Reply via email to