This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new ac4223c5 chore: Refactor Arrow Array and Schema allocation in ColumnReader and MetadataColumnReader (#1047)
ac4223c5 is described below
commit ac4223c5765ffa09a51c4947a1f82b15ce255f2f
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Nov 2 00:12:08 2024 -0700
chore: Refactor Arrow Array and Schema allocation in ColumnReader and MetadataColumnReader (#1047)
* chore: Refactor Arrow Array and Schema allocation in ColumnReader
* For review
---
.../org/apache/comet/parquet/ColumnReader.java | 83 ++++++++++++----------
.../apache/comet/parquet/MetadataColumnReader.java | 20 ++++--
.../main/java/org/apache/comet/parquet/Native.java | 6 +-
native/core/src/parquet/mod.rs | 25 ++-----
4 files changed, 70 insertions(+), 64 deletions(-)
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 9e594804..1cc42f62 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CometSchemaImporter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -50,6 +52,7 @@ import org.apache.comet.vector.CometVector;
public class ColumnReader extends AbstractColumnReader {
protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
+ protected final BufferAllocator ALLOCATOR = new RootAllocator();
/**
* The current Comet vector holding all the values read by this column reader. Owned by this
@@ -87,6 +90,9 @@ public class ColumnReader extends AbstractColumnReader {
private final CometSchemaImporter importer;
+ private ArrowArray array = null;
+ private ArrowSchema schema = null;
+
public ColumnReader(
DataType type,
ColumnDescriptor descriptor,
@@ -201,53 +207,56 @@ public class ColumnReader extends AbstractColumnReader {
boolean isUuid =
logicalTypeAnnotation instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
- long[] addresses = Native.currentBatch(nativeHandle);
+ array = ArrowArray.allocateNew(ALLOCATOR);
+ schema = ArrowSchema.allocateNew(ALLOCATOR);
- try (ArrowArray array = ArrowArray.wrap(addresses[0]);
- ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
- FieldVector vector = importer.importVector(array, schema);
+ long arrayAddr = array.memoryAddress();
+ long schemaAddr = schema.memoryAddress();
- DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
+ Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
- CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+ FieldVector vector = importer.importVector(array, schema);
- // Update whether the current vector contains any null values. This is used in the following
- // batch(s) to determine whether we can skip loading the native vector.
- hadNull = cometVector.hasNull();
+ DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary();
- if (dictionaryEncoding == null) {
- if (dictionary != null) {
- // This means the column was using dictionary encoding but now has fall-back to plain
- // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
- // a condition to check if we can re-use vector later.
- dictionary = null;
- }
- // Either the column is not dictionary encoded, or it was using dictionary encoding but
- // a new data page has switched back to use plain encoding. For both cases we should
- // return plain vector.
- currentVector = cometVector;
- return currentVector;
- }
+ CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128);
+
+ // Update whether the current vector contains any null values. This is used in the following
+ // batch(s) to determine whether we can skip loading the native vector.
+ hadNull = cometVector.hasNull();
- // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
- // release the previous dictionary vector and create a new one.
- Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
- CometPlainVector dictionaryVector =
- new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+ if (dictionaryEncoding == null) {
if (dictionary != null) {
- dictionary.setDictionaryVector(dictionaryVector);
- } else {
- dictionary = new CometDictionary(dictionaryVector);
+ // This means the column was using dictionary encoding but now has fall-back to plain
+ // encoding, on the native side. Setting 'dictionary' to null here, so we can use it as
+ // a condition to check if we can re-use vector later.
+ dictionary = null;
}
-
- currentVector =
- new CometDictionaryVector(
- cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
-
- currentVector =
- new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+ // Either the column is not dictionary encoded, or it was using dictionary encoding but
+ // a new data page has switched back to use plain encoding. For both cases we should
+ // return plain vector.
+ currentVector = cometVector;
return currentVector;
}
+
+ // We should already re-initiate `CometDictionary` here because `Data.importVector` API will
+ // release the previous dictionary vector and create a new one.
+ Dictionary arrowDictionary = importer.getProvider().lookup(dictionaryEncoding.getId());
+ CometPlainVector dictionaryVector =
+ new CometPlainVector(arrowDictionary.getVector(), useDecimal128, isUuid);
+ if (dictionary != null) {
+ dictionary.setDictionaryVector(dictionaryVector);
+ } else {
+ dictionary = new CometDictionary(dictionaryVector);
+ }
+
+ currentVector =
+ new CometDictionaryVector(
+ cometVector, dictionary, importer.getProvider(), useDecimal128, false, isUuid);
+
+ currentVector =
+ new CometDictionaryVector(cometVector, dictionary, importer.getProvider(), useDecimal128);
+ return currentVector;
}
protected void readPage() {
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index b8722ca7..13b90e25 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -34,8 +34,12 @@ import org.apache.comet.vector.CometVector;
/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
public class MetadataColumnReader extends AbstractColumnReader {
private final BufferAllocator allocator = new RootAllocator();
+
private CometVector vector;
+ private ArrowArray array = null;
+ private ArrowSchema schema = null;
+
public MetadataColumnReader(DataType type, ColumnDescriptor descriptor, boolean useDecimal128) {
// TODO: should we handle legacy dates & timestamps for metadata columns?
super(type, descriptor, useDecimal128, false);
@@ -50,13 +54,17 @@ public class MetadataColumnReader extends AbstractColumnReader {
@Override
public void readBatch(int total) {
if (vector == null) {
- long[] addresses = Native.currentBatch(nativeHandle);
- try (ArrowArray array = ArrowArray.wrap(addresses[0]);
- ArrowSchema schema = ArrowSchema.wrap(addresses[1])) {
- FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
- vector = new CometPlainVector(fieldVector, useDecimal128);
- }
+ array = ArrowArray.allocateNew(allocator);
+ schema = ArrowSchema.allocateNew(allocator);
+
+ long arrayAddr = array.memoryAddress();
+ long schemaAddr = schema.memoryAddress();
+
+ Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+ FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
+ vector = new CometPlainVector(fieldVector, useDecimal128);
}
+
vector.setNumValues(total);
}
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index f4820fed..1e666652 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -192,10 +192,10 @@ public final class Native extends NativeBase {
* Returns the current batch constructed via 'readBatch'
*
* @param handle the handle to the native Parquet column reader
- * @return a long array with 2 elements, the first is the address to native Arrow array, and the
- * second is the address to the Arrow schema.
+ * @param arrayAddr the memory address to the ArrowArray struct
+ * @param schemaAddr the memory address to the ArrowSchema struct
*/
- public static native long[] currentBatch(long handle);
+ public static native void currentBatch(long handle, long arrayAddr, long
schemaAddr);
/** Set methods to set a constant value for the reader, so it'll return constant vectors */
public static native void setNull(long handle);
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index c523f843..455f1992 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -27,7 +27,7 @@ use std::{boxed::Box, ptr::NonNull, sync::Arc};
use crate::errors::{try_unwrap_or_throw, CometError};
-use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use arrow::ffi::FFI_ArrowArray;
/// JNI exposed methods
use jni::JNIEnv;
@@ -52,7 +52,6 @@ const STR_CLASS_NAME: &str = "java/lang/String";
/// Parquet read context maintained across multiple JNI calls.
struct Context {
pub column_reader: ColumnReader,
- pub arrays: Option<(Arc<FFI_ArrowArray>, Arc<FFI_ArrowSchema>)>,
last_data_page: Option<GlobalRef>,
}
@@ -110,7 +109,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initColumnReader(
use_decimal_128 != 0,
use_legacy_date_timestamp != 0,
),
- arrays: None,
last_data_page: None,
};
let res = Box::new(ctx);
@@ -539,24 +537,15 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_currentBatch(
e: JNIEnv,
_jclass: JClass,
handle: jlong,
-) -> jlongArray {
- try_unwrap_or_throw(&e, |env| {
+ array_addr: jlong,
+ schema_addr: jlong,
+) {
+ try_unwrap_or_throw(&e, |_env| {
let ctx = get_context(handle)?;
let reader = &mut ctx.column_reader;
let data = reader.current_batch();
- let (array, schema) = data.to_spark()?;
-
- unsafe {
- let arrow_array = Arc::from_raw(array as *const FFI_ArrowArray);
- let arrow_schema = Arc::from_raw(schema as *const FFI_ArrowSchema);
- ctx.arrays = Some((arrow_array, arrow_schema));
-
- let res = env.new_long_array(2)?;
- let buf: [i64; 2] = [array, schema];
- env.set_long_array_region(&res, 0, &buf)
- .expect("set long array region failed");
- Ok(res.into_raw())
- }
+ data.move_to_spark(array_addr, schema_addr)
+ .map_err(|e| e.into())
})
}
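
Summary of the new JNI contract, for readers skimming the patch: before this commit, the Rust side of Native.currentBatch allocated the FFI_ArrowArray/FFI_ArrowSchema structs, kept Arc references alive in the per-reader Context, and returned the two raw addresses in a long[2] for Java to wrap. After this commit the JVM allocates the empty C Data Interface structs, passes their addresses down, and the Rust side exports the batch into them via move_to_spark, so Context no longer needs the arrays field. The sketch below is illustrative only, not committed code: it assumes the ColumnReader fields visible in the diff (ALLOCATOR, importer, nativeHandle), and the helper method name is made up.

    // Minimal sketch of the new caller-allocated handshake (hypothetical helper).
    // Assumes ColumnReader's ALLOCATOR, importer, and nativeHandle from the diff.
    private FieldVector loadCurrentBatch() {
      // 1. Java allocates the empty ArrowArray/ArrowSchema structs and owns them.
      ArrowArray array = ArrowArray.allocateNew(ALLOCATOR);
      ArrowSchema schema = ArrowSchema.allocateNew(ALLOCATOR);

      // 2. Only the raw struct addresses cross the JNI boundary; the native side
      //    fills both structs in place (mod.rs: data.move_to_spark(array_addr, schema_addr)).
      Native.currentBatch(nativeHandle, array.memoryAddress(), schema.memoryAddress());

      // 3. Importing the filled structs yields the Java FieldVector, as before.
      return importer.importVector(array, schema);
    }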
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]