This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new def45fab9 minor: include taskAttemptId in log messages (#2467)
def45fab9 is described below
commit def45fab90b81e51578594e74202149866605f89
Author: Andy Grove <[email protected]>
AuthorDate: Wed Oct 1 17:36:27 2025 -0600
minor: include taskAttemptId in log messages (#2467)
---
native/core/src/execution/memory_pools/mod.rs | 3 ++-
.../src/execution/memory_pools/unified_pool.rs | 31 +++++++++++++++++-----
.../org/apache/spark/CometTaskMemoryManager.java | 26 +++++++++++++++---
.../scala/org/apache/comet/CometExecIterator.scala | 10 +++++--
4 files changed, 57 insertions(+), 13 deletions(-)
diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index c1fa5bbca..3e40dc692 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
match memory_pool_config.pool_type {
MemoryPoolType::Unified => {
// Set Comet memory pool for native
- let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
+ let memory_pool =
+ CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs
index bbde72258..88b273107 100644
--- a/native/core/src/execution/memory_pools/unified_pool.rs
+++ b/native/core/src/execution/memory_pools/unified_pool.rs
@@ -40,6 +40,7 @@ use log::warn;
pub struct CometUnifiedMemoryPool {
task_memory_manager_handle: Arc<GlobalRef>,
used: AtomicUsize,
+ task_attempt_id: i64,
}
impl Debug for CometUnifiedMemoryPool {
@@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
}
impl CometUnifiedMemoryPool {
- pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
+ pub fn new(
+ task_memory_manager_handle: Arc<GlobalRef>,
+ task_attempt_id: i64,
+ ) -> CometUnifiedMemoryPool {
Self {
task_memory_manager_handle,
+ task_attempt_id,
used: AtomicUsize::new(0),
}
}
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
fn drop(&mut self) {
let used = self.used.load(Relaxed);
if used != 0 {
- warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
+ warn!(
+ "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
+ self.task_attempt_id
+ );
}
}
}
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
}
fn shrink(&self, _: &MemoryReservation, size: usize) {
- self.release_to_spark(size)
- .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+ if let Err(e) = self.release_to_spark(size) {
+ panic!(
+ "Task {} failed to return {size} bytes to Spark: {e:?}",
+ self.task_attempt_id
+ );
+ }
if let Err(prev) = self
.used
.fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
{
- panic!("overflow when releasing {size} of {prev} bytes");
+ panic!(
+ "Task {} overflow when releasing {size} of {prev} bytes",
+ self.task_attempt_id
+ );
}
}
@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
self.release_to_spark(acquired as usize)?;
return Err(resources_datafusion_err!(
- "Failed to acquire {} bytes, only got {}. Reserved: {}",
+ "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
+ self.task_attempt_id,
additional,
acquired,
self.reserved()
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
.fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
{
return Err(resources_datafusion_err!(
- "Failed to acquire {} bytes due to overflow. Reserved: {}",
+ "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
+ self.task_attempt_id,
additional,
prev
));
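
For anyone reading the unified_pool.rs hunks above out of context: the pool tracks reserved bytes in an AtomicUsize, updates it through fetch_update with checked arithmetic, and this change tags every panic and error message with the task attempt id. A minimal, self-contained sketch of that pattern follows; the type and method names are illustrative, not Comet's actual API.

    // Standalone sketch: an AtomicUsize holds the reserved byte count,
    // fetch_update with checked_add/checked_sub rejects overflow, and every
    // failure message carries the task attempt id (illustrative names only).
    use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

    struct TaskTaggedUsage {
        task_attempt_id: i64,
        used: AtomicUsize,
    }

    impl TaskTaggedUsage {
        fn new(task_attempt_id: i64) -> Self {
            Self {
                task_attempt_id,
                used: AtomicUsize::new(0),
            }
        }

        // Record `size` newly reserved bytes; panic (tagged with the task id) on overflow.
        fn grow(&self, size: usize) {
            if let Err(prev) = self
                .used
                .fetch_update(Relaxed, Relaxed, |old| old.checked_add(size))
            {
                panic!(
                    "Task {} overflow when acquiring {} on top of {} bytes",
                    self.task_attempt_id, size, prev
                );
            }
        }

        // Release `size` bytes; panic (tagged with the task id) if more is
        // released than was ever reserved.
        fn shrink(&self, size: usize) {
            if let Err(prev) = self
                .used
                .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
            {
                panic!(
                    "Task {} overflow when releasing {} of {} bytes",
                    self.task_attempt_id, size, prev
                );
            }
        }
    }

    fn main() {
        let usage = TaskTaggedUsage::new(42);
        usage.grow(1024);
        usage.shrink(512);
        assert_eq!(usage.used.load(Relaxed), 512);
    }

fetch_update returns Err with the previous value when the closure returns None, so a checked_sub below zero or a checked_add past usize::MAX surfaces as the prev value in the tagged message instead of silently wrapping.
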
diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
index 23c753dd5..3f344da68 100644
--- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
+++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
@@ -40,12 +40,15 @@ public class CometTaskMemoryManager {
/** The id uniquely identifies the native plan this memory manager is associated to */
private final long id;
+ private final long taskAttemptId;
+
public final TaskMemoryManager internal;
private final NativeMemoryConsumer nativeMemoryConsumer;
private final AtomicLong used = new AtomicLong();
- public CometTaskMemoryManager(long id) {
+ public CometTaskMemoryManager(long id, long taskAttemptId) {
this.id = id;
+ this.taskAttemptId = taskAttemptId;
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
this.nativeMemoryConsumer = new NativeMemoryConsumer();
}
@@ -53,9 +56,20 @@ public class CometTaskMemoryManager {
// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} requested {} bytes", taskAttemptId, size);
+ }
long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
- used.addAndGet(acquired);
+ long newUsed = used.addAndGet(acquired);
if (acquired < size) {
+ logger.warn(
+ "Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
+ + "the total memory consumption is {} bytes.",
+ taskAttemptId,
+ size,
+ acquired,
+ newUsed,
+ internal.getMemoryConsumptionForThisTask());
// If memory manager is not able to acquire the requested size, log memory usage
internal.showMemoryUsage();
}
@@ -64,10 +78,16 @@ public class CometTaskMemoryManager {
// Called by Comet native through JNI
public void releaseMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} released {} bytes", taskAttemptId, size);
+ }
long newUsed = used.addAndGet(-size);
if (newUsed < 0) {
logger.error(
- "Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
+ "Task {} used memory is negative ({}) after releasing {} bytes",
+ taskAttemptId,
+ newUsed,
+ size);
}
internal.releaseExecutionMemory(size, nativeMemoryConsumer);
}
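
The Java hunks above guard the per-call trace records behind logger.isTraceEnabled() and tag both the trace and warn output with taskAttemptId. The same level-guarded, task-tagged pattern expressed on the native side with the log crate might look like the sketch below; the function, the values, and the env_logger setup are assumptions made only so the example runs, not Comet code.

    // Level-guarded, task-tagged logging with the `log` crate
    // (`env_logger` is used purely to make the example runnable).
    use log::{log_enabled, trace, warn, Level};

    fn record_acquire(task_attempt_id: i64, requested: u64, granted: u64) {
        // Skip the per-call trace record entirely unless trace logging is enabled.
        if log_enabled!(Level::Trace) {
            trace!("Task {} requested {} bytes", task_attempt_id, requested);
        }
        if granted < requested {
            warn!(
                "Task {} requested {} bytes but only received {} bytes",
                task_attempt_id, requested, granted
            );
        }
    }

    fn main() {
        // Run with RUST_LOG=trace to see the trace record as well as the warning.
        env_logger::init();
        record_acquire(42, 4096, 1024);
    }
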
diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
index 0bd41d05e..f62dd13e8 100644
--- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
+++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -68,7 +68,8 @@ class CometExecIterator(
private val memoryMXBean = ManagementFactory.getMemoryMXBean
private val nativeLib = new Native()
private val nativeUtil = new NativeUtil()
- private val cometTaskMemoryManager = new CometTaskMemoryManager(id)
+ private val taskAttemptId = TaskContext.get().taskAttemptId
+ private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
private val cometBatchIterators = inputs.map { iterator =>
new CometBatchIterator(iterator, nativeUtil)
}.toArray
@@ -116,7 +117,7 @@ class CometExecIterator(
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
memoryLimit,
memoryLimitPerTask,
- taskAttemptId = TaskContext.get().taskAttemptId,
+ taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
tracingEnabled)
@@ -175,6 +176,11 @@ class CometExecIterator(
})
} catch {
case e: CometNativeException =>
+ // it is generally considered bad practice to log and then rethrow an
+ // exception, but it really helps debugging to be able to see which task
+ // threw the exception, so we log the exception with taskAttemptId here
+ logError(s"Native execution for task $taskAttemptId failed", e)
+
val fileNotFoundPattern: Regex =
("""^External: Object at location (.+?) not found: No such file or
directory """ +
"""\(os error \d+\)$""").r
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]