This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new def45fab9 minor: include taskAttemptId in log messages (#2467)
def45fab9 is described below

commit def45fab90b81e51578594e74202149866605f89
Author: Andy Grove <[email protected]>
AuthorDate: Wed Oct 1 17:36:27 2025 -0600

    minor: include taskAttemptId in log messages (#2467)
---
 native/core/src/execution/memory_pools/mod.rs      |  3 ++-
 .../src/execution/memory_pools/unified_pool.rs     | 31 +++++++++++++++++-----
 .../org/apache/spark/CometTaskMemoryManager.java   | 26 +++++++++++++++---
 .../scala/org/apache/comet/CometExecIterator.scala | 10 +++++--
 4 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index c1fa5bbca..3e40dc692 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
     match memory_pool_config.pool_type {
         MemoryPoolType::Unified => {
             // Set Comet memory pool for native
-            let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
+            let memory_pool =
+                CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
             Arc::new(TrackConsumersPool::new(
                 memory_pool,
                 NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs
index bbde72258..88b273107 100644
--- a/native/core/src/execution/memory_pools/unified_pool.rs
+++ b/native/core/src/execution/memory_pools/unified_pool.rs
@@ -40,6 +40,7 @@ use log::warn;
 pub struct CometUnifiedMemoryPool {
     task_memory_manager_handle: Arc<GlobalRef>,
     used: AtomicUsize,
+    task_attempt_id: i64,
 }
 
 impl Debug for CometUnifiedMemoryPool {
@@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
 }
 
 impl CometUnifiedMemoryPool {
-    pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
+    pub fn new(
+        task_memory_manager_handle: Arc<GlobalRef>,
+        task_attempt_id: i64,
+    ) -> CometUnifiedMemoryPool {
         Self {
             task_memory_manager_handle,
+            task_attempt_id,
             used: AtomicUsize::new(0),
         }
     }
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
     fn drop(&mut self) {
         let used = self.used.load(Relaxed);
         if used != 0 {
-            warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
+            warn!(
+                "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
+                self.task_attempt_id
+            );
         }
     }
 }
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
     }
 
     fn shrink(&self, _: &MemoryReservation, size: usize) {
-        self.release_to_spark(size)
-            .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+        if let Err(e) = self.release_to_spark(size) {
+            panic!(
+                "Task {} failed to return {size} bytes to Spark: {e:?}",
+                self.task_attempt_id
+            );
+        }
         if let Err(prev) = self
             .used
             .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
         {
-            panic!("overflow when releasing {size} of {prev} bytes");
+            panic!(
+                "Task {} overflow when releasing {size} of {prev} bytes",
+                self.task_attempt_id
+            );
         }
     }
 
@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
                 self.release_to_spark(acquired as usize)?;
 
                 return Err(resources_datafusion_err!(
-                    "Failed to acquire {} bytes, only got {}. Reserved: {}",
+                    "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
+                    self.task_attempt_id,
                     additional,
                     acquired,
                     self.reserved()
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
                 .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
             {
                 return Err(resources_datafusion_err!(
-                    "Failed to acquire {} bytes due to overflow. Reserved: {}",
+                    "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
+                    self.task_attempt_id,
                     additional,
                     prev
                 ));
diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
index 23c753dd5..3f344da68 100644
--- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
+++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
@@ -40,12 +40,15 @@ public class CometTaskMemoryManager {
   /** The id uniquely identifies the native plan this memory manager is associated to */
   private final long id;
 
+  private final long taskAttemptId;
+
   public final TaskMemoryManager internal;
   private final NativeMemoryConsumer nativeMemoryConsumer;
   private final AtomicLong used = new AtomicLong();
 
-  public CometTaskMemoryManager(long id) {
+  public CometTaskMemoryManager(long id, long taskAttemptId) {
     this.id = id;
+    this.taskAttemptId = taskAttemptId;
     this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
     this.nativeMemoryConsumer = new NativeMemoryConsumer();
   }
@@ -53,9 +56,20 @@ public class CometTaskMemoryManager {
   // Called by Comet native through JNI.
   // Returns the actual amount of memory (in bytes) granted.
   public long acquireMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} requested {} bytes", taskAttemptId, size);
+    }
     long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
-    used.addAndGet(acquired);
+    long newUsed = used.addAndGet(acquired);
     if (acquired < size) {
+      logger.warn(
+          "Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
+              + "the total memory consumption is {} bytes.",
+          taskAttemptId,
+          size,
+          acquired,
+          newUsed,
+          internal.getMemoryConsumptionForThisTask());
       // If memory manager is not able to acquire the requested size, log memory usage
       internal.showMemoryUsage();
     }
@@ -64,10 +78,16 @@ public class CometTaskMemoryManager {
 
   // Called by Comet native through JNI
   public void releaseMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} released {} bytes", taskAttemptId, size);
+    }
     long newUsed = used.addAndGet(-size);
     if (newUsed < 0) {
       logger.error(
-          "Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
+          "Task {} used memory is negative ({}) after releasing {} bytes",
+          taskAttemptId,
+          newUsed,
+          size);
     }
     internal.releaseExecutionMemory(size, nativeMemoryConsumer);
   }
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 0bd41d05e..f62dd13e8 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -68,7 +68,8 @@ class CometExecIterator(
   private val memoryMXBean = ManagementFactory.getMemoryMXBean
   private val nativeLib = new Native()
   private val nativeUtil = new NativeUtil()
-  private val cometTaskMemoryManager = new CometTaskMemoryManager(id)
+  private val taskAttemptId = TaskContext.get().taskAttemptId
+  private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
   private val cometBatchIterators = inputs.map { iterator =>
     new CometBatchIterator(iterator, nativeUtil)
   }.toArray
@@ -116,7 +117,7 @@ class CometExecIterator(
       memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
       memoryLimit,
       memoryLimitPerTask,
-      taskAttemptId = TaskContext.get().taskAttemptId,
+      taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
       tracingEnabled)
@@ -175,6 +176,11 @@ class CometExecIterator(
         })
     } catch {
       case e: CometNativeException =>
+        // it is generally considered bad practice to log and then rethrow an
+        // exception, but it really helps debugging to be able to see which task
+        // threw the exception, so we log the exception with taskAttemptId here
+        logError(s"Native execution for task $taskAttemptId failed", e)
+
         val fileNotFoundPattern: Regex =
           ("""^External: Object at location (.+?) not found: No such file or directory """ +
             """\(os error \d+\)$""").r


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to