This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 681b8a8da [VL] Expose API SparkMemoryUtil.dumpMemoryManagerStats(tmm: TaskMemoryManager) for debugging purpose
681b8a8da is described below

commit 681b8a8dacf00b32479720b120d897caade00b75
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Jul 29 16:09:40 2024 +0800

    [VL] Expose API SparkMemoryUtil.dumpMemoryManagerStats(tmm: TaskMemoryManager) for debugging purpose
---
 .../org/apache/spark/memory/SparkMemoryUtil.scala  | 140 +++++++++++----------
 1 file changed, 71 insertions(+), 69 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index 178310fd6..6fd2d7a3e 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -56,80 +56,82 @@ object SparkMemoryUtil {
     smp.memoryFree + emp.memoryFree
   }
 
+  def dumpMemoryManagerStats(tmm: TaskMemoryManager): String = {
+    val stats = tmm.synchronized {
+      val consumers = consumersField.get(tmm).asInstanceOf[util.HashSet[MemoryConsumer]]
+
+      // create stats map
+      val statsMap = new util.HashMap[String, MemoryUsageStats]()
+      consumers.asScala.foreach {
+        case mt: KnownNameAndStats =>
+          statsMap.put(mt.name(), mt.stats())
+        case mc =>
+          statsMap.put(
+            mc.toString,
+            MemoryUsageStats
+              .newBuilder()
+              .setCurrent(mc.getUsed)
+              .setPeak(-1L)
+              .build())
+      }
+      Preconditions.checkState(statsMap.size() == consumers.size())
+
+      // add root
+      new KnownNameAndStats {
+        override def name(): String = s"Task.${taskIdField.get(tmm)}"
+
+        override def stats(): MemoryUsageStats = MemoryUsageStats
+          .newBuilder()
+          .setCurrent(tmm.getMemoryConsumptionForThisTask)
+          .setPeak(-1L)
+          .putAllChildren(statsMap)
+          .build()
+      }
+    }
+
+    prettyPrintStats("Memory consumer stats: ", stats)
+  }
+
   def dumpMemoryTargetStats(target: MemoryTarget): String = {
-    def collectRootStats(target: MemoryTarget): KnownNameAndStats = {
-      target.accept(new MemoryTargetVisitor[KnownNameAndStats] {
-        override def visit(overAcquire: OverAcquire): KnownNameAndStats = {
-          overAcquire.getTarget.accept(this)
-        }
-
-        @nowarn
-        override def visit(regularMemoryConsumer: RegularMemoryConsumer): KnownNameAndStats = {
-          collectFromTaskMemoryManager(regularMemoryConsumer.getTaskMemoryManager)
-        }
-
-        override def visit(throwOnOomMemoryTarget: ThrowOnOomMemoryTarget): KnownNameAndStats = {
-          throwOnOomMemoryTarget.target().accept(this)
-        }
-
-        override def visit(treeMemoryConsumer: TreeMemoryConsumer): KnownNameAndStats = {
-          collectFromTaskMemoryManager(treeMemoryConsumer.getTaskMemoryManager)
-        }
-
-        override def visit(node: TreeMemoryTargets.Node): KnownNameAndStats = {
-          node.parent().accept(this) // walk up to find the one bound with task memory manager
-        }
-
-        private def collectFromTaskMemoryManager(tmm: TaskMemoryManager): KnownNameAndStats = {
-          tmm.synchronized {
-            val consumers = consumersField.get(tmm).asInstanceOf[util.HashSet[MemoryConsumer]]
-
-            // create stats map
-            val statsMap = new util.HashMap[String, MemoryUsageStats]()
-            consumers.asScala.foreach {
-              case mt: KnownNameAndStats =>
-                statsMap.put(mt.name(), mt.stats())
-              case mc =>
-                statsMap.put(
-                  mc.toString,
-                  MemoryUsageStats
-                    .newBuilder()
-                    .setCurrent(mc.getUsed)
-                    .setPeak(-1L)
-                    .build())
-            }
-            Preconditions.checkState(statsMap.size() == consumers.size())
-
-            // add root
-            new KnownNameAndStats {
-              override def name(): String = s"Task.${taskIdField.get(tmm)}"
-
-              override def stats(): MemoryUsageStats = MemoryUsageStats
-                .newBuilder()
-                .setCurrent(tmm.getMemoryConsumptionForThisTask)
-                .setPeak(-1L)
-                .putAllChildren(statsMap)
-                .build()
-            }
-          }
-        }
+    target.accept(new MemoryTargetVisitor[String] {
+      override def visit(overAcquire: OverAcquire): String = {
+        overAcquire.getTarget.accept(this)
+      }
 
-        override def visit(loggingMemoryTarget: LoggingMemoryTarget): KnownNameAndStats = {
-          loggingMemoryTarget.delegated().accept(this)
-        }
+      @nowarn
+      override def visit(regularMemoryConsumer: RegularMemoryConsumer): String = {
+        collectFromTaskMemoryManager(regularMemoryConsumer.getTaskMemoryManager)
+      }
 
-        override def visit(noopMemoryTarget: NoopMemoryTarget): KnownNameAndStats = {
-          noopMemoryTarget
-        }
+      override def visit(throwOnOomMemoryTarget: ThrowOnOomMemoryTarget): String = {
+        throwOnOomMemoryTarget.target().accept(this)
+      }
 
-        override def visit(dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget)
-            : KnownNameAndStats = {
-          dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
-        }
-      })
-    }
+      override def visit(treeMemoryConsumer: TreeMemoryConsumer): String = {
+        collectFromTaskMemoryManager(treeMemoryConsumer.getTaskMemoryManager)
+      }
+
+      override def visit(node: TreeMemoryTargets.Node): String = {
+        node.parent().accept(this) // walk up to find the one bound with task memory manager
+      }
+
+      private def collectFromTaskMemoryManager(tmm: TaskMemoryManager): String = {
+        dumpMemoryManagerStats(tmm)
+      }
 
-    prettyPrintStats("Memory consumer stats: ", collectRootStats(target))
+      override def visit(loggingMemoryTarget: LoggingMemoryTarget): String = {
+        loggingMemoryTarget.delegated().accept(this)
+      }
+
+      override def visit(noopMemoryTarget: NoopMemoryTarget): String = {
+        prettyPrintStats("No-op memory manager stats: ", noopMemoryTarget)
+      }
+
+      override def visit(
+          dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget): String = {
+        dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
+      }
+    })
   }
 
   def prettyPrintStats(title: String, stats: KnownNameAndStats): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to