This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new df809159d [CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1
df809159d is described below
commit df809159d1f8d3070ced5e37e7696c2a348ac113
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Mar 10 15:19:50 2025 +0800
[CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1
### What changes were proposed in this pull request?
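SPARK-49946 (4.0.0) removes the single-`String` constructor of
`SparkOutOfMemoryError` and introduces the `_LEGACY_ERROR_TEMP_3301` error
condition; SPARK-51386 (4.1.0) then renames that error condition to
`POINTER_ARRAY_OUT_OF_MEMORY`. This change adds
`SparkCommonUtils.throwSparkOutOfMemoryError()`, which uses reflection to
construct the error in whichever form the running Spark version supports.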
### Why are the changes needed?
To keep the Celeborn Spark client compatible with Spark 4.0 and 4.1.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GitHub Actions (GHA) checks cover Spark 2.4 through Spark 3.5; I manually tested with Spark 4.0.0 RC2.
Closes #3141 from pan3793/CELEBORN-1898.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../shuffle/celeborn/ShuffleInMemorySorter.java | 3 +-
.../spark/shuffle/celeborn/SparkCommonUtils.java | 40 ++++++++++++++++++++++
.../shuffle/celeborn/SparkCommonUtilsSuiteJ.java | 29 ++++++++++++++++
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
index 57d569eba..c3af2ebd6 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java
@@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import org.apache.spark.memory.MemoryConsumer;
-import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.util.collection.unsafe.sort.RadixSort;
@@ -87,7 +86,7 @@ public class ShuffleInMemorySorter {
   public void expandPointerArray(LongArray newArray) {
     if (array != null) {
       if (newArray.size() < array.size()) {
-        throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
+        SparkCommonUtils.throwSparkOutOfMemoryError();
       }
       Platform.copyMemory(
           array.getBaseObject(),
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
index 9b959a4b8..a24e06d5a 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java
@@ -17,8 +17,15 @@
package org.apache.spark.shuffle.celeborn;
+import java.util.Collections;
+import java.util.Map;
+
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
+import org.apache.spark.memory.SparkOutOfMemoryError;
+
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.reflect.DynMethods;
public class SparkCommonUtils {
public static void validateAttemptConfig(SparkConf conf) throws
IllegalArgumentException {
@@ -48,4 +55,37 @@ public class SparkCommonUtils {
   public static int getEncodedAttemptNumber(TaskContext context) {
     return (context.stageAttemptNumber() << 16) | context.attemptNumber();
   }
+
+  public static void throwSparkOutOfMemoryError() { // never returns normally
+    try { // for Spark 3.5 and earlier, which still have SparkOutOfMemoryError(String)
+      throw DynConstructors.builder()
+          .impl(SparkOutOfMemoryError.class, String.class)
+          .<SparkOutOfMemoryError>build()
+          .newInstance("Not enough memory to grow pointer array");
+    } catch (RuntimeException e) {
+      // Spark 4.0+: look up SparkThrowableHelper.isValidErrorClass (SPARK-44838, 4.0.0).
+      DynMethods.StaticMethod isValidErrorClassMethod =
+          DynMethods.builder("isValidErrorClass")
+              .impl("org.apache.spark.SparkThrowableHelper", String.class)
+              .buildStatic();
+      // SPARK-49946 (4.0.0) removed the String constructor and added _LEGACY_ERROR_TEMP_3301;
+      // SPARK-51386 (4.1.0) renamed it to POINTER_ARRAY_OUT_OF_MEMORY, so probe that first.
+      if (isValidErrorClassMethod.invoke("POINTER_ARRAY_OUT_OF_MEMORY")) { // for Spark 4.1 onwards
+        throw DynConstructors.builder()
+            .impl(SparkOutOfMemoryError.class, String.class, Map.class)
+            .<SparkOutOfMemoryError>build()
+            .newInstance("POINTER_ARRAY_OUT_OF_MEMORY", Collections.emptyMap());
+      } else if (isValidErrorClassMethod.invoke("_LEGACY_ERROR_TEMP_3301")) { // for Spark 4.0
+        throw DynConstructors.builder()
+            .impl(SparkOutOfMemoryError.class, String.class, Map.class)
+            .<SparkOutOfMemoryError>build()
+            .newInstance("_LEGACY_ERROR_TEMP_3301", Collections.emptyMap());
+      } else {
+        throw (OutOfMemoryError) new OutOfMemoryError(
+                "Unable to construct a SparkOutOfMemoryError, please report this bug to the "
+                    + "corresponding communities or vendors, and provide the full stack trace.")
+            .initCause(e); // keep the String-constructor lookup failure in the stack trace
+      }
+    }
+  }
 }
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java
new file mode 100644
index 000000000..02ed7fa17
--- /dev/null
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.celeborn;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.junit.Test;
+
+public class SparkCommonUtilsSuiteJ {
+  // Passes only if SparkOutOfMemoryError (not the plain OutOfMemoryError fallback) is thrown.
+  @Test(expected = SparkOutOfMemoryError.class)
+  public void testThrowSparkOutOfMemoryError() {
+    SparkCommonUtils.throwSparkOutOfMemoryError();
+  }
+}