This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git


The following commit(s) were added to refs/heads/main by this push:
     new dba937b  Fix JNI Arrow C Data ownership (#10)
dba937b is described below

commit dba937be4876c2018f925d61eb7683c26496d47f
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 13:16:36 2026 +0800

    Fix JNI Arrow C Data ownership (#10)
---
 .../org/apache/paimon/mosaic/MosaicWriter.java     | 53 ++++++++++++++++------
 .../apache/paimon/mosaic/MosaicRoundtripTest.java  | 15 ++++++
 jni/src/lib.rs                                     | 16 +++----
 python/mosaic/mosaic.py                            | 12 +++--
 python/tests/test_mosaic.py                        | 12 +++++
 5 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index c23449d..1fa49fe 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -41,19 +41,22 @@ public class MosaicWriter implements AutoCloseable {
     public MosaicWriter(OutputStream outputStream, Schema arrowSchema, 
WriterOptions options, BufferAllocator allocator) {
         this.allocator = allocator;
         try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
-            Data.exportSchema(allocator, arrowSchema, null, cSchema);
-            this.handle = NativeLib.nativeWriterOpen(
-                    outputStream,
-                    cSchema.memoryAddress(),
-                    options.getNumBuckets(),
-                    options.getCompression(),
-                    options.getZstdLevel(),
-                    options.getRowGroupMaxSize(),
-                    options.getMaxDictTotalBytes(),
-                    options.getMaxDictEntries(),
-                    options.getStatsColumns(),
-                    options.getPageSizeThreshold());
-            cSchema.release();
+            try {
+                Data.exportSchema(allocator, arrowSchema, null, cSchema);
+                this.handle = NativeLib.nativeWriterOpen(
+                        outputStream,
+                        cSchema.memoryAddress(),
+                        options.getNumBuckets(),
+                        options.getCompression(),
+                        options.getZstdLevel(),
+                        options.getRowGroupMaxSize(),
+                        options.getMaxDictTotalBytes(),
+                        options.getMaxDictEntries(),
+                        options.getStatsColumns(),
+                        options.getPageSizeThreshold());
+            } finally {
+                releaseExported(cSchema);
+            }
         }
         if (this.handle == 0) {
             throw new RuntimeException("failed to open writer");
@@ -61,10 +64,30 @@ public class MosaicWriter implements AutoCloseable {
     }
 
     public void write(VectorSchemaRoot root) {
+        if (closed || handle == 0) {
+            throw new IllegalStateException("writer is closed");
+        }
         try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
              ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
-            Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, 
arrowSchema);
-            NativeLib.nativeWriterWriteBatch(handle, 
arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+            try {
+                Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, 
arrowSchema);
+                NativeLib.nativeWriterWriteBatch(handle, 
arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+            } finally {
+                releaseExported(arrowArray);
+                releaseExported(arrowSchema);
+            }
+        }
+    }
+
+    private static void releaseExported(ArrowSchema schema) {
+        if (schema.snapshot().release != 0) {
+            schema.release();
+        }
+    }
+
+    private static void releaseExported(ArrowArray array) {
+        if (array.snapshot().release != 0) {
+            array.release();
         }
     }
 
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index ba6ff93..b293296 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -492,6 +492,21 @@ public class MosaicRoundtripTest {
         }
     }
 
+    @Test
+    public void testWriteAfterCloseFailsBeforeExport() {
+        Schema arrowSchema = new Schema(Arrays.asList(
+                Field.nullable("x", new ArrowType.Int(32, true))
+        ));
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        MosaicWriter writer = new MosaicWriter(baos, arrowSchema, allocator);
+        writer.close();
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+            assertThrows(IllegalStateException.class, () -> 
writer.write(root));
+        }
+    }
+
     @Test
     public void testSingleRow() {
         Schema arrowSchema = new Schema(Arrays.asList(
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index c21ae65..e32ae1c 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -215,8 +215,9 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterOpen(
             return 0;
         }
 
-        let ffi_schema = unsafe { &*(arrow_schema_addr as *const 
FFI_ArrowSchema) };
-        let arrow_schema = match Schema::try_from(ffi_schema) {
+        let ffi_schema =
+            unsafe { FFI_ArrowSchema::from_raw(arrow_schema_addr as *mut 
FFI_ArrowSchema) };
+        let arrow_schema = match Schema::try_from(&ffi_schema) {
             Ok(s) => s,
             Err(e) => {
                 throw(&mut env, &format!("Arrow schema import failed: {}", e));
@@ -393,9 +394,9 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterWrite
         let ffi_array = array_addr as *mut FFI_ArrowArray;
         let ffi_schema = schema_addr as *mut FFI_ArrowSchema;
 
-        let arr_data = match unsafe {
-            arrow_array::ffi::from_ffi(ptr::read(ffi_array), 
&ptr::read(ffi_schema))
-        } {
+        let arr_owned = unsafe { FFI_ArrowArray::from_raw(ffi_array) };
+        let schema_owned = unsafe { FFI_ArrowSchema::from_raw(ffi_schema) };
+        let arr_data = match unsafe { arrow_array::ffi::from_ffi(arr_owned, 
&schema_owned) } {
             Ok(d) => d,
             Err(e) => {
                 throw(&mut env, &format!("Arrow import failed: {}", e));
@@ -403,11 +404,6 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterWrite
             }
         };
 
-        unsafe {
-            ptr::write(ffi_array, std::mem::zeroed());
-            ptr::write(ffi_schema, std::mem::zeroed());
-        }
-
         let struct_array = StructArray::from(arr_data);
         let batch = RecordBatch::from(struct_array);
         if let Err(e) = writer.inner.write_batch(&batch) {
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 857d12d..5f00749 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -154,13 +154,17 @@ class MosaicWriter:
         return self._pos
 
     def write(self, data):
-        if isinstance(data, pa.Table):
+        is_table = isinstance(data, pa.Table)
+        if not is_table and not isinstance(data, pa.RecordBatch):
+            raise TypeError(f"expected pyarrow.RecordBatch or pyarrow.Table, 
got {type(data)}")
+        if self._closed or not self._handle:
+            raise RuntimeError("writer is closed")
+
+        if is_table:
             for record_batch in data.to_batches():
                 self._write_single_batch(record_batch)
-        elif isinstance(data, pa.RecordBatch):
-            self._write_single_batch(data)
         else:
-            raise TypeError(f"expected pyarrow.RecordBatch or pyarrow.Table, 
got {type(data)}")
+            self._write_single_batch(data)
 
     def _write_single_batch(self, batch):
         c_schema = _ArrowSchema()
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 4f188ff..6ec64fc 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -576,3 +576,15 @@ class TestWriter:
             writer.write(batch)
             est = writer.estimated_file_size()
             assert est > 0
+
+    def test_write_after_close_fails_before_export(self):
+        pa_schema = pa.schema([pa.field("x", pa.int32())])
+        batch = pa.record_batch(
+            [pa.array([1, 2, 3], type=pa.int32())], names=["x"]
+        )
+
+        writer = MosaicWriter(io.BytesIO(), pa_schema)
+        writer.close()
+
+        with pytest.raises(RuntimeError, match="writer is closed"):
+            writer.write(batch)

Reply via email to