This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 681b8a8da [VL] Expose API SparkMemoryUtil.dumpMemoryManagerStats(tmm:
TaskMemoryManager) for debugging purpose
681b8a8da is described below
commit 681b8a8dacf00b32479720b120d897caade00b75
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Jul 29 16:09:40 2024 +0800
[VL] Expose API SparkMemoryUtil.dumpMemoryManagerStats(tmm:
TaskMemoryManager) for debugging purpose
---
.../org/apache/spark/memory/SparkMemoryUtil.scala | 140 +++++++++++----------
1 file changed, 71 insertions(+), 69 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index 178310fd6..6fd2d7a3e 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -56,80 +56,82 @@ object SparkMemoryUtil {
smp.memoryFree + emp.memoryFree
}
+ def dumpMemoryManagerStats(tmm: TaskMemoryManager): String = {
+ val stats = tmm.synchronized {
+ val consumers =
consumersField.get(tmm).asInstanceOf[util.HashSet[MemoryConsumer]]
+
+ // create stats map
+ val statsMap = new util.HashMap[String, MemoryUsageStats]()
+ consumers.asScala.foreach {
+ case mt: KnownNameAndStats =>
+ statsMap.put(mt.name(), mt.stats())
+ case mc =>
+ statsMap.put(
+ mc.toString,
+ MemoryUsageStats
+ .newBuilder()
+ .setCurrent(mc.getUsed)
+ .setPeak(-1L)
+ .build())
+ }
+ Preconditions.checkState(statsMap.size() == consumers.size())
+
+ // add root
+ new KnownNameAndStats {
+ override def name(): String = s"Task.${taskIdField.get(tmm)}"
+
+ override def stats(): MemoryUsageStats = MemoryUsageStats
+ .newBuilder()
+ .setCurrent(tmm.getMemoryConsumptionForThisTask)
+ .setPeak(-1L)
+ .putAllChildren(statsMap)
+ .build()
+ }
+ }
+
+ prettyPrintStats("Memory consumer stats: ", stats)
+ }
+
def dumpMemoryTargetStats(target: MemoryTarget): String = {
- def collectRootStats(target: MemoryTarget): KnownNameAndStats = {
- target.accept(new MemoryTargetVisitor[KnownNameAndStats] {
- override def visit(overAcquire: OverAcquire): KnownNameAndStats = {
- overAcquire.getTarget.accept(this)
- }
-
- @nowarn
- override def visit(regularMemoryConsumer: RegularMemoryConsumer):
KnownNameAndStats = {
-
collectFromTaskMemoryManager(regularMemoryConsumer.getTaskMemoryManager)
- }
-
- override def visit(throwOnOomMemoryTarget: ThrowOnOomMemoryTarget):
KnownNameAndStats = {
- throwOnOomMemoryTarget.target().accept(this)
- }
-
- override def visit(treeMemoryConsumer: TreeMemoryConsumer):
KnownNameAndStats = {
- collectFromTaskMemoryManager(treeMemoryConsumer.getTaskMemoryManager)
- }
-
- override def visit(node: TreeMemoryTargets.Node): KnownNameAndStats = {
- node.parent().accept(this) // walk up to find the one bound with
task memory manager
- }
-
- private def collectFromTaskMemoryManager(tmm: TaskMemoryManager):
KnownNameAndStats = {
- tmm.synchronized {
- val consumers =
consumersField.get(tmm).asInstanceOf[util.HashSet[MemoryConsumer]]
-
- // create stats map
- val statsMap = new util.HashMap[String, MemoryUsageStats]()
- consumers.asScala.foreach {
- case mt: KnownNameAndStats =>
- statsMap.put(mt.name(), mt.stats())
- case mc =>
- statsMap.put(
- mc.toString,
- MemoryUsageStats
- .newBuilder()
- .setCurrent(mc.getUsed)
- .setPeak(-1L)
- .build())
- }
- Preconditions.checkState(statsMap.size() == consumers.size())
-
- // add root
- new KnownNameAndStats {
- override def name(): String = s"Task.${taskIdField.get(tmm)}"
-
- override def stats(): MemoryUsageStats = MemoryUsageStats
- .newBuilder()
- .setCurrent(tmm.getMemoryConsumptionForThisTask)
- .setPeak(-1L)
- .putAllChildren(statsMap)
- .build()
- }
- }
- }
+ target.accept(new MemoryTargetVisitor[String] {
+ override def visit(overAcquire: OverAcquire): String = {
+ overAcquire.getTarget.accept(this)
+ }
- override def visit(loggingMemoryTarget: LoggingMemoryTarget):
KnownNameAndStats = {
- loggingMemoryTarget.delegated().accept(this)
- }
+ @nowarn
+ override def visit(regularMemoryConsumer: RegularMemoryConsumer): String
= {
+
collectFromTaskMemoryManager(regularMemoryConsumer.getTaskMemoryManager)
+ }
- override def visit(noopMemoryTarget: NoopMemoryTarget):
KnownNameAndStats = {
- noopMemoryTarget
- }
+ override def visit(throwOnOomMemoryTarget: ThrowOnOomMemoryTarget):
String = {
+ throwOnOomMemoryTarget.target().accept(this)
+ }
- override def visit(dynamicOffHeapSizingMemoryTarget:
DynamicOffHeapSizingMemoryTarget)
- : KnownNameAndStats = {
- dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
- }
- })
- }
+ override def visit(treeMemoryConsumer: TreeMemoryConsumer): String = {
+ collectFromTaskMemoryManager(treeMemoryConsumer.getTaskMemoryManager)
+ }
+
+ override def visit(node: TreeMemoryTargets.Node): String = {
+ node.parent().accept(this) // walk up to find the one bound with task
memory manager
+ }
+
+ private def collectFromTaskMemoryManager(tmm: TaskMemoryManager): String
= {
+ dumpMemoryManagerStats(tmm)
+ }
- prettyPrintStats("Memory consumer stats: ", collectRootStats(target))
+ override def visit(loggingMemoryTarget: LoggingMemoryTarget): String = {
+ loggingMemoryTarget.delegated().accept(this)
+ }
+
+ override def visit(noopMemoryTarget: NoopMemoryTarget): String = {
+ prettyPrintStats("No-op memory manager stats: ", noopMemoryTarget)
+ }
+
+ override def visit(
+ dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget):
String = {
+ dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
+ }
+ })
}
def prettyPrintStats(title: String, stats: KnownNameAndStats): String = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]