This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-mosaic.git
The following commit(s) were added to refs/heads/main by this push:
new dba937b Fix JNI Arrow C Data ownership (#10)
dba937b is described below
commit dba937be4876c2018f925d61eb7683c26496d47f
Author: QuakeWang <[email protected]>
AuthorDate: Wed May 20 13:16:36 2026 +0800
Fix JNI Arrow C Data ownership (#10)
---
.../org/apache/paimon/mosaic/MosaicWriter.java | 53 ++++++++++++++++------
.../apache/paimon/mosaic/MosaicRoundtripTest.java | 15 ++++++
jni/src/lib.rs | 16 +++----
python/mosaic/mosaic.py | 12 +++--
python/tests/test_mosaic.py | 12 +++++
5 files changed, 79 insertions(+), 29 deletions(-)
diff --git a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
index c23449d..1fa49fe 100644
--- a/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
+++ b/java/src/main/java/org/apache/paimon/mosaic/MosaicWriter.java
@@ -41,19 +41,22 @@ public class MosaicWriter implements AutoCloseable {
public MosaicWriter(OutputStream outputStream, Schema arrowSchema,
WriterOptions options, BufferAllocator allocator) {
this.allocator = allocator;
try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator)) {
- Data.exportSchema(allocator, arrowSchema, null, cSchema);
- this.handle = NativeLib.nativeWriterOpen(
- outputStream,
- cSchema.memoryAddress(),
- options.getNumBuckets(),
- options.getCompression(),
- options.getZstdLevel(),
- options.getRowGroupMaxSize(),
- options.getMaxDictTotalBytes(),
- options.getMaxDictEntries(),
- options.getStatsColumns(),
- options.getPageSizeThreshold());
- cSchema.release();
+ try {
+ Data.exportSchema(allocator, arrowSchema, null, cSchema);
+ this.handle = NativeLib.nativeWriterOpen(
+ outputStream,
+ cSchema.memoryAddress(),
+ options.getNumBuckets(),
+ options.getCompression(),
+ options.getZstdLevel(),
+ options.getRowGroupMaxSize(),
+ options.getMaxDictTotalBytes(),
+ options.getMaxDictEntries(),
+ options.getStatsColumns(),
+ options.getPageSizeThreshold());
+ } finally {
+ releaseExported(cSchema);
+ }
}
if (this.handle == 0) {
throw new RuntimeException("failed to open writer");
@@ -61,10 +64,30 @@ public class MosaicWriter implements AutoCloseable {
}
public void write(VectorSchemaRoot root) {
+ if (closed || handle == 0) {
+ throw new IllegalStateException("writer is closed");
+ }
try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator)) {
- Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
- NativeLib.nativeWriterWriteBatch(handle, arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+ try {
+ Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema);
+ NativeLib.nativeWriterWriteBatch(handle, arrowArray.memoryAddress(), arrowSchema.memoryAddress());
+ } finally {
+ releaseExported(arrowArray);
+ releaseExported(arrowSchema);
+ }
+ }
+ }
+
+ private static void releaseExported(ArrowSchema schema) {
+ if (schema.snapshot().release != 0) {
+ schema.release();
+ }
+ }
+
+ private static void releaseExported(ArrowArray array) {
+ if (array.snapshot().release != 0) {
+ array.release();
}
}
diff --git a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
index ba6ff93..b293296 100644
--- a/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
+++ b/java/src/test/java/org/apache/paimon/mosaic/MosaicRoundtripTest.java
@@ -492,6 +492,21 @@ public class MosaicRoundtripTest {
}
}
+ @Test
+ public void testWriteAfterCloseFailsBeforeExport() {
+ Schema arrowSchema = new Schema(Arrays.asList(
+ Field.nullable("x", new ArrowType.Int(32, true))
+ ));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ MosaicWriter writer = new MosaicWriter(baos, arrowSchema, allocator);
+ writer.close();
+
+ try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
+ assertThrows(IllegalStateException.class, () -> writer.write(root));
+ }
+ }
+
@Test
public void testSingleRow() {
Schema arrowSchema = new Schema(Arrays.asList(
diff --git a/jni/src/lib.rs b/jni/src/lib.rs
index c21ae65..e32ae1c 100644
--- a/jni/src/lib.rs
+++ b/jni/src/lib.rs
@@ -215,8 +215,9 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterOpen(
return 0;
}
- let ffi_schema = unsafe { &*(arrow_schema_addr as *const FFI_ArrowSchema) };
- let arrow_schema = match Schema::try_from(ffi_schema) {
+ let ffi_schema =
+ unsafe { FFI_ArrowSchema::from_raw(arrow_schema_addr as *mut FFI_ArrowSchema) };
+ let arrow_schema = match Schema::try_from(&ffi_schema) {
Ok(s) => s,
Err(e) => {
throw(&mut env, &format!("Arrow schema import failed: {}", e));
@@ -393,9 +394,9 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterWrite
let ffi_array = array_addr as *mut FFI_ArrowArray;
let ffi_schema = schema_addr as *mut FFI_ArrowSchema;
- let arr_data = match unsafe {
- arrow_array::ffi::from_ffi(ptr::read(ffi_array), &ptr::read(ffi_schema))
- } {
+ let arr_owned = unsafe { FFI_ArrowArray::from_raw(ffi_array) };
+ let schema_owned = unsafe { FFI_ArrowSchema::from_raw(ffi_schema) };
+ let arr_data = match unsafe { arrow_array::ffi::from_ffi(arr_owned, &schema_owned) } {
Ok(d) => d,
Err(e) => {
throw(&mut env, &format!("Arrow import failed: {}", e));
@@ -403,11 +404,6 @@ pub extern "system" fn Java_org_apache_paimon_mosaic_NativeLib_nativeWriterWrite
}
};
- unsafe {
- ptr::write(ffi_array, std::mem::zeroed());
- ptr::write(ffi_schema, std::mem::zeroed());
- }
-
let struct_array = StructArray::from(arr_data);
let batch = RecordBatch::from(struct_array);
if let Err(e) = writer.inner.write_batch(&batch) {
diff --git a/python/mosaic/mosaic.py b/python/mosaic/mosaic.py
index 857d12d..5f00749 100644
--- a/python/mosaic/mosaic.py
+++ b/python/mosaic/mosaic.py
@@ -154,13 +154,17 @@ class MosaicWriter:
return self._pos
def write(self, data):
- if isinstance(data, pa.Table):
+ is_table = isinstance(data, pa.Table)
+ if not is_table and not isinstance(data, pa.RecordBatch):
+ raise TypeError(f"expected pyarrow.RecordBatch or pyarrow.Table, got {type(data)}")
+ if self._closed or not self._handle:
+ raise RuntimeError("writer is closed")
+
+ if is_table:
for record_batch in data.to_batches():
self._write_single_batch(record_batch)
- elif isinstance(data, pa.RecordBatch):
- self._write_single_batch(data)
else:
- raise TypeError(f"expected pyarrow.RecordBatch or pyarrow.Table, got {type(data)}")
+ self._write_single_batch(data)
def _write_single_batch(self, batch):
c_schema = _ArrowSchema()
diff --git a/python/tests/test_mosaic.py b/python/tests/test_mosaic.py
index 4f188ff..6ec64fc 100644
--- a/python/tests/test_mosaic.py
+++ b/python/tests/test_mosaic.py
@@ -576,3 +576,15 @@ class TestWriter:
writer.write(batch)
est = writer.estimated_file_size()
assert est > 0
+
+ def test_write_after_close_fails_before_export(self):
+ pa_schema = pa.schema([pa.field("x", pa.int32())])
+ batch = pa.record_batch(
+ [pa.array([1, 2, 3], type=pa.int32())], names=["x"]
+ )
+
+ writer = MosaicWriter(io.BytesIO(), pa_schema)
+ writer.close()
+
+ with pytest.raises(RuntimeError, match="writer is closed"):
+ writer.write(batch)