This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 9caeec18f chore: Pass Comet configs to native `createPlan` (#2543)
9caeec18f is described below
commit 9caeec18ffd4a7012b600283bd19f0f85683ea05
Author: Andy Grove <[email protected]>
AuthorDate: Fri Oct 10 09:05:32 2025 -0600
chore: Pass Comet configs to native `createPlan` (#2543)
---
.../main/scala/org/apache/comet/CometConf.scala | 6 ++--
native/core/src/execution/jni_api.rs | 40 +++++++++++----------
native/core/src/execution/mod.rs | 1 +
native/core/src/execution/spark_config.rs | 42 ++++++++++++++++++++++
.../scala/org/apache/comet/CometExecIterator.scala | 8 ++---
spark/src/main/scala/org/apache/comet/Native.scala | 4 ---
6 files changed, 71 insertions(+), 30 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 5931eb25b..05c075057 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -63,9 +63,11 @@ object CometConf extends ShimCometConf {
def conf(key: String): ConfigBuilder = ConfigBuilder(key)
- val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
+ val COMET_PREFIX = "spark.comet";
- val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
+ val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";
+
+ val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";
val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b76108ad9..b17cfa1d9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -78,6 +78,10 @@ use crate::execution::spark_plan::SparkPlan;
use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
+use crate::execution::spark_config::{
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
+    COMET_TRACING_ENABLED,
+};
use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
use datafusion_comet_proto::spark_operator::operator::OpStruct;
use log::info;
@@ -168,14 +172,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_limit: jlong,
memory_limit_per_task: jlong,
task_attempt_id: jlong,
- debug_native: jboolean,
- explain_native: jboolean,
- tracing_enabled: jboolean,
- max_temp_directory_size: jlong,
key_unwrapper_obj: JObject,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
- with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
+ // Deserialize Spark configs
+    let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
+ let bytes = env.convert_byte_array(array)?;
+ let spark_configs = serde::deserialize_config(bytes.as_slice())?;
+    let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();
+
+ // Access Comet configs
+ let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
+    let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);
+ let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
+    let max_temp_directory_size =
+        spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+
+ with_trace("createPlan", tracing_enabled, || {
// Init JVM classes
JVMClasses::init(&mut env);
@@ -186,15 +199,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let bytes = env.convert_byte_array(array)?;
let spark_plan = serde::deserialize_op(bytes.as_slice())?;
- // Deserialize Spark configs
-    let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
- let bytes = env.convert_byte_array(array)?;
- let spark_configs = serde::deserialize_config(bytes.as_slice())?;
-
- // Convert Spark configs to HashMap
- let _spark_config_map: HashMap<String, String> =
- spark_configs.entries.into_iter().collect();
-
let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);
// Get the global references of input sources
@@ -238,7 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
batch_size as usize,
memory_pool,
local_dirs,
- max_temp_directory_size as u64,
+ max_temp_directory_size,
)?;
let plan_creation_time = start.elapsed();
@@ -274,10 +278,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_last_update_time: Instant::now(),
plan_creation_time,
session_ctx: Arc::new(session),
- debug_native: debug_native == 1,
- explain_native: explain_native == 1,
+ debug_native,
+ explain_native,
memory_pool_config,
- tracing_enabled: tracing_enabled != JNI_FALSE,
+ tracing_enabled,
});
Ok(Box::into_raw(exec_context) as i64)
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index c55b96f2a..b8a3d546b 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -27,6 +27,7 @@ pub(crate) mod sort;
pub(crate) mod spark_plan;
pub use datafusion_comet_spark_expr::timezone;
mod memory_pools;
+pub(crate) mod spark_config;
pub(crate) mod tracing;
pub(crate) mod utils;
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
new file mode 100644
index 000000000..60ebb2ff8
--- /dev/null
+++ b/native/core/src/execution/spark_config.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
+pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
+pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
+pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
+
+pub(crate) trait SparkConfig {
+ fn get_bool(&self, name: &str) -> bool;
+ fn get_u64(&self, name: &str, default_value: u64) -> u64;
+}
+
+impl SparkConfig for HashMap<String, String> {
+ fn get_bool(&self, name: &str) -> bool {
+ self.get(name)
+ .and_then(|str_val| str_val.parse::<bool>().ok())
+ .unwrap_or(false)
+ }
+
+ fn get_u64(&self, name: &str, default_value: u64) -> u64 {
+ self.get(name)
+ .and_then(|str_val| str_val.parse::<u64>().ok())
+ .unwrap_or(default_value)
+ }
+}
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 8603a7b9a..43e200138 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -96,9 +96,9 @@ class CometExecIterator(
CometSparkSessionExtensions.getCometMemoryOverhead(conf)
}
- // serialize Spark conf in protobuf format
+ // serialize Comet related Spark configs in protobuf format
val builder = ConfigMap.newBuilder()
- conf.getAll.foreach { case (k, v) =>
+    conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) =>
builder.putEntries(k, v)
}
val protobufSparkConfigs = builder.build().toByteArray
@@ -140,10 +140,6 @@ class CometExecIterator(
memoryLimit,
memoryLimitPerTask,
taskAttemptId,
- debug = COMET_DEBUG_ENABLED.get(),
- explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
- tracingEnabled,
- maxTempDirectorySize = CometConf.COMET_MAX_TEMP_DIRECTORY_SIZE.get(),
keyUnwrapper)
}
diff --git a/spark/src/main/scala/org/apache/comet/Native.scala b/spark/src/main/scala/org/apache/comet/Native.scala
index fb24dce0d..6ef92d0a6 100644
--- a/spark/src/main/scala/org/apache/comet/Native.scala
+++ b/spark/src/main/scala/org/apache/comet/Native.scala
@@ -68,10 +68,6 @@ class Native extends NativeBase {
memoryLimit: Long,
memoryLimitPerTask: Long,
taskAttemptId: Long,
- debug: Boolean,
- explain: Boolean,
- tracingEnabled: Boolean,
- maxTempDirectorySize: Long,
keyUnwrapper: CometFileKeyUnwrapper): Long
// scalastyle:on
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]