This is an automated email from the ASF dual-hosted git repository.

zhaokuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b572715e28 [GLUTEN-8128][VL] Retry borrowing when granted size is less than requested in multi-slot and shared mode (#8132)
b572715e28 is described below

commit b572715e28b6817f51f3f0813aafae42e40fb551
Author: zhaokuo <[email protected]>
AuthorDate: Tue Dec 17 12:19:13 2024 +0800

    [GLUTEN-8128][VL] Retry borrowing when granted size is less than requested in multi-slot and shared mode (#8132)
---
 .../memory/memtarget/MemoryTargetVisitor.java      |   2 +
 .../gluten/memory/memtarget/MemoryTargets.java     |  19 +++-
 .../memory/memtarget/RetryOnOomMemoryTarget.java   | 115 +++++++++++++++++++++
 .../org/apache/spark/memory/SparkMemoryUtil.scala  |   4 +
 4 files changed, 136 insertions(+), 4 deletions(-)

diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
index e58dbb295b..a42a51e0ce 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
@@ -35,4 +35,6 @@ public interface MemoryTargetVisitor<T> {
   T visit(NoopMemoryTarget noopMemoryTarget);
 
   T visit(DynamicOffHeapSizingMemoryTarget dynamicOffHeapSizingMemoryTarget);
+
+  T visit(RetryOnOomMemoryTarget retryOnOomMemoryTarget);
 }
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 6f7cc9bd9c..c0f74c7990 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -20,8 +20,10 @@ import org.apache.gluten.GlutenConfig;
 import org.apache.gluten.memory.MemoryUsageStatsBuilder;
 import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
 
+import org.apache.spark.SparkEnv;
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.SparkResourceUtil;
 
 import java.util.Map;
 
@@ -43,6 +45,14 @@ public final class MemoryTargets {
     return new OverAcquire(target, overTarget, overAcquiredRatio);
   }
 
+  public static TreeMemoryTarget retrySpillOnOom(TreeMemoryTarget target) {
+    SparkEnv env = SparkEnv.get();
+    if (env != null && env.conf() != null && SparkResourceUtil.getTaskSlots(env.conf()) > 1) {
+      return new RetryOnOomMemoryTarget(target);
+    }
+    return target;
+  }
+
   @Experimental
   public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) {
     if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
@@ -59,11 +69,12 @@ public final class MemoryTargets {
       Map<String, MemoryUsageStatsBuilder> virtualChildren) {
     final TreeMemoryConsumers.Factory factory;
     if (GlutenConfig.getConf().memoryIsolation()) {
-      factory = TreeMemoryConsumers.isolated();
+      return TreeMemoryConsumers.isolated().newConsumer(tmm, name, spiller, virtualChildren);
     } else {
-      factory = TreeMemoryConsumers.shared();
+      // Retry of spilling is needed in shared mode because the maxMemoryPerTask of Vanilla Spark
+      // ExecutionMemoryPool is dynamic when with multi-slot config.
+      return MemoryTargets.retrySpillOnOom(
+          TreeMemoryConsumers.shared().newConsumer(tmm, name, spiller, virtualChildren));
     }
-
-    return factory.newConsumer(tmm, name, spiller, virtualChildren);
   }
 }
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java
new file mode 100644
index 0000000000..1a5388d0d1
--- /dev/null
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.memory.memtarget;
+
+import org.apache.gluten.memory.MemoryUsageStatsBuilder;
+import org.apache.gluten.proto.MemoryUsageStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class RetryOnOomMemoryTarget implements TreeMemoryTarget {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RetryOnOomMemoryTarget.class);
+  private final TreeMemoryTarget target;
+
+  RetryOnOomMemoryTarget(TreeMemoryTarget target) {
+    this.target = target;
+  }
+
+  @Override
+  public long borrow(long size) {
+    long granted = target.borrow(size);
+    if (granted < size) {
+      LOGGER.info("Retrying spill require:{} got:{}", size, granted);
+      final long spilled = retryingSpill(Long.MAX_VALUE);
+      final long remaining = size - granted;
+      if (spilled >= remaining) {
+        granted += target.borrow(remaining);
+      }
+      LOGGER.info("Retrying spill spilled:{} final granted:{}", spilled, granted);
+    }
+    return granted;
+  }
+
+  private long retryingSpill(long size) {
+    TreeMemoryTarget rootTarget = target;
+    while (true) {
+      try {
+        rootTarget = rootTarget.parent();
+      } catch (IllegalStateException e) {
+        // Reached the root node
+        break;
+      }
+    }
+    return TreeMemoryTargets.spillTree(rootTarget, size);
+  }
+
+  @Override
+  public long repay(long size) {
+    return target.repay(size);
+  }
+
+  @Override
+  public long usedBytes() {
+    return target.usedBytes();
+  }
+
+  @Override
+  public <T> T accept(MemoryTargetVisitor<T> visitor) {
+    return visitor.visit(this);
+  }
+
+  @Override
+  public String name() {
+    return target.name();
+  }
+
+  @Override
+  public MemoryUsageStats stats() {
+    return target.stats();
+  }
+
+  @Override
+  public TreeMemoryTarget newChild(
+      String name,
+      long capacity,
+      Spiller spiller,
+      Map<String, MemoryUsageStatsBuilder> virtualChildren) {
+    return target.newChild(name, capacity, spiller, virtualChildren);
+  }
+
+  @Override
+  public Map<String, TreeMemoryTarget> children() {
+    return target.children();
+  }
+
+  @Override
+  public TreeMemoryTarget parent() {
+    return target.parent();
+  }
+
+  @Override
+  public Spiller getNodeSpiller() {
+    return target.getNodeSpiller();
+  }
+
+  public TreeMemoryTarget target() {
+    return target;
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index d221fafce4..637ef8b22f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -131,6 +131,10 @@ object SparkMemoryUtil {
           dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget): String = {
         dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
       }
+
+      override def visit(retryOnOomMemoryTarget: RetryOnOomMemoryTarget): String = {
+        retryOnOomMemoryTarget.target().accept(this)
+      }
     })
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to