This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new f4ef0442 [AURON #2052] NPE fix in `AuronUniffleShuffleReader` (#2053)
f4ef0442 is described below
commit f4ef0442c6a3872e1cbe5db75ef5748ce3bb5139
Author: slfan1989 <[email protected]>
AuthorDate: Wed Mar 4 14:44:24 2026 +0800
[AURON #2052] NPE fix in `AuronUniffleShuffleReader` (#2053)
### Which issue does this PR close?
Closes #2052
### Rationale for this change
In `AuronUniffleShuffleReader`, there are potential
`NullPointerException` risks when handling partitions:
- `partitionToExpectBlocks.get(partition)` may return null, but the code
directly calls `.isEmpty()` without a null check
- `partitionToShuffleServers.get(partition)` may return null, but the
code uses it directly without validation.

This can cause the shuffle reader to crash when reading partitions that
have missing or incomplete metadata.
### What changes are included in this PR?
### Are there any user-facing changes?
No.
### How was this patch tested?
Existing JUnit tests.
Signed-off-by: slfan1989 <[email protected]>
---
.../uniffle/AuronUniffleShuffleReader.scala | 80 ++++++++++++----------
1 file changed, 43 insertions(+), 37 deletions(-)
diff --git
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
index 8ce1d70e..94e139ac 100644
---
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
+++
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
@@ -121,47 +121,53 @@ class AuronUniffleShuffleReader[K, C](
val emptyPartitionIds = new util.ArrayList[Int]
for (partition <- startPartition until endPartition) {
- if (partitionToExpectBlocks.get(partition).isEmpty) {
- logInfo(s"$partition is a empty partition")
+ val expectedBlocks = partitionToExpectBlocks.get(partition)
+ if (expectedBlocks == null || expectedBlocks.isEmpty) {
+ logDebug(s"$partition is a empty partition")
emptyPartitionIds.add(partition)
} else {
val shuffleServerInfoList: util.List[ShuffleServerInfo] =
partitionToShuffleServers.get(partition)
- // This mechanism of expectedTaskIdsBitmap filter is to filter out
the most of data.
- // especially for AQE skew optimization
- val expectedTaskIdsBitmapFilterEnable: Boolean =
- !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) ||
shuffleServerInfoList.size > 1
- val retryMax: Int = rssConf.getInteger(
- RssClientConfig.RSS_CLIENT_RETRY_MAX,
- RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE)
- val retryIntervalMax: Long = rssConf.getLong(
- RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
- RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE)
- val shuffleReadClient: ShuffleReadClient =
- ShuffleClientFactory.getInstance.createShuffleReadClient(
- ShuffleClientFactory.newReadBuilder
- .appId(appId)
- .shuffleId(shuffleId)
- .partitionId(partition)
- .basePath(basePath)
- .partitionNumPerRange(1)
- .partitionNum(partitionNum)
- .blockIdBitmap(partitionToExpectBlocks.get(partition))
- .taskIdBitmap(taskIdBitmap)
- .shuffleServerInfoList(shuffleServerInfoList)
- .hadoopConf(hadoopConf)
- .shuffleDataDistributionType(dataDistributionType)
-
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
- .retryMax(retryMax)
- .retryIntervalMax(retryIntervalMax)
- .rssConf(rssConf))
- val iterator: RssShuffleDataIterWrapper[K, C] =
- new RssShuffleDataIterWrapper[K, C](
- dependency,
- shuffleReadClient,
- readMetrics,
- rssConf)
- shuffleDataIterList.add(iterator)
+ if (shuffleServerInfoList == null || shuffleServerInfoList.isEmpty) {
+ logWarning(s"$partition has no shuffle servers, treat as empty
partition")
+ emptyPartitionIds.add(partition)
+ } else {
+ // This mechanism of expectedTaskIdsBitmap filter is to filter out
the most of data.
+ // especially for AQE skew optimization
+ val expectedTaskIdsBitmapFilterEnable: Boolean =
+ !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) ||
shuffleServerInfoList.size > 1
+ val retryMax: Int = rssConf.getInteger(
+ RssClientConfig.RSS_CLIENT_RETRY_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE)
+ val retryIntervalMax: Long = rssConf.getLong(
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+ RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE)
+ val shuffleReadClient: ShuffleReadClient =
+ ShuffleClientFactory.getInstance.createShuffleReadClient(
+ ShuffleClientFactory.newReadBuilder
+ .appId(appId)
+ .shuffleId(shuffleId)
+ .partitionId(partition)
+ .basePath(basePath)
+ .partitionNumPerRange(1)
+ .partitionNum(partitionNum)
+ .blockIdBitmap(expectedBlocks)
+ .taskIdBitmap(taskIdBitmap)
+ .shuffleServerInfoList(shuffleServerInfoList)
+ .hadoopConf(hadoopConf)
+ .shuffleDataDistributionType(dataDistributionType)
+
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+ .retryMax(retryMax)
+ .retryIntervalMax(retryIntervalMax)
+ .rssConf(rssConf))
+ val iterator: RssShuffleDataIterWrapper[K, C] =
+ new RssShuffleDataIterWrapper[K, C](
+ dependency,
+ shuffleReadClient,
+ readMetrics,
+ rssConf)
+ shuffleDataIterList.add(iterator)
+ }
}
}
if (!emptyPartitionIds.isEmpty) {