This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new dffcb1bdb chore: Add memory reservation debug logging (#3489)
dffcb1bdb is described below
commit dffcb1bdb4ab2be1e724b649f99d07f7e70ff425
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 12 14:36:24 2026 -0700
chore: Add memory reservation debug logging (#3489)
---
.../main/scala/org/apache/comet/CometConf.scala | 10 ++
docs/source/contributor-guide/debugging.md | 28 +++++-
native/core/src/execution/jni_api.rs | 12 ++-
.../src/execution/memory_pools/logging_pool.rs | 112 +++++++++++++++++++++
native/core/src/execution/memory_pools/mod.rs | 1 +
native/core/src/execution/spark_config.rs | 1 +
6 files changed, 161 insertions(+), 3 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 522ccbc94..5ab81ba8d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
private val TRACING_GUIDE = "For more information, refer to the Comet
Tracing " +
"Guide
(https://datafusion.apache.org/comet/contributor-guide/tracing.html)"
+ private val DEBUGGING_GUIDE = "For more information, refer to the Comet
Debugging " +
+ "Guide
(https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
+
/** List of all configs that is used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]
@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)
+ val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
+ conf(s"$COMET_PREFIX.debug.memory")
+ .category(CATEGORY_TESTING)
+ .doc(s"When enabled, log all native memory pool interactions.
$DEBUGGING_GUIDE.")
+ .booleanConf
+ .createWithDefault(false)
+
val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
diff --git a/docs/source/contributor-guide/debugging.md
b/docs/source/contributor-guide/debugging.md
index db5bdfc59..1d2447d2e 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
make release COMET_FEATURES=backtrace
```
-Start Comet with `RUST_BACKTRACE=1`
+Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for
`spark-submit` if running in local mode.
```console
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars
spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf
spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true
--conf spark.comet.exec.enabled=true
@@ -188,3 +188,29 @@ This produces output like the following:
Additionally, you can place a `log4rs.yaml` configuration file inside the
Comet configuration directory specified by the `COMET_CONF_DIR` environment
variable to enable more advanced logging configurations. This file uses the
[log4rs YAML configuration
format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
For example, see:
[log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink
memory reservations.
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section on stack traces), backtraces will be
included for failed allocations.
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index b1e48828f..ed68de5d9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -82,9 +82,10 @@ use crate::execution::spark_plan::SparkPlan;
use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end,
with_trace};
+use crate::execution::memory_pools::logging_pool::LoggingMemoryPool;
use crate::execution::spark_config::{
- SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED,
COMET_MAX_TEMP_DIRECTORY_SIZE,
- COMET_TRACING_ENABLED,
+ SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY,
COMET_EXPLAIN_NATIVE_ENABLED,
+ COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
};
use crate::parquet::encryption_support::{CometEncryptionFactory,
ENCRYPTION_FACTORY_ID};
use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -193,6 +194,7 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
let max_temp_directory_size =
spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 *
1024 * 1024);
+ let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
with_trace("createPlan", tracing_enabled, || {
// Init JVM classes
@@ -229,6 +231,12 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_createPlan(
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager,
task_attempt_id);
+ let memory_pool = if logging_memory_pool {
+ Arc::new(LoggingMemoryPool::new(task_attempt_id as u64,
memory_pool))
+ } else {
+ memory_pool
+ };
+
// Get local directories for storing spill files
let num_local_dirs = env.get_array_length(&local_dirs)?;
let mut local_dirs_vec = vec![];
diff --git a/native/core/src/execution/memory_pools/logging_pool.rs
b/native/core/src/execution/memory_pools/logging_pool.rs
new file mode 100644
index 000000000..c23672d01
--- /dev/null
+++ b/native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::execution::memory_pool::{
+ MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+};
+use log::{info, warn};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub(crate) struct LoggingMemoryPool {
+ task_attempt_id: u64,
+ pool: Arc<dyn MemoryPool>,
+}
+
+impl LoggingMemoryPool {
+ pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
+ Self {
+ task_attempt_id,
+ pool,
+ }
+ }
+}
+
+impl MemoryPool for LoggingMemoryPool {
+ fn register(&self, consumer: &MemoryConsumer) {
+ info!(
+ "[Task {}] MemoryPool[{}].register()",
+ self.task_attempt_id,
+ consumer.name(),
+ );
+ self.pool.register(consumer)
+ }
+
+ fn unregister(&self, consumer: &MemoryConsumer) {
+ info!(
+ "[Task {}] MemoryPool[{}].unregister()",
+ self.task_attempt_id,
+ consumer.name(),
+ );
+ self.pool.unregister(consumer)
+ }
+
+ fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+ info!(
+ "[Task {}] MemoryPool[{}].grow({})",
+ self.task_attempt_id,
+ reservation.consumer().name(),
+ additional
+ );
+ self.pool.grow(reservation, additional);
+ }
+
+ fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+ info!(
+ "[Task {}] MemoryPool[{}].shrink({})",
+ self.task_attempt_id,
+ reservation.consumer().name(),
+ shrink
+ );
+ self.pool.shrink(reservation, shrink);
+ }
+
+ fn try_grow(
+ &self,
+ reservation: &MemoryReservation,
+ additional: usize,
+ ) -> datafusion::common::Result<()> {
+ match self.pool.try_grow(reservation, additional) {
+ Ok(_) => {
+ info!(
+ "[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
+ self.task_attempt_id,
+ reservation.consumer().name(),
+ additional
+ );
+ Ok(())
+ }
+ Err(e) => {
+ warn!(
+ "[Task {}] MemoryPool[{}].try_grow({}) returning Err:
{e:?}",
+ self.task_attempt_id,
+ reservation.consumer().name(),
+ additional
+ );
+ Err(e)
+ }
+ }
+ }
+
+ fn reserved(&self) -> usize {
+ self.pool.reserved()
+ }
+
+ fn memory_limit(&self) -> MemoryLimit {
+ self.pool.memory_limit()
+ }
+}
diff --git a/native/core/src/execution/memory_pools/mod.rs
b/native/core/src/execution/memory_pools/mod.rs
index fc6a81a5e..d8b347335 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@
mod config;
mod fair_pool;
+pub mod logging_pool;
mod task_shared;
mod unified_pool;
diff --git a/native/core/src/execution/spark_config.rs
b/native/core/src/execution/spark_config.rs
index 60ebb2ff8..b257a5ba6 100644
--- a/native/core/src/execution/spark_config.rs
+++ b/native/core/src/execution/spark_config.rs
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str =
"spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str =
"spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str =
"spark.comet.maxTempDirectorySize";
+pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
pub(crate) trait SparkConfig {
fn get_bool(&self, name: &str) -> bool;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]