This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ac4223c5 chore: Refactor Arrow Array and Schema allocation in ColumnReader and MetadataColumnReader (#1047)
ac4223c5 is described below

commit ac4223c5765ffa09a51c4947a1f82b15ce255f2f
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Nov 2 00:12:08 2024 -0700

    chore: Refactor Arrow Array and Schema allocation in ColumnReader and MetadataColumnReader (#1047)
    
    * chore: Refactor Arrow Array and Schema allocation in ColumnReader
    
    * For review
---
 .../org/apache/comet/parquet/ColumnReader.java     | 83 ++++++++++++----------
 .../apache/comet/parquet/MetadataColumnReader.java | 20 ++++--
 .../main/java/org/apache/comet/parquet/Native.java |  6 +-
 native/core/src/parquet/mod.rs                     | 25 ++-----
 4 files changed, 70 insertions(+), 64 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 9e594804..1cc42f62 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.arrow.c.ArrowArray;
 import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -50,6 +52,7 @@ import org.apache.comet.vector.CometVector;
 
 public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = 
LoggerFactory.getLogger(ColumnReader.class);
+  protected final BufferAllocator ALLOCATOR = new RootAllocator();
 
   /**
    * The current Comet vector holding all the values read by this column 
reader. Owned by this
@@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader {
 
   private final CometSchemaImporter importer;
 
+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public ColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -201,53 +207,56 @@ public class ColumnReader extends AbstractColumnReader {
     boolean isUuid =
         logicalTypeAnnotation instanceof 
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 
-    long[] addresses = Native.currentBatch(nativeHandle);
+    array = ArrowArray.allocateNew(ALLOCATOR);
+    schema = ArrowSchema.allocateNew(ALLOCATOR);
 
-    try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-        ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-      FieldVector vector = importer.importVector(array, schema);
+    long arrayAddr = array.memoryAddress();
+    long schemaAddr = schema.memoryAddress();
 
-      DictionaryEncoding dictionaryEncoding = 
vector.getField().getDictionary();
+    Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
 
-      CometPlainVector cometVector = new CometPlainVector(vector, 
useDecimal128);
+    FieldVector vector = importer.importVector(array, schema);
 
-      // Update whether the current vector contains any null values. This is 
used in the following
-      // batch(s) to determine whether we can skip loading the native vector.
-      hadNull = cometVector.hasNull();
+    DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
 
-      if (dictionaryEncoding == null) {
-        if (dictionary != null) {
-          // This means the column was using dictionary encoding but now has 
fall-back to plain
-          // encoding, on the native side. Setting 'dictionary' to null here, 
so we can use it as
-          // a condition to check if we can re-use vector later.
-          dictionary = null;
-        }
-        // Either the column is not dictionary encoded, or it was using 
dictionary encoding but
-        // a new data page has switched back to use plain encoding. For both 
cases we should
-        // return plain vector.
-        currentVector = cometVector;
-        return currentVector;
-      }
+    CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+    // Update whether the current vector contains any null values. This is 
used in the following
+    // batch(s) to determine whether we can skip loading the native vector.
+    hadNull = cometVector.hasNull();
 
-      // We should already re-initiate `CometDictionary` here because 
`Data.importVector` API will
-      // release the previous dictionary vector and create a new one.
-      Dictionary arrowDictionary = 
importer.getProvider().lookup(dictionaryEncoding.getId());
-      CometPlainVector dictionaryVector =
-          new CometPlainVector(arrowDictionary.getVector(), useDecimal128, 
isUuid);
+    if (dictionaryEncoding == null) {
       if (dictionary != null) {
-        dictionary.setDictionaryVector(dictionaryVector);
-      } else {
-        dictionary = new CometDictionary(dictionaryVector);
+        // This means the column was using dictionary encoding but now has 
fall-back to plain
+        // encoding, on the native side. Setting 'dictionary' to null here, so 
we can use it as
+        // a condition to check if we can re-use vector later.
+        dictionary = null;
       }
-
-      currentVector =
-          new CometDictionaryVector(
-              cometVector, dictionary, importer.getProvider(), useDecimal128, 
false, isUuid);
-
-      currentVector =
-          new CometDictionaryVector(cometVector, dictionary, 
importer.getProvider(), useDecimal128);
+      // Either the column is not dictionary encoded, or it was using 
dictionary encoding but
+      // a new data page has switched back to use plain encoding. For both 
cases we should
+      // return plain vector.
+      currentVector = cometVector;
       return currentVector;
     }
+
+    // We should already re-initiate `CometDictionary` here because 
`Data.importVector` API will
+    // release the previous dictionary vector and create a new one.
+    Dictionary arrowDictionary = 
importer.getProvider().lookup(dictionaryEncoding.getId());
+    CometPlainVector dictionaryVector =
+        new CometPlainVector(arrowDictionary.getVector(), useDecimal128, 
isUuid);
+    if (dictionary != null) {
+      dictionary.setDictionaryVector(dictionaryVector);
+    } else {
+      dictionary = new CometDictionary(dictionaryVector);
+    }
+
+    currentVector =
+        new CometDictionaryVector(
+            cometVector, dictionary, importer.getProvider(), useDecimal128, 
false, isUuid);
+
+    currentVector =
+        new CometDictionaryVector(cometVector, dictionary, 
importer.getProvider(), useDecimal128);
+    return currentVector;
   }
 
   protected void readPage() {
diff --git 
a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java 
b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index b8722ca7..13b90e25 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -34,8 +34,12 @@ import org.apache.comet.vector.CometVector;
 /** A metadata column reader that can be extended by {@link 
RowIndexColumnReader} etc. */
 public class MetadataColumnReader extends AbstractColumnReader {
   private final BufferAllocator allocator = new RootAllocator();
+
   private CometVector vector;
 
+  private ArrowArray array = null;
+  private ArrowSchema schema = null;
+
   public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, 
boolean useDecimal128) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
     super(type, descriptor, useDecimal128, false);
@@ -50,13 +54,17 @@ public class MetadataColumnReader extends 
AbstractColumnReader {
   @Override
   public void readBatch(int total) {
     if (vector == null) {
-      long[] addresses = Native.currentBatch(nativeHandle);
-      try (ArrowArray array = ArrowArray.wrap(addresses[0]);
-          ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
-        FieldVector fieldVector = Data.importVector(allocator, array, schema, 
null);
-        vector = new CometPlainVector(fieldVector, useDecimal128);
-      }
+      array = ArrowArray.allocateNew(allocator);
+      schema = ArrowSchema.allocateNew(allocator);
+
+      long arrayAddr = array.memoryAddress();
+      long schemaAddr = schema.memoryAddress();
+
+      Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+      FieldVector fieldVector = Data.importVector(allocator, array, schema, 
null);
+      vector = new CometPlainVector(fieldVector, useDecimal128);
     }
+
     vector.setNumValues(total);
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java 
b/common/src/main/java/org/apache/comet/parquet/Native.java
index f4820fed..1e666652 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public final class Native extends NativeBase {
    * Returns the current batch constructed via 'readBatch'
    *
    * @param handle the handle to the native Parquet column reader
-   * @return a long array with 2 elements, the first is the address to native 
Arrow array, and the
-   *     second is the address to the Arrow schema.
+   * @param arrayAddr the memory address to the ArrowArray struct
+   * @param schemaAddr the memory address to the ArrowSchema struct
    */
-  public static native long[] currentBatch(long handle);
+  public static native void currentBatch(long handle, long arrayAddr, long 
schemaAddr);
 
   /** Set methods to set a constant value for the reader, so it'll return 
constant vectors */
   public static native void setNull(long handle);
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index c523f843..455f1992 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc};
 
 use crate::errors::{try_unwrap_or_throw, CometError};
 
-use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow::ffi::FFI_ArrowArray;
 
 /// JNI exposed methods
 use jni::JNIEnv;
@@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String";
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
-    pub arrays: Option<(Arc<FFI_ArrowArray>, Arc<FFI_ArrowSchema>)>,
     last_data_page: Option<GlobalRef>,
 }
 
@@ -110,7 +109,6 @@ pub extern "system" fn 
Java_org_apache_comet_parquet_Native_initColumnReader(
                 use_decimal_128 != 0,
                 use_legacy_date_timestamp != 0,
             ),
-            arrays: None,
             last_data_page: None,
         };
         let res = Box::new(ctx);
@@ -539,24 +537,15 @@ pub extern "system" fn 
Java_org_apache_comet_parquet_Native_currentBatch(
     e: JNIEnv,
     _jclass: JClass,
     handle: jlong,
-) -> jlongArray {
-    try_unwrap_or_throw(&e, |env| {
+    array_addr: jlong,
+    schema_addr: jlong,
+) {
+    try_unwrap_or_throw(&e, |_env| {
         let ctx = get_context(handle)?;
         let reader = &mut ctx.column_reader;
         let data = reader.current_batch();
-        let (array, schema) = data.to_spark()?;
-
-        unsafe {
-            let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
-            let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
-            ctx.arrays = Some((arrow_array, arrow_schema));
-
-            let res = env.new_long_array(2)?;
-            let buf: [i64; 2] = [array, schema];
-            env.set_long_array_region(&res, 0, &buf)
-                .expect("set long array region failed");
-            Ok(res.into_raw())
-        }
+        data.move_to_spark(array_addr, schema_addr)
+            .map_err(|e| e.into())
     })
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to