This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4aea124e58f2 [SPARK-55026][CORE][SQL] Optimize BestEffortLazyVal
4aea124e58f2 is described below
commit 4aea124e58f2b6868765eabc90c31728b1a03c65
Author: Chenhao Li <[email protected]>
AuthorDate: Tue Jan 20 12:19:42 2026 +0800
[SPARK-55026][CORE][SQL] Optimize BestEffortLazyVal
### What changes were proposed in this pull request?
The change is inspired by
https://github.com/apache/spark/pull/49212#discussion_r1904755002, although
instead of `AtomicReferenceFieldUpdater` it uses `VarHandle`, a more modern
utility class that provides atomic field access.
Before the change, `(Transient)BestEffortLazyVal` had to create an
`AtomicReference`, which costs an extra heap allocation of 16/24 bytes on
common platforms (depending on the object header size). After the change, the
size of `(Transient)BestEffortLazyVal` itself remains unchanged, and the extra
allocation is gone.
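For illustration, here is a condensed sketch of the two layouts (the class and
field names are simplified stand-ins; the real code is in the diff below):

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicReference;

// Before: each instance holds its own AtomicReference, i.e. one extra heap
// object (~16/24 bytes) per lazy val.
class LazyWithAtomicReference<T> {
  private final AtomicReference<T> cached = new AtomicReference<>();
}

// After: the cached value is a plain volatile field; the VarHandle used for
// the compare-and-set is a static, shared by all instances, so there is no
// per-instance allocation.
class LazyWithVarHandle<T> {
  private volatile T cached;
  private static final VarHandle CACHED;
  static {
    try {
      CACHED = MethodHandles.lookup()
          .findVarHandle(LazyWithVarHandle.class, "cached", Object.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }
}
```

Because the `VarHandle` is resolved once in a static initializer, the only
per-instance cost that remains is the volatile reference field itself.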
`BestEffortLazyVal` is an infrastructure class in Spark, so it is worth adding
some complexity to reduce memory usage. It is widely used in planner objects
(`LogicalPlan`, `Expression`), so I expect this change to improve driver
stability when handling large plans.
### Why are the changes needed?
Reduce memory usage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Manual testing with `bin/spark-shell --conf spark.driver.memory=8g`.
Running the command
`Array.fill(n)(new org.apache.spark.util.BestEffortLazyVal[java.lang.Integer](() => 1))`:
- before the change, it can successfully allocate 1.9e8 elements, but OOMs on 2e8 elements.
- after the change, it can successfully allocate 3e8 elements, but OOMs on 3.1e8 elements.
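As a rough cross-check of these numbers (assuming compressed oops, 12-byte
object headers and 8-byte alignment, so this is only an estimate): after the
change each instance takes about 24 bytes plus a 4-byte array slot, and
8 GiB / 28 B ≈ 3.1e8; before the change the extra `AtomicReference` adds
roughly another 16 bytes, for about 44 bytes per element and
8 GiB / 44 B ≈ 2.0e8. Both estimates are consistent with the observed limits.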
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53761 from chenhao-db/optimize_lazy_val.
Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/util/BestEffortLazyVal.java} | 58 ++++++++++++----------
.../spark/util/TransientBestEffortLazyVal.java} | 57 ++++++++++++++-------
2 files changed, 73 insertions(+), 42 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala b/core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
similarity index 62%
rename from core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala
rename to core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
index 033b783ede40..d5b2abb9cebf 100644
--- a/core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala
+++ b/core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
@@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util;
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.atomic.AtomicReference
+import scala.Function0;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
/**
* A lock-free implementation of a lazily-initialized variable.
@@ -28,10 +30,6 @@ import java.util.concurrent.atomic.AtomicReference
* This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
* value computation is not a hard requirement.
*
- * The main difference between this and [[BestEffortLazyVal]] is that:
- * [[BestEffortLazyVal]] serializes the cached value after computation, while
- * [[TransientBestEffortLazyVal]] always serializes the compute function.
- *
* @note
* This helper class has additional requirements on the compute function:
* 1) The compute function MUST not return null;
@@ -43,27 +41,37 @@ import java.util.concurrent.atomic.AtomicReference
* href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
* Vals Initialization</a> for more details.
*/
-private[spark] class TransientBestEffortLazyVal[T <: AnyRef](
- private[this] val compute: () => T) extends Serializable {
-
- @transient
- private[this] var cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])
+public class BestEffortLazyVal<T> implements Serializable {
+ private volatile Function0<T> compute;
+ protected volatile T cached;
- def apply(): T = {
- val value = cached.get()
- if (value != null) {
- value
- } else {
- val newValue = compute()
- assert(newValue != null, "compute function cannot return null.")
- cached.compareAndSet(null.asInstanceOf[T], newValue)
- cached.get()
+ private static final VarHandle HANDLE;
+ static {
+ try {
+ HANDLE = MethodHandles.lookup()
+ .in(BestEffortLazyVal.class)
+ .findVarHandle(BestEffortLazyVal.class, "cached", Object.class);
+ } catch (ReflectiveOperationException e) {
+ throw new IllegalStateException("Failed to initialize VarHandle", e);
}
}
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- cached = new AtomicReference(null.asInstanceOf[T])
+ public BestEffortLazyVal(Function0<T> compute) {
+ this.compute = compute;
+ }
+
+ public T apply() {
+ T value = cached;
+ if (value != null) {
+ return value;
+ }
+ Function0<T> f = compute;
+ if (f != null) {
+ T newValue = f.apply();
+ assert newValue != null: "compute function cannot return null.";
+ HANDLE.compareAndSet(this, null, newValue);
+ compute = null; // allow closure to be GC'd
+ }
+ return cached;
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala b/core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
similarity index 54%
rename from core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala
rename to core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
index 83044055fe40..784a7abdc4b4 100644
--- a/core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala
+++ b/core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.util
+package org.apache.spark.util;
-import java.util.concurrent.atomic.AtomicReference
+import scala.Function0;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
/**
* A lock-free implementation of a lazily-initialized variable.
@@ -27,6 +32,10 @@ import java.util.concurrent.atomic.AtomicReference
* This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
* value computation is not a hard requirement.
*
+ * The main difference between this and [[BestEffortLazyVal]] is that:
+ * [[BestEffortLazyVal]] serializes the cached value after computation, while
+ * [[TransientBestEffortLazyVal]] always serializes the compute function.
+ *
* @note
* This helper class has additional requirements on the compute function:
* 1) The compute function MUST not return null;
@@ -38,24 +47,38 @@ import java.util.concurrent.atomic.AtomicReference
* href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
* Vals Initialization</a> for more details.
*/
-private[spark] class BestEffortLazyVal[T <: AnyRef](
- @volatile private[this] var compute: () => T) extends Serializable {
+public class TransientBestEffortLazyVal<T> implements Serializable {
+ private volatile Function0<T> compute;
+ protected transient volatile T cached;
- private[this] val cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])
+ private static final VarHandle HANDLE;
+ static {
+ try {
+ HANDLE = MethodHandles.lookup()
+ .in(TransientBestEffortLazyVal.class)
+ .findVarHandle(TransientBestEffortLazyVal.class, "cached", Object.class);
+ } catch (ReflectiveOperationException e) {
+ throw new IllegalStateException("Failed to initialize VarHandle", e);
+ }
+ }
- def apply(): T = {
- val value = cached.get()
+ public TransientBestEffortLazyVal(Function0<T> compute) {
+ this.compute = compute;
+ }
+
+ public T apply() {
+ T value = cached;
if (value != null) {
- value
- } else {
- val f = compute
- if (f != null) {
- val newValue = f()
- assert(newValue != null, "compute function cannot return null.")
- cached.compareAndSet(null.asInstanceOf[T], newValue)
- compute = null // allow closure to be GC'd
- }
- cached.get()
+ return value;
}
+ T newValue = compute.apply();
+ assert newValue != null: "compute function cannot return null.";
+ HANDLE.compareAndSet(this, null, newValue);
+ return cached;
+ }
+
+ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+ ois.defaultReadObject();
+ cached = null;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]