This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new fbf2a83d [AURON #1730] Fix data size calculation in Celeborn shuffle writer (#1731)
fbf2a83d is described below
commit fbf2a83d73a6a45269d3033c6757da854329d813
Author: Dominik <[email protected]>
AuthorDate: Thu Dec 11 15:51:37 2025 +0800
[AURON #1730] Fix data size calculation in Celeborn shuffle writer (#1731)
# Which issue does this PR close?
Closes #1730
# Rationale for this change
# What changes are included in this PR?
# Are there any user-facing changes?
# How was this patch tested?
---
.../celeborn/AuronCelebornShuffleWriter.scala | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/thirdparty/auron-celeborn-0.6/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/celeborn/AuronCelebornShuffleWriter.scala b/thirdparty/auron-celeborn-0.6/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/celeborn/AuronCelebornShuffleWriter.scala
index ba5841b1..bdef9f4f 100644
--- a/thirdparty/auron-celeborn-0.6/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/celeborn/AuronCelebornShuffleWriter.scala
+++ b/thirdparty/auron-celeborn-0.6/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/celeborn/AuronCelebornShuffleWriter.scala
@@ -76,4 +76,24 @@ class AuronCelebornShuffleWriter[K, V](
Shims.get
.getMapStatus(blockManagerId, celebornPartitionWriter.getPartitionLengthMap, mapId))
}
+
+ // Override stop to use partition length map directly instead of rssStop's mapStatus
+ // because celeborn writer doesn't populate partition sizes correctly when using native writer
+ override def stop(success: Boolean): Option[MapStatus] = {
+ if (!success) {
+ celebornShuffleWriter.stop(success)
+ return None
+ }
+
+ celebornShuffleWriter.write(Iterator.empty)
+ celebornShuffleWriter.stop(success)
+
+ // Always use getPartitionLengthMap for Celeborn to get correct partition sizes
+ val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
+ Some(
+ Shims.get.getMapStatus(
+ blockManagerId,
+ celebornPartitionWriter.getPartitionLengthMap,
+ taskContext.partitionId()))
+ }
}