This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9c2135fbe49 [SPARK-42686][CORE] Defer formatting for debug messages in
TaskMemoryManager
9c2135fbe49 is described below
commit 9c2135fbe49f5bed37f361f002163d791c757545
Author: Alkis Evlogimenos <[email protected]>
AuthorDate: Fri Mar 10 10:13:09 2023 +0900
[SPARK-42686][CORE] Defer formatting for debug messages in TaskMemoryManager
### What changes were proposed in this pull request?
Defer formatting of byte sizes until debug logging is actually enabled. Otherwise
we always spend cycles on formatting, regardless of whether debug logging is
enabled.
### Why are the changes needed?
Formatting shows up when profiling scan benchmarks.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually.
Closes #40302 from alkis/faster-task-memory-manager.
Authored-by: Alkis Evlogimenos <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/memory/TaskMemoryManager.java | 41 +++++++++++++++-------
1 file changed, 28 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index e2e44a54eb2..83352611770 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -149,8 +149,10 @@ public class TaskMemoryManager {
// Try to release memory from other consumers first, then we can reduce
the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
- logger.debug("Task {} need to spill {} for {}", taskAttemptId,
- Utils.bytesToString(required - got), requestingConsumer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+ Utils.bytesToString(required - got), requestingConsumer);
+ }
// We need to call spill() on consumers to free up more memory. We
want to optimize for two
// things:
// * Minimize the number of spill calls, to reduce the number of spill
files and avoid small
@@ -193,8 +195,10 @@ public class TaskMemoryManager {
}
consumers.add(requestingConsumer);
- logger.debug("Task {} acquired {} for {}", taskAttemptId,
Utils.bytesToString(got),
- requestingConsumer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Task {} acquired {} for {}", taskAttemptId,
Utils.bytesToString(got),
+ requestingConsumer);
+ }
return got;
}
}
@@ -215,14 +219,18 @@ public class TaskMemoryManager {
int idx) {
MemoryMode mode = requestingConsumer.getMode();
MemoryConsumer consumerToSpill = cList.get(idx);
- logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
- Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+ Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+ }
try {
long released = consumerToSpill.spill(requested, requestingConsumer);
if (released > 0) {
- logger.debug("Task {} spilled {} of requested {} from {} for {}",
taskAttemptId,
- Utils.bytesToString(released), Utils.bytesToString(requested),
consumerToSpill,
- requestingConsumer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Task {} spilled {} of requested {} from {} for {}",
taskAttemptId,
+ Utils.bytesToString(released), Utils.bytesToString(requested),
consumerToSpill,
+ requestingConsumer);
+ }
// When our spill handler releases memory,
`ExecutionMemoryPool#releaseMemory()` will
// immediately notify other tasks that memory has been freed, and they
may acquire the
@@ -251,7 +259,10 @@ public class TaskMemoryManager {
* Release N bytes of execution memory for a MemoryConsumer.
*/
public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
- logger.debug("Task {} release {} from {}", taskAttemptId,
Utils.bytesToString(size), consumer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Task {} release {} from {}", taskAttemptId,
Utils.bytesToString(size),
+ consumer);
+ }
memoryManager.releaseExecutionMemory(size, taskAttemptId,
consumer.getMode());
}
@@ -446,15 +457,19 @@ public class TaskMemoryManager {
synchronized (this) {
for (MemoryConsumer c: consumers) {
if (c != null && c.getUsed() > 0) {
- // In case of failed task, it's normal to see leaked memory
- logger.debug("unreleased " + Utils.bytesToString(c.getUsed()) + "
memory from " + c);
+ if (logger.isDebugEnabled()) {
+ // In case of failed task, it's normal to see leaked memory
+ logger.debug("unreleased {} memory from {}",
Utils.bytesToString(c.getUsed()), c);
+ }
}
}
consumers.clear();
for (MemoryBlock page : pageTable) {
if (page != null) {
- logger.debug("unreleased page: " + page + " in task " +
taskAttemptId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("unreleased page: {} in task {}", page,
taskAttemptId);
+ }
page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
memoryManager.tungstenMemoryAllocator().free(page);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]