This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new 624a39a Fix Java reader handle leak on constructor failure (#14)
624a39a is described below
commit 624a39a89054c68289e62358701aabbe09084a41
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 14:23:22 2026 +0800
Fix Java reader handle leak on constructor failure (#14)
---
.../org/apache/paimon/mosaic/MosaicReader.java | 11 ++++--
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 45 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 3 deletions(-)
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
index c59a04b..0b85c9b 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicReader.java
@@ -46,11 +46,16 @@ public class MosaicReader implements AutoCloseable {
}
public static MosaicReader open(InputFile inputFile, long fileLength,
BufferAllocator allocator) {
- long h = NativeLib.nativeReaderOpen(inputFile, fileLength);
- if (h == 0) {
+ long handle = NativeLib.nativeReaderOpen(inputFile, fileLength);
+ if (handle == 0) {
throw new RuntimeException("failed to open reader");
}
- return new MosaicReader(h, allocator);
+ try {
+ return new MosaicReader(handle, allocator);
+ } catch (RuntimeException | Error e) {
+ NativeLib.nativeReaderFree(handle);
+ throw e;
+ }
}
public Schema getSchema() {
diff --git
a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index b293296..523c55e 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -20,6 +20,7 @@
package org.apache.paimon.mosaic;
import java.io.ByteArrayOutputStream;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@@ -81,6 +82,31 @@ public class MosaicRoundtripTest {
return MosaicReader.open(inputFile, data.length, allocator);
}
+ private static void awaitGarbageCollection(WeakReference<?> reference)
throws InterruptedException {
+ for (int i = 0; i < 20 && reference.get() != null; i++) {
+ System.gc();
+ System.runFinalization();
+ Thread.sleep(50L);
+ }
+ assertNull("expected input file to be released after failed open",
reference.get());
+ }
+
+ private WeakReference<InputFile> openReaderWithClosedAllocator(byte[]
data) {
+ BufferAllocator failingAllocator = new RootAllocator();
+ failingAllocator.close();
+
+ InputFile inputFile = new InputFile() {
+ @Override
+ public void readFully(long position, byte[] buffer, int offset,
int length) {
+ System.arraycopy(data, (int) position, buffer, offset, length);
+ }
+ };
+ WeakReference<InputFile> reference = new WeakReference<>(inputFile);
+
+ assertThrows(RuntimeException.class, () ->
MosaicReader.open(inputFile, data.length, failingAllocator));
+ return reference;
+ }
+
@Test
public void testBasicRoundtrip() {
Schema arrowSchema = new Schema(Arrays.asList(
@@ -507,6 +533,25 @@ public class MosaicRoundtripTest {
}
}
+ @Test
+ public void testReaderOpenFreesNativeHandleWhenConstructorFails() throws
Exception {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("x", new ArrowType.Int(32, true))
+ ));
+
+ byte[] data;
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ IntVector xVec = (IntVector) root.getVector("x");
+ xVec.allocateNew(1);
+ xVec.set(0, 1);
+ root.setRowCount(1);
+ data = writeToBytes(arrowSchema, writer -> writer.write(root));
+ }
+
+ WeakReference<InputFile> reference =
openReaderWithClosedAllocator(data);
+ awaitGarbageCollection(reference);
+ }
+
@Test
public void testSingleRow() {
Schema arrowSchema = new Schema(Arrays.asList(