comphead commented on code in PR #2467:
URL: https://github.com/apache/datafusion-comet/pull/2467#discussion_r2395486489


##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
     }
 
     fn shrink(&self, _: &MemoryReservation, size: usize) {
-        self.release_to_spark(size)
-            .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+        if let Err(e) = self.release_to_spark(size) {
+            panic!(
+                "Task {} failed to return {size} bytes to Spark: {e:?}",
+                self.task_attempt_id
+            );
+        }
         if let Err(prev) = self
             .used
             .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
         {
-            panic!("overflow when releasing {size} of {prev} bytes");
+            panic!(
+                "Task {} overflow when releasing {size} of {prev} bytes",

Review Comment:
   ```suggestion
                   "Task ID {} overflow when releasing {size} of {prev} bytes",
   ```



##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
     fn drop(&mut self) {
         let used = self.used.load(Relaxed);
         if used != 0 {
-            warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
+            warn!(
+                "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",

Review Comment:
   ```suggestion
                   "Task ID {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
   ```



##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
   /** The id uniquely identifies the native plan this memory manager is associated to */
   private final long id;
 
+  private final long taskAttemptId;
+
   public final TaskMemoryManager internal;
   private final NativeMemoryConsumer nativeMemoryConsumer;
   private final AtomicLong used = new AtomicLong();
 
-  public CometTaskMemoryManager(long id) {
+  public CometTaskMemoryManager(long id, long taskAttemptId) {
     this.id = id;
+    this.taskAttemptId = taskAttemptId;
     this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
     this.nativeMemoryConsumer = new NativeMemoryConsumer();
   }
 
   // Called by Comet native through JNI.
   // Returns the actual amount of memory (in bytes) granted.
   public long acquireMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} requested {} bytes", taskAttemptId, size);

Review Comment:
   ```suggestion
         logger.trace("Task ID {} requested {} bytes", taskAttemptId, size);
   ```



##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
                 .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
             {
                 return Err(resources_datafusion_err!(
-                    "Failed to acquire {} bytes due to overflow. Reserved: {}",
+                    "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",

Review Comment:
   ```suggestion
                       "Task ID {} failed to acquire {} bytes due to overflow. Reserved: {}",
   ```



##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
 
   // Called by Comet native through JNI
   public void releaseMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} released {} bytes", taskAttemptId, size);

Review Comment:
   ```suggestion
         logger.trace("Task ID {} released {} bytes", taskAttemptId, size);
   ```



##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
                 self.release_to_spark(acquired as usize)?;
 
                 return Err(resources_datafusion_err!(
-                    "Failed to acquire {} bytes, only got {}. Reserved: {}",
+                    "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",

Review Comment:
   ```suggestion
                       "Task ID {} failed to acquire {} bytes, only got {}. Reserved: {}",
   ```



##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
     }
 
     fn shrink(&self, _: &MemoryReservation, size: usize) {
-        self.release_to_spark(size)
-            .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+        if let Err(e) = self.release_to_spark(size) {
+            panic!(
+                "Task {} failed to return {size} bytes to Spark: {e:?}",

Review Comment:
   ```suggestion
                   "Task ID {} failed to return {size} bytes to Spark: {e:?}",
   ```



##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
 
   // Called by Comet native through JNI
   public void releaseMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} released {} bytes", taskAttemptId, size);
+    }
     long newUsed = used.addAndGet(-size);
     if (newUsed < 0) {
       logger.error(
-          "Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
+          "Task {} used memory is negative ({}) after releasing {} bytes",

Review Comment:
   ```suggestion
             "Task ID {} used memory is negative ({}) after releasing {} bytes",
   ```



##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
   /** The id uniquely identifies the native plan this memory manager is associated to */
   private final long id;
 
+  private final long taskAttemptId;
+
   public final TaskMemoryManager internal;
   private final NativeMemoryConsumer nativeMemoryConsumer;
   private final AtomicLong used = new AtomicLong();
 
-  public CometTaskMemoryManager(long id) {
+  public CometTaskMemoryManager(long id, long taskAttemptId) {
     this.id = id;
+    this.taskAttemptId = taskAttemptId;
     this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
     this.nativeMemoryConsumer = new NativeMemoryConsumer();
   }
 
   // Called by Comet native through JNI.
   // Returns the actual amount of memory (in bytes) granted.
   public long acquireMemory(long size) {
+    if (logger.isTraceEnabled()) {
+      logger.trace("Task {} requested {} bytes", taskAttemptId, size);
+    }
     long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
-    used.addAndGet(acquired);
+    long newUsed = used.addAndGet(acquired);
     if (acquired < size) {
+      logger.warn(
+          "Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "

Review Comment:
   ```suggestion
             "Task ID {} requested {} bytes but only received {} bytes. Current allocation is {} and "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to