This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c5e78b6b5 feat: respect `batchSize/workerThreads/blockingThreads` configurations for native_iceberg_compat scan (#1587)
c5e78b6b5 is described below

commit c5e78b6b59778f0429f0fc8157c6a959bfd9d4c3
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Apr 3 03:09:02 2025 +0800

    feat: respect `batchSize/workerThreads/blockingThreads` configurations for native_iceberg_compat scan (#1587)
---
 .../main/java/org/apache/comet/parquet/Native.java    |  5 ++++-
 .../org/apache/comet/parquet/NativeBatchReader.java   | 19 ++++++++++++++++++-
 native/core/src/execution/planner.rs                  |  5 +++++
 native/core/src/parquet/mod.rs                        | 19 +++++++++++++------
 .../apache/comet/parquet/CometParquetFileFormat.scala |  5 +++++
 5 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index 4e50190f8..277e8db23 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -256,7 +256,10 @@ public final class Native extends NativeBase {
       byte[] filter,
       byte[] requiredSchema,
       byte[] dataSchema,
-      String sessionTimezone);
+      String sessionTimezone,
+      int batchSize,
+      int workerThreads,
+      int blockingThreads);
 
   // arrow native version of read batch
   /**
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 4f6991c5d..ea7d4bd4b 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -353,6 +353,20 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
       }
     }
 
+    int batchSize =
+        conf.getInt(
+            CometConf.COMET_BATCH_SIZE().key(),
+            (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
+    int workerThreads =
+        conf.getInt(
+            CometConf.COMET_WORKER_THREADS().key(),
+            (Integer) CometConf.COMET_WORKER_THREADS().defaultValue().get());
+    ;
+    int blockingThreads =
+        conf.getInt(
+            CometConf.COMET_BLOCKING_THREADS().key(),
+            (Integer) CometConf.COMET_BLOCKING_THREADS().defaultValue().get());
+    ;
     this.handle =
         Native.initRecordBatchReader(
             filePath,
@@ -362,7 +376,10 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
             nativeFilter,
             serializedRequestedArrowSchema,
             serializedDataArrowSchema,
-            timeZoneId);
+            timeZoneId,
+            batchSize,
+            workerThreads,
+            blockingThreads);
     isInitialized = true;
   }
 
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 826bb73cc..e8c94ec3c 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -175,6 +175,11 @@ impl PhysicalPlanner {
         }
     }
 
+    /// Returns a reference to this planner's shared [`SessionContext`].
+    pub fn session_ctx(&self) -> &Arc<SessionContext> {
+        &self.session_ctx
+    }
+
     /// get DataFusion PartitionedFiles from a Spark FilePartition
     fn get_partitioned_files(
         &self,
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 9289d9d42..d522a83aa 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -54,8 +54,9 @@ use crate::parquet::parquet_support::prepare_object_store;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, 
JString, ReleaseMode};
 use jni::sys::jstring;
@@ -650,9 +651,15 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
     required_schema: jbyteArray,
     data_schema: jbyteArray,
     session_timezone: jstring,
+    batch_size: jint,
+    worker_threads: jint,
+    blocking_threads: jint,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
-        let task_ctx = TaskContext::default();
+        let session_config = SessionConfig::new().with_batch_size(batch_size 
as usize);
+        let planer =
+            
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)));
+        let session_ctx = planer.session_ctx();
 
         let path: String = env
             .get_string(&JString::from_raw(file_path))
@@ -660,11 +667,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
             .into();
 
         let runtime = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(worker_threads as usize)
+            .max_blocking_threads(blocking_threads as usize)
             .enable_all()
             .build()?;
 
         let (object_store_url, object_store_path) =
-            prepare_object_store(task_ctx.runtime_env(), path.clone())?;
+            prepare_object_store(session_ctx.runtime_env(), path.clone())?;
 
         let required_schema_array = JByteArray::from_raw(required_schema);
         let required_schema_buffer = 
env.convert_byte_array(&required_schema_array)?;
@@ -674,8 +683,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         let data_schema_buffer = env.convert_byte_array(&data_schema_array)?;
         let data_schema = 
Arc::new(deserialize_schema(data_schema_buffer.as_bytes())?);
 
-        let planer = PhysicalPlanner::default();
-
         let data_filters = if !filter.is_null() {
             let filter_array = JByteArray::from_raw(filter);
             let filter_buffer = env.convert_byte_array(&filter_array)?;
@@ -708,7 +715,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         )?;
 
         let partition_index: usize = 0;
-        let batch_stream = Some(scan.execute(partition_index, 
Arc::new(task_ctx))?);
+        let batch_stream = Some(scan.execute(partition_index, 
session_ctx.task_ctx())?);
 
         let ctx = BatchContext {
             runtime,
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index b67f99ad8..cb87f2de3 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -232,6 +232,11 @@ object CometParquetFileFormat extends Logging with ShimSQLConf {
     hadoopConf.setBoolean(
       CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key,
       CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.get())
+    hadoopConf.setInt(CometConf.COMET_BATCH_SIZE.key, 
CometConf.COMET_BATCH_SIZE.get())
+    hadoopConf.setInt(CometConf.COMET_WORKER_THREADS.key, 
CometConf.COMET_WORKER_THREADS.get())
+    hadoopConf.setInt(
+      CometConf.COMET_BLOCKING_THREADS.key,
+      CometConf.COMET_BLOCKING_THREADS.get())
   }
 
   def getDatetimeRebaseSpec(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to