This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aea124e58f2 [SPARK-55026][CORE][SQL] Optimize BestEffortLazyVal
4aea124e58f2 is described below

commit 4aea124e58f2b6868765eabc90c31728b1a03c65
Author: Chenhao Li <[email protected]>
AuthorDate: Tue Jan 20 12:19:42 2026 +0800

    [SPARK-55026][CORE][SQL] Optimize BestEffortLazyVal
    
    ### What changes were proposed in this pull request?
    
    The change is inspired by
    https://github.com/apache/spark/pull/49212#discussion_r1904755002, although I
    use `VarHandle` rather than `AtomicReferenceFieldUpdater`, since `VarHandle` is
    a more modern utility class that provides atomic field access.
    
    Before the change, the `(Transient)BestEffortLazyVal` needs to create an
    `AtomicReference`. This requires an extra heap allocation of 16/24 bytes on
    common platforms (depending on the object header size). After the change, the
    size of `(Transient)BestEffortLazyVal` remains unchanged, and we can get rid
    of this extra allocation.
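
    To make the layout difference concrete, here is a minimal standalone sketch
    (hypothetical classes, not the actual Spark code; `java.util.function.Supplier`
    stands in for Scala's `Function0` so it compiles on its own). The "before" shape
    keeps the cached value behind a separately allocated `AtomicReference`, while
    the "after" shape stores it in a plain volatile field and publishes it with a
    `VarHandle` compare-and-set:

    ```java
    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Illustrative sketch only (not the actual Spark classes); Supplier stands in
    // for scala.Function0 so the example is self-contained.

    // Before: the cached value sits behind a separately allocated AtomicReference,
    // so every lazy-val instance drags along one extra heap object.
    class LazyWithAtomicReference<T> {
      private final Supplier<T> compute;
      private final AtomicReference<T> cached = new AtomicReference<>(); // extra allocation

      LazyWithAtomicReference(Supplier<T> compute) { this.compute = compute; }

      T get() {
        T value = cached.get();
        if (value != null) return value;
        cached.compareAndSet(null, compute.get()); // first writer wins
        return cached.get();
      }
    }

    // After: the cached value is a plain volatile field of the instance itself,
    // published atomically through a static VarHandle (no per-instance helper object).
    class LazyWithVarHandle<T> {
      private final Supplier<T> compute;
      private volatile T cached;

      private static final VarHandle CACHED;
      static {
        try {
          CACHED = MethodHandles.lookup()
              .findVarHandle(LazyWithVarHandle.class, "cached", Object.class);
        } catch (ReflectiveOperationException e) {
          throw new ExceptionInInitializerError(e);
        }
      }

      LazyWithVarHandle(Supplier<T> compute) { this.compute = compute; }

      T get() {
        T value = cached;
        if (value != null) return value;
        CACHED.compareAndSet(this, null, compute.get()); // losers read the winner's value
        return cached;
      }
    }
    ```

    The `VarHandle` is a static per-class constant, so it adds no per-instance
    footprint; the only per-instance state is the `cached` field itself.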
    
    `BestEffortLazyVal` is an infrastructure class in Spark, so it is worth adding
    some complexity to reduce memory usage. It is widely used in planner objects
    (`LogicalPlan`, `Expression`), so I expect this change to improve driver
    stability when handling large plans.
    
    ### Why are the changes needed?
    
    Reduce memory usage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Manual testing with `bin/spark-shell --conf spark.driver.memory=8g`, running
    the command
    `Array.fill(n)(new org.apache.spark.util.BestEffortLazyVal[java.lang.Integer](() => 1))`:
    - before the change, it can successfully allocate 1.9e8 elements, but OOMs on
      2e8 elements.
    - after the change, it can successfully allocate 3e8 elements, but OOMs on
      3.1e8 elements.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53761 from chenhao-db/optimize_lazy_val.
    
    Authored-by: Chenhao Li <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/util/BestEffortLazyVal.java}  | 58 ++++++++++++----------
 .../spark/util/TransientBestEffortLazyVal.java}    | 57 ++++++++++++++-------
 2 files changed, 73 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala b/core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
similarity index 62%
rename from core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala
rename to core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
index 033b783ede40..d5b2abb9cebf 100644
--- a/core/src/main/scala/org/apache/spark/util/TransientBestEffortLazyVal.scala
+++ b/core/src/main/java/org/apache/spark/util/BestEffortLazyVal.java
@@ -14,10 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.util
+package org.apache.spark.util;
 
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.atomic.AtomicReference
+import scala.Function0;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 
 /**
  * A lock-free implementation of a lazily-initialized variable.
@@ -28,10 +30,6 @@ import java.util.concurrent.atomic.AtomicReference
  * This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
  * value computation is not a hard requirement.
  *
- * The main difference between this and [[BestEffortLazyVal]] is that:
- * [[BestEffortLazyVal]] serializes the cached value after computation, while
- * [[TransientBestEffortLazyVal]] always serializes the compute function.
- *
  * @note
  * This helper class has additional requirements on the compute function:
  *   1) The compute function MUST not return null;
@@ -43,27 +41,37 @@ import java.util.concurrent.atomic.AtomicReference
  *   href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
  *   Vals Initialization</a> for more details.
  */
-private[spark] class TransientBestEffortLazyVal[T <: AnyRef](
-    private[this] val compute: () => T) extends Serializable {
-
-  @transient
-  private[this] var cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])
+public class BestEffortLazyVal<T> implements Serializable {
+  private volatile Function0<T> compute;
+  protected volatile T cached;
 
-  def apply(): T = {
-    val value = cached.get()
-    if (value != null) {
-      value
-    } else {
-      val newValue = compute()
-      assert(newValue != null, "compute function cannot return null.")
-      cached.compareAndSet(null.asInstanceOf[T], newValue)
-      cached.get()
+  private static final VarHandle HANDLE;
+  static {
+    try {
+      HANDLE = MethodHandles.lookup()
+        .in(BestEffortLazyVal.class)
+        .findVarHandle(BestEffortLazyVal.class, "cached", Object.class);
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException("Failed to initialize VarHandle", e);
     }
   }
 
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
-    ois.defaultReadObject()
-    cached = new AtomicReference(null.asInstanceOf[T])
+  public BestEffortLazyVal(Function0<T> compute) {
+    this.compute = compute;
+  }
+
+  public T apply() {
+    T value = cached;
+    if (value != null) {
+      return value;
+    }
+    Function0<T> f = compute;
+    if (f != null) {
+      T newValue = f.apply();
+      assert newValue != null: "compute function cannot return null.";
+      HANDLE.compareAndSet(this, null, newValue);
+      compute = null; // allow closure to be GC'd
+    }
+    return cached;
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala b/core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
similarity index 54%
rename from core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala
rename to core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
index 83044055fe40..784a7abdc4b4 100644
--- a/core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala
+++ b/core/src/main/java/org/apache/spark/util/TransientBestEffortLazyVal.java
@@ -14,9 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.util
+package org.apache.spark.util;
 
-import java.util.concurrent.atomic.AtomicReference
+import scala.Function0;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 
 /**
  * A lock-free implementation of a lazily-initialized variable.
@@ -27,6 +32,10 @@ import java.util.concurrent.atomic.AtomicReference
  * This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
  * value computation is not a hard requirement.
  *
+ * The main difference between this and [[BestEffortLazyVal]] is that:
+ * [[BestEffortLazyVal]] serializes the cached value after computation, while
+ * [[TransientBestEffortLazyVal]] always serializes the compute function.
+ *
  * @note
  * This helper class has additional requirements on the compute function:
  *   1) The compute function MUST not return null;
@@ -38,24 +47,38 @@ import java.util.concurrent.atomic.AtomicReference
  *   href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
  *   Vals Initialization</a> for more details.
  */
-private[spark] class BestEffortLazyVal[T <: AnyRef](
-    @volatile private[this] var compute: () => T) extends Serializable {
+public class TransientBestEffortLazyVal<T> implements Serializable {
+  private volatile Function0<T> compute;
+  protected transient volatile T cached;
 
-  private[this] val cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])
+  private static final VarHandle HANDLE;
+  static {
+    try {
+      HANDLE = MethodHandles.lookup()
+        .in(TransientBestEffortLazyVal.class)
+        .findVarHandle(TransientBestEffortLazyVal.class, "cached", Object.class);
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException("Failed to initialize VarHandle", e);
+    }
+  }
 
-  def apply(): T = {
-    val value = cached.get()
+  public TransientBestEffortLazyVal(Function0<T> compute) {
+    this.compute = compute;
+  }
+
+  public T apply() {
+    T value = cached;
     if (value != null) {
-      value
-    } else {
-      val f = compute
-      if (f != null) {
-        val newValue = f()
-        assert(newValue != null, "compute function cannot return null.")
-        cached.compareAndSet(null.asInstanceOf[T], newValue)
-        compute = null  // allow closure to be GC'd
-      }
-      cached.get()
+      return value;
     }
+    T newValue = compute.apply();
+    assert newValue != null: "compute function cannot return null.";
+    HANDLE.compareAndSet(this, null, newValue);
+    return cached;
+  }
+
+  private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+    ois.defaultReadObject();
+    cached = null;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
