This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2376a8682 [mosaic] Clean up writer resources on construction failure (#8129)
f2376a8682 is described below
commit f2376a8682ce4fd24694e75c98254c69dc56582b
Author: QuakeWang <[email protected]>
AuthorDate: Sat Jun 6 16:50:23 2026 +0800
[mosaic] Clean up writer resources on construction failure (#8129)
---
.../paimon/arrow/vector/ArrowFormatWriter.java | 75 ++++++++++++++++--
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 44 +++++++++++
.../paimon/format/mosaic/MosaicRecordsWriter.java | 89 ++++++++++++++++++++--
.../format/mosaic/MosaicRecordsWriterTest.java | 77 +++++++++++++++++++
4 files changed, 273 insertions(+), 12 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index e06710760d..6133bfb550 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -54,10 +54,11 @@ public class ArrowFormatWriter implements AutoCloseable {
private final int batchSize;
private final BufferAllocator allocator;
@Nullable private final Long memoryUsedMaxInBytes;
+ private final boolean closeAllocatorOnClose;
private int rowId;
public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
- this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), null, null);
+ this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), null, null, true);
}
public ArrowFormatWriter(
@@ -71,7 +72,8 @@ public class ArrowFormatWriter implements AutoCloseable {
caseSensitive,
new RootAllocator(),
memoryUsedMaxInBytes,
- null);
+ null,
+ true);
}
public ArrowFormatWriter(
@@ -80,7 +82,7 @@ public class ArrowFormatWriter implements AutoCloseable {
boolean caseSensitive,
BufferAllocator allocator,
@Nullable Long memoryUsedMaxInBytes) {
- this(rowType, writeBatchSize, caseSensitive, allocator, memoryUsedMaxInBytes, null);
+ this(rowType, writeBatchSize, caseSensitive, allocator, memoryUsedMaxInBytes, null, true);
}
public ArrowFormatWriter(
@@ -95,7 +97,8 @@ public class ArrowFormatWriter implements AutoCloseable {
caseSensitive,
new RootAllocator(),
memoryUsedMaxInBytes,
- shreddingSchemas);
+ shreddingSchemas,
+ true);
}
public ArrowFormatWriter(
@@ -105,6 +108,24 @@ public class ArrowFormatWriter implements AutoCloseable {
BufferAllocator allocator,
@Nullable Long memoryUsedMaxInBytes,
@Nullable RowType shreddingSchemas) {
+ this(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ allocator,
+ memoryUsedMaxInBytes,
+ shreddingSchemas,
+ true);
+ }
+
+ private ArrowFormatWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ BufferAllocator allocator,
+ @Nullable Long memoryUsedMaxInBytes,
+ @Nullable RowType shreddingSchemas,
+ boolean closeAllocatorOnClose) {
this(
rowType,
writeBatchSize,
@@ -113,7 +134,8 @@ public class ArrowFormatWriter implements AutoCloseable {
memoryUsedMaxInBytes,
shreddingSchemas,
ArrowFieldTypeConversion.ARROW_FIELD_TYPE_VISITOR,
- ArrowFieldWriterFactoryVisitor.INSTANCE);
+ ArrowFieldWriterFactoryVisitor.INSTANCE,
+ closeAllocatorOnClose);
}
public ArrowFormatWriter(
@@ -125,7 +147,30 @@ public class ArrowFormatWriter implements AutoCloseable {
@Nullable RowType shreddingSchemas,
ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
ArrowFieldWriterFactoryVisitor fieldWriterFactory) {
+ this(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ allocator,
+ memoryUsedMaxInBytes,
+ shreddingSchemas,
+ fieldTypeVisitor,
+ fieldWriterFactory,
+ true);
+ }
+
+ private ArrowFormatWriter(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ BufferAllocator allocator,
+ @Nullable Long memoryUsedMaxInBytes,
+ @Nullable RowType shreddingSchemas,
+ ArrowFieldTypeConversion.ArrowFieldTypeVisitor fieldTypeVisitor,
+ ArrowFieldWriterFactoryVisitor fieldWriterFactory,
+ boolean closeAllocatorOnClose) {
this.allocator = allocator;
+ this.closeAllocatorOnClose = closeAllocatorOnClose;
RowType outputRowType = replaceWithShreddingType(rowType, shreddingSchemas);
vectorSchemaRoot =
@@ -156,6 +201,22 @@ public class ArrowFormatWriter implements AutoCloseable {
this.memoryUsedMaxInBytes = memoryUsedMaxInBytes;
}
+ public static ArrowFormatWriter forBorrowedAllocator(
+ RowType rowType,
+ int writeBatchSize,
+ boolean caseSensitive,
+ BufferAllocator allocator,
+ @Nullable Long memoryUsedMaxInBytes) {
+ return new ArrowFormatWriter(
+ rowType,
+ writeBatchSize,
+ caseSensitive,
+ allocator,
+ memoryUsedMaxInBytes,
+ null,
+ false);
+ }
+
public void flush() {
vectorSchemaRoot.setRowCount(rowId);
}
@@ -221,7 +282,9 @@ public class ArrowFormatWriter implements AutoCloseable {
@Override
public void close() {
vectorSchemaRoot.close();
- allocator.close();
+ if (closeAllocatorOnClose) {
+ allocator.close();
+ }
}
public int getBatchSize() {
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index 9d102c9ce2..b445423178 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -463,6 +463,35 @@ public class ArrowFormatWriterTest {
}
}
+ @Test
+ public void testWriterClosesExternalAllocatorByDefault() {
+ CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+ try {
+ ArrowFormatWriter writer =
+ new ArrowFormatWriter(PRIMITIVE_TYPE, 4096, true, allocator, null);
+ writer.close();
+ assertThat(allocator.closeCount()).isEqualTo(1);
+ } finally {
+ if (allocator.closeCount() == 0) {
+ allocator.close();
+ }
+ }
+ }
+
+ @Test
+ public void testWriterWithBorrowedAllocatorDoesNotCloseAllocator() {
+ CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+ try {
+ ArrowFormatWriter writer =
+ ArrowFormatWriter.forBorrowedAllocator(
+ PRIMITIVE_TYPE, 4096, true, allocator, null);
+ writer.close();
+ assertThat(allocator.closeCount()).isZero();
+ } finally {
+ allocator.close();
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testWriteWithExternalAllocator(boolean allocationFailed) {
@@ -769,4 +798,19 @@ public class ArrowFormatWriterTest {
assertThat(tsType.getTimezone()).isNull();
}
}
+
+ private static class CloseCountingRootAllocator extends RootAllocator {
+
+ private int closeCount;
+
+ @Override
+ public void close() {
+ closeCount++;
+ super.close();
+ }
+
+ int closeCount() {
+ return closeCount;
+ }
+ }
}
diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
index fdef0eb365..944f78c4b1 100644
--- a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
+++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java
@@ -58,15 +58,30 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
FileFormatFactory.FormatContext formatContext,
List<String> statsColumnNames,
@Nullable Integer numBuckets) {
+ this(
+ outputStream,
+ rowType,
+ formatContext,
+ statsColumnNames,
+ numBuckets,
+ new RootAllocator(),
+ MosaicWriter::new);
+ }
+
+ MosaicRecordsWriter(
+ OutputStream outputStream,
+ RowType rowType,
+ FileFormatFactory.FormatContext formatContext,
+ List<String> statsColumnNames,
+ @Nullable Integer numBuckets,
+ BufferAllocator allocator,
+ NativeWriterFactory nativeWriterFactory) {
this.statsColumnNames = statsColumnNames;
- this.allocator = new RootAllocator();
+ this.allocator = allocator;
int writeBatchSize = formatContext.writeBatchSize();
long writeBatchMemory = formatContext.writeBatchMemory().getBytes();
- this.arrowFormatWriter =
- new ArrowFormatWriter(rowType, writeBatchSize, true, allocator, writeBatchMemory);
-
WriterOptions options = new WriterOptions().zstdLevel(formatContext.zstdLevel());
if (numBuckets != null) {
options = options.numBuckets(numBuckets);
@@ -79,8 +94,22 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
options.statsColumns(statsColumnNames.toArray(new String[0]));
}
- Schema arrowSchema = arrowFormatWriter.getVectorSchemaRoot().getSchema();
- this.nativeWriter = new MosaicWriter(outputStream, arrowSchema, options, allocator);
+ ArrowFormatWriter createdArrowWriter = null;
+ MosaicWriter createdNativeWriter = null;
+ try {
+ createdArrowWriter =
+ ArrowFormatWriter.forBorrowedAllocator(
+ rowType, writeBatchSize, true, allocator, writeBatchMemory);
+ Schema arrowSchema = createdArrowWriter.getVectorSchemaRoot().getSchema();
+ createdNativeWriter =
+ nativeWriterFactory.create(outputStream, arrowSchema, options, allocator);
+ } catch (Throwable t) {
+ closeOnConstructionFailure(t, createdNativeWriter, createdArrowWriter, allocator);
+ throw rethrowUnchecked(t);
+ }
+
+ this.arrowFormatWriter = createdArrowWriter;
+ this.nativeWriter = createdNativeWriter;
}
@Override
@@ -196,4 +225,52 @@ public class MosaicRecordsWriter implements BundleFormatWriter {
}
throw new IOException(throwable);
}
+
+ private static RuntimeException rethrowUnchecked(Throwable throwable) {
+ if (throwable instanceof RuntimeException) {
+ return (RuntimeException) throwable;
+ }
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ return new RuntimeException(throwable);
+ }
+
+ private static void closeOnConstructionFailure(
+ Throwable throwable,
+ @Nullable MosaicWriter nativeWriter,
+ @Nullable ArrowFormatWriter arrowFormatWriter,
+ BufferAllocator allocator) {
+ try {
+ if (nativeWriter != null) {
+ nativeWriter.close();
+ }
+ } catch (Throwable t) {
+ addSuppressed(throwable, t);
+ }
+
+ try {
+ if (arrowFormatWriter != null) {
+ arrowFormatWriter.close();
+ }
+ } catch (Throwable t) {
+ addSuppressed(throwable, t);
+ }
+
+ try {
+ allocator.close();
+ } catch (Throwable t) {
+ addSuppressed(throwable, t);
+ }
+ }
+
+ @FunctionalInterface
+ interface NativeWriterFactory {
+
+ MosaicWriter create(
+ OutputStream outputStream,
+ Schema arrowSchema,
+ WriterOptions options,
+ BufferAllocator allocator);
+ }
}
diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java
new file mode 100644
index 0000000000..58a8dc5252
--- /dev/null
+++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicRecordsWriterTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.mosaic;
+
+import org.apache.paimon.format.FileFormatFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link MosaicRecordsWriter}. */
+class MosaicRecordsWriterTest {
+
+ @Test
+ void testConstructorFailureClosesCreatedResources() {
+ RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING());
+ FileFormatFactory.FormatContext formatContext =
+ new FileFormatFactory.FormatContext(new Options(), 1024, 1024);
+ CloseCountingRootAllocator allocator = new CloseCountingRootAllocator();
+ RuntimeException failure = new RuntimeException("native writer failed");
+
+ assertThatThrownBy(
+ () ->
+ new MosaicRecordsWriter(
+ new ByteArrayOutputStream(),
+ rowType,
+ formatContext,
+ Collections.emptyList(),
+ null,
+ allocator,
+ (outputStream, arrowSchema, options, bufferAllocator) -> {
+ throw failure;
+ }))
+ .isSameAs(failure);
+
+ assertThat(allocator.closeCount()).isEqualTo(1);
+ }
+
+ private static class CloseCountingRootAllocator extends RootAllocator {
+
+ private int closeCount;
+
+ @Override
+ public void close() {
+ closeCount++;
+ super.close();
+ }
+
+ int closeCount() {
+ return closeCount;
+ }
+ }
+}