comphead commented on code in PR #2467:
URL: https://github.com/apache/datafusion-comet/pull/2467#discussion_r2395486489
##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
}
fn shrink(&self, _: &MemoryReservation, size: usize) {
- self.release_to_spark(size)
- .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+ if let Err(e) = self.release_to_spark(size) {
+ panic!(
+ "Task {} failed to return {size} bytes to Spark: {e:?}",
+ self.task_attempt_id
+ );
+ }
if let Err(prev) = self
.used
.fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
{
- panic!("overflow when releasing {size} of {prev} bytes");
+ panic!(
+ "Task {} overflow when releasing {size} of {prev} bytes",
Review Comment:
```suggestion
"Task ID {} overflow when releasing {size} of {prev} bytes",
```
##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
fn drop(&mut self) {
let used = self.used.load(Relaxed);
if used != 0 {
- warn!("CometUnifiedMemoryPool dropped with {used} bytes still
reserved");
+ warn!(
+ "Task {} dropped CometUnifiedMemoryPool with {used} bytes
still reserved",
Review Comment:
```suggestion
"Task ID {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
```
##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
/** The id uniquely identifies the native plan this memory manager is
associated to */
private final long id;
+ private final long taskAttemptId;
+
public final TaskMemoryManager internal;
private final NativeMemoryConsumer nativeMemoryConsumer;
private final AtomicLong used = new AtomicLong();
- public CometTaskMemoryManager(long id) {
+ public CometTaskMemoryManager(long id, long taskAttemptId) {
this.id = id;
+ this.taskAttemptId = taskAttemptId;
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
this.nativeMemoryConsumer = new NativeMemoryConsumer();
}
// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} requested {} bytes", taskAttemptId, size);
Review Comment:
```suggestion
logger.trace("Task ID {} requested {} bytes", taskAttemptId, size);
```
##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
.fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired
as usize))
{
return Err(resources_datafusion_err!(
- "Failed to acquire {} bytes due to overflow. Reserved: {}",
+ "Task {} failed to acquire {} bytes due to overflow.
Reserved: {}",
Review Comment:
```suggestion
"Task ID {} failed to acquire {} bytes due to overflow. Reserved: {}",
```
##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
// Called by Comet native through JNI
public void releaseMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} released {} bytes", taskAttemptId, size);
Review Comment:
```suggestion
logger.trace("Task ID {} released {} bytes", taskAttemptId, size);
```
##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
self.release_to_spark(acquired as usize)?;
return Err(resources_datafusion_err!(
- "Failed to acquire {} bytes, only got {}. Reserved: {}",
+ "Task {} failed to acquire {} bytes, only got {}.
Reserved: {}",
Review Comment:
```suggestion
"Task ID {} failed to acquire {} bytes, only got {}. Reserved: {}",
```
##########
native/core/src/execution/memory_pools/unified_pool.rs:
##########
@@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
}
fn shrink(&self, _: &MemoryReservation, size: usize) {
- self.release_to_spark(size)
- .unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
+ if let Err(e) = self.release_to_spark(size) {
+ panic!(
+ "Task {} failed to return {size} bytes to Spark: {e:?}",
Review Comment:
```suggestion
"Task ID {} failed to return {size} bytes to Spark: {e:?}",
```
##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -64,10 +78,16 @@ public long acquireMemory(long size) {
// Called by Comet native through JNI
public void releaseMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} released {} bytes", taskAttemptId, size);
+ }
long newUsed = used.addAndGet(-size);
if (newUsed < 0) {
logger.error(
- "Used memory is negative: " + newUsed + " after releasing memory
chunk of: " + size);
+ "Task {} used memory is negative ({}) after releasing {} bytes",
Review Comment:
```suggestion
"Task ID {} used memory is negative ({}) after releasing {} bytes",
```
##########
spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java:
##########
@@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
/** The id uniquely identifies the native plan this memory manager is
associated to */
private final long id;
+ private final long taskAttemptId;
+
public final TaskMemoryManager internal;
private final NativeMemoryConsumer nativeMemoryConsumer;
private final AtomicLong used = new AtomicLong();
- public CometTaskMemoryManager(long id) {
+ public CometTaskMemoryManager(long id, long taskAttemptId) {
this.id = id;
+ this.taskAttemptId = taskAttemptId;
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
this.nativeMemoryConsumer = new NativeMemoryConsumer();
}
// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Task {} requested {} bytes", taskAttemptId, size);
+ }
long acquired = internal.acquireExecutionMemory(size,
nativeMemoryConsumer);
- used.addAndGet(acquired);
+ long newUsed = used.addAndGet(acquired);
if (acquired < size) {
+ logger.warn(
+ "Task {} requested {} bytes but only received {} bytes. Current
allocation is {} and "
Review Comment:
```suggestion
"Task ID {} requested {} bytes but only received {} bytes. Current allocation is {} and "
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org
For additional commands, e-mail: github-help@datafusion.apache.org