This is an automated email from the ASF dual-hosted git repository.
zhaokuo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b572715e28 [GLUTEN-8128][VL] Retry borrowing when granted size is less
than requested in multi-slot and shared mode (#8132)
b572715e28 is described below
commit b572715e28b6817f51f3f0813aafae42e40fb551
Author: zhaokuo <[email protected]>
AuthorDate: Tue Dec 17 12:19:13 2024 +0800
[GLUTEN-8128][VL] Retry borrowing when granted size is less than requested
in multi-slot and shared mode (#8132)
---
.../memory/memtarget/MemoryTargetVisitor.java | 2 +
.../gluten/memory/memtarget/MemoryTargets.java | 19 +++-
.../memory/memtarget/RetryOnOomMemoryTarget.java | 115 +++++++++++++++++++++
.../org/apache/spark/memory/SparkMemoryUtil.scala | 4 +
4 files changed, 136 insertions(+), 4 deletions(-)
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
index e58dbb295b..a42a51e0ce 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
@@ -35,4 +35,6 @@ public interface MemoryTargetVisitor<T> {
T visit(NoopMemoryTarget noopMemoryTarget);
T visit(DynamicOffHeapSizingMemoryTarget dynamicOffHeapSizingMemoryTarget);
+
+ T visit(RetryOnOomMemoryTarget retryOnOomMemoryTarget);
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
index 6f7cc9bd9c..c0f74c7990 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java
@@ -20,8 +20,10 @@ import org.apache.gluten.GlutenConfig;
import org.apache.gluten.memory.MemoryUsageStatsBuilder;
import org.apache.gluten.memory.memtarget.spark.TreeMemoryConsumers;
+import org.apache.spark.SparkEnv;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.util.SparkResourceUtil;
import java.util.Map;
@@ -43,6 +45,14 @@ public final class MemoryTargets {
return new OverAcquire(target, overTarget, overAcquiredRatio);
}
+ public static TreeMemoryTarget retrySpillOnOom(TreeMemoryTarget target) {
+ SparkEnv env = SparkEnv.get();
+ if (env != null && env.conf() != null && SparkResourceUtil.getTaskSlots(env.conf()) > 1) {
+ return new RetryOnOomMemoryTarget(target);
+ }
+ return target;
+ }
+
@Experimental
public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) {
if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) {
@@ -59,11 +69,12 @@ public final class MemoryTargets {
Map<String, MemoryUsageStatsBuilder> virtualChildren) {
final TreeMemoryConsumers.Factory factory;
if (GlutenConfig.getConf().memoryIsolation()) {
- factory = TreeMemoryConsumers.isolated();
+ return TreeMemoryConsumers.isolated().newConsumer(tmm, name, spiller, virtualChildren);
} else {
- factory = TreeMemoryConsumers.shared();
+ // Retry of spilling is needed in shared mode because the maxMemoryPerTask of Vanilla Spark
+ // ExecutionMemoryPool is dynamic when with multi-slot config.
+ return MemoryTargets.retrySpillOnOom(
+ TreeMemoryConsumers.shared().newConsumer(tmm, name, spiller, virtualChildren));
}
-
- return factory.newConsumer(tmm, name, spiller, virtualChildren);
}
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java
new file mode 100644
index 0000000000..1a5388d0d1
--- /dev/null
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/RetryOnOomMemoryTarget.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.memory.memtarget;
+
+import org.apache.gluten.memory.MemoryUsageStatsBuilder;
+import org.apache.gluten.proto.MemoryUsageStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class RetryOnOomMemoryTarget implements TreeMemoryTarget {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RetryOnOomMemoryTarget.class);
+ private final TreeMemoryTarget target;
+
+ RetryOnOomMemoryTarget(TreeMemoryTarget target) {
+ this.target = target;
+ }
+
+ @Override
+ public long borrow(long size) {
+ long granted = target.borrow(size);
+ if (granted < size) {
+ LOGGER.info("Retrying spill require:{} got:{}", size, granted);
+ final long spilled = retryingSpill(Long.MAX_VALUE);
+ final long remaining = size - granted;
+ if (spilled >= remaining) {
+ granted += target.borrow(remaining);
+ }
+ LOGGER.info("Retrying spill spilled:{} final granted:{}", spilled, granted);
+ }
+ return granted;
+ }
+
+ private long retryingSpill(long size) {
+ TreeMemoryTarget rootTarget = target;
+ while (true) {
+ try {
+ rootTarget = rootTarget.parent();
+ } catch (IllegalStateException e) {
+ // Reached the root node
+ break;
+ }
+ }
+ return TreeMemoryTargets.spillTree(rootTarget, size);
+ }
+
+ @Override
+ public long repay(long size) {
+ return target.repay(size);
+ }
+
+ @Override
+ public long usedBytes() {
+ return target.usedBytes();
+ }
+
+ @Override
+ public <T> T accept(MemoryTargetVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
+
+ @Override
+ public String name() {
+ return target.name();
+ }
+
+ @Override
+ public MemoryUsageStats stats() {
+ return target.stats();
+ }
+
+ @Override
+ public TreeMemoryTarget newChild(
+ String name,
+ long capacity,
+ Spiller spiller,
+ Map<String, MemoryUsageStatsBuilder> virtualChildren) {
+ return target.newChild(name, capacity, spiller, virtualChildren);
+ }
+
+ @Override
+ public Map<String, TreeMemoryTarget> children() {
+ return target.children();
+ }
+
+ @Override
+ public TreeMemoryTarget parent() {
+ return target.parent();
+ }
+
+ @Override
+ public Spiller getNodeSpiller() {
+ return target.getNodeSpiller();
+ }
+
+ public TreeMemoryTarget target() {
+ return target;
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
index d221fafce4..637ef8b22f 100644
--- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala
@@ -131,6 +131,10 @@ object SparkMemoryUtil {
dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget): String = {
dynamicOffHeapSizingMemoryTarget.delegated().accept(this)
}
+
+ override def visit(retryOnOomMemoryTarget: RetryOnOomMemoryTarget): String = {
+ retryOnOomMemoryTarget.target().accept(this)
+ }
})
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]