This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build in repository https://gitbox.apache.org/repos/asf/auron.git
commit 10d7d0bbe144f05a145ea63d8a2f3463038849c1 Author: zhuangxian <[email protected]> AuthorDate: Wed Dec 10 13:26:34 2025 +0000 fix empty partition data size KDev_MR_link: https://ksurl.cn/yez9gIJf --- pom.xml | 2 +- .../celeborn/BlazeCelebornShuffleWriter.scala | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 57140bf3..0301c25f 100644 --- a/pom.xml +++ b/pom.xml @@ -416,7 +416,7 @@ <id>spark-3.5</id> <properties> <shimName>spark-3.5</shimName> - <pkgSuffix>-kwai-adapt-cele060</pkgSuffix> + <pkgSuffix>-kwai-adapt-cele060-fix-non-partition-len</pkgSuffix> <shimPkg>spark-extension-shims-spark3</shimPkg> <javaVersion>1.8</javaVersion> <scalaVersion>2.12</scalaVersion> diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala index 2c636307..c5c4df76 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/shuffle/celeborn/BlazeCelebornShuffleWriter.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.execution.blaze.shuffle.celeborn import org.apache.celeborn.client.ShuffleClient -import org.apache.spark.TaskContext +import org.apache.spark.{TaskContext, SparkEnv} import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.shuffle.ShuffleWriteMetricsReporter @@ -24,6 +24,7 @@ import org.apache.spark.shuffle.ShuffleWriter import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle import org.apache.spark.shuffle.celeborn.ExecutorShuffleIdTracker import org.apache.spark.shuffle.celeborn.SparkUtils +import org.apache.spark.sql.blaze.Shims import 
org.apache.spark.sql.execution.blaze.shuffle.BlazeRssShuffleWriterBase import org.apache.spark.sql.execution.blaze.shuffle.RssPartitionWriterBase import org.blaze.sparkver @@ -68,4 +69,19 @@ class BlazeCelebornShuffleWriter[K, V]( celebornShuffleWriter.write(Iterator.empty) // force flush celebornShuffleWriter.stop(success) } -} + + // Override stop to use partition length map directly instead of rssStop's mapStatus + // because celeborn writer doesn't populate partition sizes correctly when using native writer + override def stop(success: Boolean): Option[MapStatus] = { + if (!success) { + return None + } + + // Always use getPartitionLengthMap for Celeborn to get correct partition sizes + val blockManagerId = SparkEnv.get.blockManager.shuffleServerId + Some(Shims.get.getMapStatus( + blockManagerId, + celebornPartitionWriter.getPartitionLengthMap, + taskContext.partitionId())) + } +} \ No newline at end of file
