This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 69854faaae [GLUTEN-11169][VL] Trigger GC before throwing OOM to ensure
unused off-heap broadcasted relations are correctly released (#11236)
69854faaae is described below
commit 69854faaae98b11cf40dcf74ef900dcbeb91230f
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Dec 15 16:45:25 2025 +0800
[GLUTEN-11169][VL] Trigger GC before throwing OOM to ensure unused off-heap
broadcasted relations are correctly released (#11236)
---
.../UnsafeColumnarBuildSideRelationTest.scala | 63 ++++++++++--------
.../memory/memtarget/ThrowOnOomMemoryTarget.java | 75 ++++++++++++++++++++--
2 files changed, 103 insertions(+), 35 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
index 252bf451f6..82adee0375 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelationTest.scala
@@ -45,19 +45,9 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
var unsafeRelWithIdentityMode: UnsafeColumnarBuildSideRelation = _
var unsafeRelWithHashMode: UnsafeColumnarBuildSideRelation = _
var output: Seq[Attribute] = _
- var sampleBytes: Array[Array[Byte]] = _
+ var sample1KBytes: Array[Byte] = _
var initialGlobalBytes: Long = _
- private def toUnsafeByteArray(bytes: Array[Byte]): UnsafeByteArray = {
- val buf = ArrowBufferAllocators.globalInstance().buffer(bytes.length)
- buf.setBytes(0, bytes, 0, bytes.length)
- try {
- new UnsafeByteArray(buf, bytes.length.toLong)
- } finally {
- buf.close()
- }
- }
-
private def toByteArray(unsafeByteArray: UnsafeByteArray): Array[Byte] = {
val byteArray = new Array[Byte](Math.toIntExact(unsafeByteArray.size()))
Platform.copyMemory(
@@ -73,9 +63,9 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
super.beforeAll()
initialGlobalBytes = GlobalOffHeapMemory.currentBytes()
output = Seq(AttributeReference("a", StringType, nullable = false, null)())
- sampleBytes = Array(randomBytes(10), randomBytes(100))
- unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(sampleBytes:
_*)
- unsafeRelWithHashMode = newUnsafeRelationWithHashMode(sampleBytes: _*)
+ sample1KBytes = randomBytes(1024)
+ unsafeRelWithIdentityMode = newUnsafeRelationWithIdentityMode(2)
+ unsafeRelWithHashMode = newUnsafeRelationWithHashMode(2)
}
override protected def afterAll(): Unit = {
@@ -84,7 +74,7 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
unsafeRelWithIdentityMode = null
unsafeRelWithHashMode = null
System.gc()
- Thread.sleep(500)
+ Thread.sleep(1000)
// FIXME: This should be zero. We had to assert against the initial bytes
because
// there were some allocations left over from previously-run suites.
assert(GlobalOffHeapMemory.currentBytes() == initialGlobalBytes)
@@ -97,22 +87,31 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
array
}
- private def newUnsafeRelationWithIdentityMode(
- bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
- require(bytes.nonEmpty)
+ private def sampleUnsafeByteArrayInKb(sizeInKb: Int): UnsafeByteArray = {
+ val sizeInBytes = sizeInKb * 1024
+ val buf = ArrowBufferAllocators.globalInstance().buffer(sizeInBytes)
+ for (i <- 0 until sizeInKb) {
+ buf.setBytes(i * 1024, sample1KBytes, 0, 1024)
+ }
+ try {
+ new UnsafeByteArray(buf, sizeInBytes)
+ } finally {
+ buf.close()
+ }
+ }
+
+ private def newUnsafeRelationWithIdentityMode(sizeInKb: Int):
UnsafeColumnarBuildSideRelation = {
UnsafeColumnarBuildSideRelation(
output,
- bytes.map(a => toUnsafeByteArray(a)),
+ (0 until sizeInKb).map(_ => sampleUnsafeByteArrayInKb(1)),
IdentityBroadcastMode
)
}
- private def newUnsafeRelationWithHashMode(
- bytes: Array[Byte]*): UnsafeColumnarBuildSideRelation = {
- require(bytes.nonEmpty)
+ private def newUnsafeRelationWithHashMode(sizeInKb: Int):
UnsafeColumnarBuildSideRelation = {
UnsafeColumnarBuildSideRelation(
output,
- bytes.map(a => toUnsafeByteArray(a)),
+ (0 until sizeInKb).map(_ => sampleUnsafeByteArrayInKb(1)),
HashedRelationBroadcastMode(output, isNullAware = false)
)
}
@@ -129,7 +128,7 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
assert(
util.Arrays.deepEquals(
obj.getBatches().map(toByteArray).toArray[AnyRef],
- sampleBytes.asInstanceOf[Array[AnyRef]]))
+ Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
// test unsafeRelWithHashMode
val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode)
@@ -139,7 +138,7 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
assert(
util.Arrays.deepEquals(
obj2.getBatches().map(toByteArray).toArray[AnyRef],
- sampleBytes.asInstanceOf[Array[AnyRef]]))
+ Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
}
test("Kryo serialization") {
@@ -154,7 +153,7 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
assert(
util.Arrays.deepEquals(
obj.getBatches().map(toByteArray).toArray[AnyRef],
- sampleBytes.asInstanceOf[Array[AnyRef]]))
+ Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
// test unsafeRelWithHashMode
val buffer2 = serializerInstance.serialize(unsafeRelWithHashMode)
@@ -164,7 +163,7 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
assert(
util.Arrays.deepEquals(
obj2.getBatches().map(toByteArray).toArray[AnyRef],
- sampleBytes.asInstanceOf[Array[AnyRef]]))
+ Array(sample1KBytes, sample1KBytes).asInstanceOf[Array[AnyRef]]))
}
test("Should throw OOM when off-heap memory is running out") {
@@ -172,9 +171,17 @@ class UnsafeColumnarBuildSideRelationTest extends
SharedSparkSession {
val relations = mutable.ListBuffer[UnsafeColumnarBuildSideRelation]()
assertThrows[OutOfMemoryException] {
for (i <- 0 until 10) {
- relations +=
newUnsafeRelationWithHashMode(randomBytes(ByteUnit.MiB.toBytes(50).toInt))
+ relations +=
newUnsafeRelationWithHashMode(ByteUnit.MiB.toKiB(50).toInt)
}
}
relations.clear()
}
+
+ test("Should trigger GC before OOM") {
+ // 500 MiB > 200 MiB, but since we don't preserve the references to the
created relations,
+ // GC will be triggered and OOM should not be thrown.
+ for (i <- 0 until 10) {
+ newUnsafeRelationWithHashMode(ByteUnit.MiB.toKiB(50).toInt)
+ }
+ }
}
diff --git
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
index 5a2b10e279..395f956fe9 100644
---
a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
+++
b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java
@@ -22,12 +22,20 @@ import org.apache.spark.memory.SparkMemoryUtil;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.task.TaskResources;
import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
public class ThrowOnOomMemoryTarget implements MemoryTarget {
+ private static final Logger LOG =
LoggerFactory.getLogger(ThrowOnOomMemoryTarget.class);
+ // Max number of sleeps during retrying the reservation.
+ // Durations are, in order, 1, 2, 4, 8, 16, 32, 64, 128, 256 ms (total 511 ms ~
0.5 s).
+ private static final int MAX_SLEEPS = 9;
+ private static final int MAX_WAIT_MS = 1000;
+
private static final List<String> PRINTED_NON_BYTES_CONFIGURATIONS =
Arrays.asList(
GlutenCoreConfig.SPARK_OFFHEAP_ENABLED_KEY(),
@@ -47,17 +55,70 @@ public class ThrowOnOomMemoryTarget implements MemoryTarget
{
@Override
public long borrow(long size) {
- long granted = target.borrow(size);
- if (granted >= size) {
- return granted;
+ long granted;
+ {
+ granted = target.borrow(size);
+ if (granted >= size) {
+ return granted;
+ }
+ if (granted != 0L) {
+ target.repay(granted);
+ }
+ }
+
+ // About to OOM.
+ LOG.warn("Off-heap reservation of {} bytes failed.", size);
+
+ // Invoke GC, then retry up to 9 times (1s extra delay in total) for this
+ // reservation. This is for ensuring we waited for GC to collect all the
+ // non-reachable objects, during which the off-heap allocations might also
+ // be returned to the memory manager. For example, UnsafeByteArray
implements
+ // `finalize` to release its off-heap memory allocation so its lifecycle
+ // relies on JVM GC.
+ LOG.warn("Invoking GC to try reclaiming some off-heap memory space if
applicable...");
+ System.gc();
+ final long start = System.currentTimeMillis();
+ int sleeps = 0;
+ long sleepTime = 1;
+ while (true) {
+ final long elapsedMs = System.currentTimeMillis() - start;
+ if (elapsedMs >= MAX_WAIT_MS) {
+ LOG.warn("Max wait time (in ms) {} has reached. ", MAX_WAIT_MS);
+ break;
+ }
+ LOG.warn(
+ "Retrying reserving {} bytes (finished {}/{} number of sleeps,
elapsed {}/{} ms)... ",
+ size,
+ sleeps,
+ MAX_SLEEPS,
+ elapsedMs,
+ MAX_WAIT_MS);
+ granted = target.borrow(size);
+ if (granted >= size) {
+ return granted;
+ }
+ if (granted != 0L) {
+ target.repay(granted);
+ }
+ if (sleeps >= MAX_SLEEPS) {
+ LOG.warn("Max number of sleeps {} has reached. ", MAX_SLEEPS);
+ break;
+ }
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ sleepTime *= 2;
+ sleeps++;
}
+
// OOM happens.
// Note if the target is a Spark memory consumer, spilling should already
be requested but
// failed to reclaim enough memory.
- if (granted != 0L) {
- target.repay(granted);
- }
- // Log memory usage
+ //
+ // Log memory usage.
if (TaskResources.inSparkTask()) {
TaskResources.getLocalTaskContext().taskMemoryManager().showMemoryUsage();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]