This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new dffcb1bdb chore: Add memory reservation debug logging (#3489)
dffcb1bdb is described below

commit dffcb1bdb4ab2be1e724b649f99d07f7e70ff425
Author: Andy Grove <[email protected]>
AuthorDate: Thu Feb 12 14:36:24 2026 -0700

    chore: Add memory reservation debug logging (#3489)
---
 .../main/scala/org/apache/comet/CometConf.scala    |  10 ++
 docs/source/contributor-guide/debugging.md         |  28 +++++-
 native/core/src/execution/jni_api.rs               |  12 ++-
 .../src/execution/memory_pools/logging_pool.rs     | 112 +++++++++++++++++++++
 native/core/src/execution/memory_pools/mod.rs      |   1 +
 native/core/src/execution/spark_config.rs          |   1 +
 6 files changed, 161 insertions(+), 3 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 522ccbc94..5ab81ba8d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
   private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
     "Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"
 
+  private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
+    "Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
+
   /** List of all configs that is used for generating documentation */
   val allConfs = new ListBuffer[ConfigEntry[_]]
 
@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
+    conf(s"$COMET_PREFIX.debug.memory")
+      .category(CATEGORY_TESTING)
+      .doc(s"When enabled, log all native memory pool interactions. 
$DEBUGGING_GUIDE.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
   val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
 
diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md
index db5bdfc59..1d2447d2e 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
 make release COMET_FEATURES=backtrace
 ```
 
-Start Comet with `RUST_BACKTRACE=1`
+Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.
 
 ```console
 RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -188,3 +188,29 @@ This produces output like the following:
 
 Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
 For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section), backtraces are included for failed allocations.
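
Putting the two flags together, a minimal end-to-end invocation looks like the sketch below, which extends the spark-shell example from the backtrace section of the same doc; only the final `--conf` comes from this patch.

```console
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell \
  --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.debug.memory=true
```

The wrapper logs successful calls at info level and failed allocations at warn level, so the logging configuration (for example the log4rs.yaml mentioned above) must allow info-level output for the full trace to appear.
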
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b1e48828f..ed68de5d9 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -82,9 +82,10 @@ use crate::execution::spark_plan::SparkPlan;
 
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
 
+use crate::execution::memory_pools::logging_pool::LoggingMemoryPool;
 use crate::execution::spark_config::{
-    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
-    COMET_TRACING_ENABLED,
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
+    COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
 };
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -193,6 +194,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
         let max_temp_directory_size =
             spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+        let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
 
         with_trace("createPlan", tracing_enabled, || {
             // Init JVM classes
@@ -229,6 +231,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             let memory_pool =
                 create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
 
+            let memory_pool = if logging_memory_pool {
+                Arc::new(LoggingMemoryPool::new(task_attempt_id as u64, memory_pool))
+            } else {
+                memory_pool
+            };
+
             // Get local directories for storing spill files
             let num_local_dirs = env.get_array_length(&local_dirs)?;
             let mut local_dirs_vec = vec![];
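
The hunk above is a plain decorator: because `LoggingMemoryPool` implements `MemoryPool`, wrapping it can shadow the original `Arc<dyn MemoryPool>` binding without touching any downstream code. A standalone sketch of the same pattern follows (the `maybe_wrap` helper is hypothetical, not part of this commit):

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::MemoryPool;

use crate::execution::memory_pools::logging_pool::LoggingMemoryPool;

// Hypothetical helper mirroring the conditional wrapping in createPlan:
// return the base pool as-is, or wrap it so that every interaction is logged.
fn maybe_wrap(
    base_pool: Arc<dyn MemoryPool>,
    debug_memory: bool,
    task_attempt_id: u64,
) -> Arc<dyn MemoryPool> {
    if debug_memory {
        Arc::new(LoggingMemoryPool::new(task_attempt_id, base_pool))
    } else {
        base_pool
    }
}
```
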
diff --git a/native/core/src/execution/memory_pools/logging_pool.rs b/native/core/src/execution/memory_pools/logging_pool.rs
new file mode 100644
index 000000000..c23672d01
--- /dev/null
+++ b/native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::execution::memory_pool::{
+    MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+};
+use log::{info, warn};
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub(crate) struct LoggingMemoryPool {
+    task_attempt_id: u64,
+    pool: Arc<dyn MemoryPool>,
+}
+
+impl LoggingMemoryPool {
+    pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
+        Self {
+            task_attempt_id,
+            pool,
+        }
+    }
+}
+
+impl MemoryPool for LoggingMemoryPool {
+    fn register(&self, consumer: &MemoryConsumer) {
+        info!(
+            "[Task {}] MemoryPool[{}].register()",
+            self.task_attempt_id,
+            consumer.name(),
+        );
+        self.pool.register(consumer)
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        info!(
+            "[Task {}] MemoryPool[{}].unregister()",
+            self.task_attempt_id,
+            consumer.name(),
+        );
+        self.pool.unregister(consumer)
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].grow({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            additional
+        );
+        self.pool.grow(reservation, additional);
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].shrink({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            shrink
+        );
+        self.pool.shrink(reservation, shrink);
+    }
+
+    fn try_grow(
+        &self,
+        reservation: &MemoryReservation,
+        additional: usize,
+    ) -> datafusion::common::Result<()> {
+        match self.pool.try_grow(reservation, additional) {
+            Ok(_) => {
+                info!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Ok(())
+            }
+            Err(e) => {
+                warn!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Err(e)
+            }
+        }
+    }
+
+    fn reserved(&self) -> usize {
+        self.pool.reserved()
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        self.pool.memory_limit()
+    }
+}
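
Because every method delegates to the inner pool, the wrapper composes with any DataFusion `MemoryPool` implementation. A minimal usage sketch, assuming a 1 MiB `GreedyMemoryPool` (the consumer name, sizes, and task id are illustrative, and a logger such as log4rs must be initialized for output to appear):

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

use crate::execution::memory_pools::logging_pool::LoggingMemoryPool;

fn demo() -> Result<()> {
    // Wrap a 1 MiB greedy pool; 42 stands in for a real Spark task attempt id.
    let inner: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let pool: Arc<dyn MemoryPool> = Arc::new(LoggingMemoryPool::new(42, inner));

    // register() is logged, as is each grow/shrink on the reservation, e.g.
    //   [Task 42] MemoryPool[ExampleConsumer].try_grow(524288) returning Ok
    //   [Task 42] MemoryPool[ExampleConsumer].shrink(262144)
    let mut reservation = MemoryConsumer::new("ExampleConsumer").register(&pool);
    reservation.try_grow(512 * 1024)?;
    reservation.shrink(256 * 1024);

    // Exceeding the 1 MiB limit is logged at warn level and returns Err.
    assert!(reservation.try_grow(2 * 1024 * 1024).is_err());
    Ok(())
}
```
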
diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index fc6a81a5e..d8b347335 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@
 
 mod config;
 mod fair_pool;
+pub mod logging_pool;
 mod task_shared;
 mod unified_pool;
 
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
index 60ebb2ff8..b257a5ba6 100644
--- a/native/core/src/execution/spark_config.rs
+++ b/native/core/src/execution/spark_config.rs
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
 pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
 pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
 pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
+pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
 
 pub(crate) trait SparkConfig {
     fn get_bool(&self, name: &str) -> bool;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
