This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new df809159d [CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1
df809159d is described below

commit df809159d1f8d3070ced5e37e7696c2a348ac113
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 10 15:19:50 2025 +0800

    [CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1
    
    ### What changes were proposed in this pull request?
    
    SPARK-49946 (4.0.0) removes single String constructor of class `SparkOutOfMemoryError` and introduces `_LEGACY_ERROR_TEMP_3301` error condition, SPARK-51386 (4.1.0) renames the error condition to `POINTER_ARRAY_OUT_OF_MEMORY`.
    
    ### Why are the changes needed?
    
    To be compatible with Spark 4.0 and 4.1
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GHA checks Spark 2.4 to Spark 3.5, I manually tested with Spark 4.0.0 RC2
    
    Closes #3141 from pan3793/CELEBORN-1898.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../shuffle/celeborn/ShuffleInMemorySorter.java    |  3 +-
 .../spark/shuffle/celeborn/SparkCommonUtils.java   | 40 ++++++++++++++++++++++
 .../shuffle/celeborn/SparkCommonUtilsSuiteJ.java   | 29 ++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
index 57d569eba..c3af2ebd6 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.celeborn;
 
 import org.apache.spark.memory.MemoryConsumer;
-import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.util.collection.unsafe.sort.RadixSort;
@@ -87,7 +86,7 @@ public class ShuffleInMemorySorter {
   public void expandPointerArray(LongArray newArray) {
     if (array != null) {
       if (newArray.size() < array.size()) {
-        throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+        SparkCommonUtils.throwSparkOutOfMemoryError();
       }
       Platform.copyMemory(
           array.getBaseObject(),
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
index 9b959a4b8..a24e06d5a 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
@@ -17,8 +17,15 @@
 
 package org.apache.spark.shuffle.celeborn;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
+import org.apache.spark.memory.SparkOutOfMemoryError;
+
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
 
 public class SparkCommonUtils {
   public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException {
@@ -48,4 +55,37 @@ public class SparkCommonUtils {
   public static int getEncodedAttemptNumber(TaskContext context) {
     return (context.stageAttemptNumber() << 16) | context.attemptNumber();
   }
+
+  public static void throwSparkOutOfMemoryError() {
+    try { // for Spark 3.5 and earlier
+      throw DynConstructors.builder()
+          .impl(SparkOutOfMemoryError.class, String.class)
+          .<SparkOutOfMemoryError>build()
+          .newInstance("Not enough memory to grow pointer array");
+    } catch (RuntimeException e) {
+      // SPARK-44838 (4.0.0)
+      DynMethods.StaticMethod isValidErrorClassMethod =
+          DynMethods.builder("isValidErrorClass")
+              .impl("org.apache.spark.SparkThrowableHelper", String.class)
+              .buildStatic();
+      // SPARK-49946 (4.0.0) removes single String constructor and introduces
+      // _LEGACY_ERROR_TEMP_3301 error condition, SPARK-51386 (4.1.0) renames
+      // the error condition to POINTER_ARRAY_OUT_OF_MEMORY.
+      if (isValidErrorClassMethod.invoke("POINTER_ARRAY_OUT_OF_MEMORY")) { // for Spark 4.1 onwards
+        throw DynConstructors.builder()
+            .impl(SparkOutOfMemoryError.class, String.class, Map.class)
+            .<SparkOutOfMemoryError>build()
+            .newInstance("POINTER_ARRAY_OUT_OF_MEMORY", Collections.EMPTY_MAP);
+      } else if (isValidErrorClassMethod.invoke("_LEGACY_ERROR_TEMP_3301")) { // for Spark 4.0
+        throw DynConstructors.builder()
+            .impl(SparkOutOfMemoryError.class, String.class, Map.class)
+            .<SparkOutOfMemoryError>build()
+            .newInstance("_LEGACY_ERROR_TEMP_3301", Collections.EMPTY_MAP);
+      } else {
+        throw new OutOfMemoryError(
+            "Unable to construct a SparkOutOfMemoryError, please report this bug to the "
+                + "corresponding communities or vendors, and provide the full stack trace.");
+      }
+    }
+  }
 }
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java
new file mode 100644
index 000000000..02ed7fa17
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.junit.Test;
+
+public class SparkCommonUtilsSuiteJ {
+
+  @Test(expected = SparkOutOfMemoryError.class)
+  public void testThrowSparkOutOfMemoryError() {
+    SparkCommonUtils.throwSparkOutOfMemoryError();
+  }
+}

Reply via email to