This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 526f7bb4f [CELEBORN-2240] Adapt to SPARK-51756 which adds a new
parameter `checksumValue` in `MapStatus.apply`
526f7bb4f is described below
commit 526f7bb4fd31c3e0d362f643da43c92072d71e84
Author: Cheng Pan <[email protected]>
AuthorDate: Sat Dec 20 23:27:43 2025 +0800
[CELEBORN-2240] Adapt to SPARK-51756 which adds a new parameter
`checksumValue` in `MapStatus.apply`
### What changes were proposed in this pull request?
Adapt to SPARK-51756, which changes the MapStatus API used by Celeborn.
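
The adaptation pattern can be sketched in plain Java reflection: try the newer overload first, fall back to the older one, and always pass the full argument list. This is only an illustration, not the Celeborn implementation. `OldStatusFactory`, `NewStatusFactory`, and `OverloadFallbackSketch` are hypothetical stand-ins, and the sketch assumes the lookup helper drops trailing arguments that the resolved overload does not accept.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class OverloadFallbackSketch {

  // Hypothetical stand-in for the older API: three parameters.
  public static class OldStatusFactory {
    public String apply(String loc, long[] sizes, long mapTaskId) {
      return "old:" + loc + ":" + sizes.length + ":" + mapTaskId;
    }
  }

  // Hypothetical stand-in for the newer API: an extra trailing checksum parameter.
  public static class NewStatusFactory {
    public String apply(String loc, long[] sizes, long mapTaskId, long checksumValue) {
      return "new:" + loc + ":" + sizes.length + ":" + mapTaskId + ":" + checksumValue;
    }
  }

  // Candidate signatures, newest first, mirroring the order of the .impl(...) calls.
  private static final Class<?>[][] CANDIDATES = {
    {String.class, long[].class, long.class, long.class},
    {String.class, long[].class, long.class}
  };

  // Returns the first candidate overload of "apply" that exists on the target class.
  static Method resolve(Class<?> target) {
    for (Class<?>[] signature : CANDIDATES) {
      try {
        return target.getMethod("apply", signature);
      } catch (NoSuchMethodException e) {
        // This overload is absent in this version; try the next candidate.
      }
    }
    throw new IllegalStateException("No matching apply overload on " + target.getName());
  }

  // Invokes the resolved overload, dropping trailing arguments it does not declare.
  static Object invoke(Object target, Object... args) {
    Method method = resolve(target.getClass());
    Object[] trimmed = Arrays.copyOfRange(args, 0, method.getParameterCount());
    try {
      return method.invoke(target, trimmed);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    long[] sizes = {1L, 2L};
    // The caller always passes four arguments; the old overload receives only three.
    System.out.println(invoke(new OldStatusFactory(), "host-1", sizes, 7L, 0L));
    System.out.println(invoke(new NewStatusFactory(), "host-1", sizes, 7L, 0L));
  }
}
```

With this shape the call site stays identical across versions, and only the candidate list needs to change when another overload appears.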
### Why are the changes needed?
A necessary step to make Celeborn support Spark 4.1.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
Yes, it makes
### How was this patch tested?
Integrated with Spark 4.1 in an internal test environment and verified
with some simple queries.
Closes #3570 from pan3793/CELEBORN-2240.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../java/org/apache/spark/shuffle/celeborn/SparkUtils.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
index 2b30a2020..b50d3546e 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java
@@ -82,9 +82,17 @@ public class SparkUtils {
public static final String FETCH_FAILURE_ERROR_MSG =
"Celeborn FetchFailure appShuffleId/shuffleId: ";
+ private static final DynMethods.BoundMethod MAP_STATUS_APPLY_METHOD =
+ DynMethods.builder("apply")
+ // for SPARK-51756 (4.1.0) and later
+ .impl(MapStatus$.class, BlockManagerId.class, long[].class, long.class, long.class)
+ // for Spark 4.0 and earlier
+ .impl(MapStatus$.class, BlockManagerId.class, long[].class, long.class)
+ .build(MapStatus$.MODULE$);
+
public static MapStatus createMapStatus(
BlockManagerId loc, long[] uncompressedSizes, long mapTaskId) {
- return MapStatus$.MODULE$.apply(loc, uncompressedSizes, mapTaskId);
+ return MAP_STATUS_APPLY_METHOD.invoke(loc, uncompressedSizes, mapTaskId, 0L);
}
private static final DynFields.UnboundField<SQLMetric> DATA_SIZE_METRIC_FIELD =