This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 9caeec18f chore: Pass Comet configs to native `createPlan` (#2543)
9caeec18f is described below

commit 9caeec18ffd4a7012b600283bd19f0f85683ea05
Author: Andy Grove <[email protected]>
AuthorDate: Fri Oct 10 09:05:32 2025 -0600

    chore: Pass Comet configs to native `createPlan` (#2543)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  6 ++--
 native/core/src/execution/jni_api.rs               | 40 +++++++++++----------
 native/core/src/execution/mod.rs                   |  1 +
 native/core/src/execution/spark_config.rs          | 42 ++++++++++++++++++++++
 .../scala/org/apache/comet/CometExecIterator.scala |  8 ++---
 spark/src/main/scala/org/apache/comet/Native.scala |  4 ---
 6 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5931eb25b..05c075057 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -63,9 +63,11 @@ object CometConf extends ShimCometConf {
 
   def conf(key: String): ConfigBuilder = ConfigBuilder(key)
 
-  val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
+  val COMET_PREFIX = "spark.comet";
 
-  val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
+  val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
+
+  val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
 
   val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
     .doc(
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b76108ad9..b17cfa1d9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -78,6 +78,10 @@ use crate::execution::spark_plan::SparkPlan;
 
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
 
+use crate::execution::spark_config::{
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
+    COMET_TRACING_ENABLED,
+};
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
 use log::info;
@@ -168,14 +172,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     memory_limit: jlong,
     memory_limit_per_task: jlong,
     task_attempt_id: jlong,
-    debug_native: jboolean,
-    explain_native: jboolean,
-    tracing_enabled: jboolean,
-    max_temp_directory_size: jlong,
     key_unwrapper_obj: JObject,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
-        with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
+        // Deserialize Spark configs
+        let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
+        let bytes = env.convert_byte_array(array)?;
+        let spark_configs = serde::deserialize_config(bytes.as_slice())?;
+        let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();
+
+        // Access Comet configs
+        let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
+        let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);
+        let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
+        let max_temp_directory_size =
+            spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+
+        with_trace("createPlan", tracing_enabled, || {
             // Init JVM classes
             JVMClasses::init(&mut env);
 
@@ -186,15 +199,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             let bytes = env.convert_byte_array(array)?;
             let spark_plan = serde::deserialize_op(bytes.as_slice())?;
 
-            // Deserialize Spark configs
-            let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
-            let bytes = env.convert_byte_array(array)?;
-            let spark_configs = serde::deserialize_config(bytes.as_slice())?;
-
-            // Convert Spark configs to HashMap
-            let _spark_config_map: HashMap<String, String> =
-                spark_configs.entries.into_iter().collect();
-
             let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);
 
             // Get the global references of input sources
@@ -238,7 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
                 batch_size as usize,
                 memory_pool,
                 local_dirs,
-                max_temp_directory_size as u64,
+                max_temp_directory_size,
             )?;
 
             let plan_creation_time = start.elapsed();
@@ -274,10 +278,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
                 metrics_last_update_time: Instant::now(),
                 plan_creation_time,
                 session_ctx: Arc::new(session),
-                debug_native: debug_native == 1,
-                explain_native: explain_native == 1,
+                debug_native,
+                explain_native,
                 memory_pool_config,
-                tracing_enabled: tracing_enabled != JNI_FALSE,
+                tracing_enabled,
             });
 
             Ok(Box::into_raw(exec_context) as i64)
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index c55b96f2a..b8a3d546b 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -27,6 +27,7 @@ pub(crate) mod sort;
 pub(crate) mod spark_plan;
 pub use datafusion_comet_spark_expr::timezone;
 mod memory_pools;
+pub(crate) mod spark_config;
 pub(crate) mod tracing;
 pub(crate) mod utils;
 
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
new file mode 100644
index 000000000..60ebb2ff8
--- /dev/null
+++ b/native/core/src/execution/spark_config.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
+pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
+pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
+pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
+
+pub(crate) trait SparkConfig {
+    fn get_bool(&self, name: &str) -> bool;
+    fn get_u64(&self, name: &str, default_value: u64) -> u64;
+}
+
+impl SparkConfig for HashMap<String, String> {
+    fn get_bool(&self, name: &str) -> bool {
+        self.get(name)
+            .and_then(|str_val| str_val.parse::<bool>().ok())
+            .unwrap_or(false)
+    }
+
+    fn get_u64(&self, name: &str, default_value: u64) -> u64 {
+        self.get(name)
+            .and_then(|str_val| str_val.parse::<u64>().ok())
+            .unwrap_or(default_value)
+    }
+}
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 8603a7b9a..43e200138 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -96,9 +96,9 @@ class CometExecIterator(
       CometSparkSessionExtensions.getCometMemoryOverhead(conf)
     }
 
-    // serialize Spark conf in protobuf format
+    // serialize Comet related Spark configs in protobuf format
     val builder = ConfigMap.newBuilder()
-    conf.getAll.foreach { case (k, v) =>
+    conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) =>
       builder.putEntries(k, v)
     }
     val protobufSparkConfigs = builder.build().toByteArray
@@ -140,10 +140,6 @@ class CometExecIterator(
       memoryLimit,
       memoryLimitPerTask,
       taskAttemptId,
-      debug = COMET_DEBUG_ENABLED.get(),
-      explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
-      tracingEnabled,
-      maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get(),
       keyUnwrapper)
   }
 
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index fb24dce0d..6ef92d0a6 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -68,10 +68,6 @@ class Native extends NativeBase {
       memoryLimit: Long,
       memoryLimitPerTask: Long,
       taskAttemptId: Long,
-      debug: Boolean,
-      explain: Boolean,
-      tracingEnabled: Boolean,
-      maxTempDirectorySize: Long,
       keyUnwrapper: CometFileKeyUnwrapper): Long
   // scalastyle:on
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to