This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ef0442 [AURON #2052] NPE fix in `AuronUniffleShuffleReader` (#2053)
f4ef0442 is described below

commit f4ef0442c6a3872e1cbe5db75ef5748ce3bb5139
Author: slfan1989 <[email protected]>
AuthorDate: Wed Mar 4 14:44:24 2026 +0800

    [AURON #2052] NPE fix in `AuronUniffleShuffleReader` (#2053)
    
    ### Which issue does this PR close?
    
    Closes #2052
    
    ### Rationale for this change
    
    In `AuronUniffleShuffleReader`, there are potential
    `NullPointerException` risks when handling partitions:
    
    - `partitionToExpectBlocks.get(partition)` may return null, but the code
    directly calls `.isEmpty()` without null check
    - `partitionToShuffleServers.get(partition)` may return null, but the
    code uses it directly without validation.
    
    This can cause the shuffle reader to crash when reading partitions that
    have missing or incomplete metadata.
    
    ### What changes are included in this PR?
    
    ### Are there any user-facing changes?
    
    No.
    
    ### How was this patch tested?
    
    Existing JUnit tests.
    
    Signed-off-by: slfan1989 <[email protected]>
---
 .../uniffle/AuronUniffleShuffleReader.scala        | 80 ++++++++++++----------
 1 file changed, 43 insertions(+), 37 deletions(-)

diff --git 
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
 
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
index 8ce1d70e..94e139ac 100644
--- 
a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
+++ 
b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala
@@ -121,47 +121,53 @@ class AuronUniffleShuffleReader[K, C](
 
       val emptyPartitionIds = new util.ArrayList[Int]
       for (partition <- startPartition until endPartition) {
-        if (partitionToExpectBlocks.get(partition).isEmpty) {
-          logInfo(s"$partition is a empty partition")
+        val expectedBlocks = partitionToExpectBlocks.get(partition)
+        if (expectedBlocks == null || expectedBlocks.isEmpty) {
+          logDebug(s"$partition is a empty partition")
           emptyPartitionIds.add(partition)
         } else {
           val shuffleServerInfoList: util.List[ShuffleServerInfo] =
             partitionToShuffleServers.get(partition)
-          // This mechanism of expectedTaskIdsBitmap filter is to filter out 
the most of data.
-          // especially for AQE skew optimization
-          val expectedTaskIdsBitmapFilterEnable: Boolean =
-            !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) || 
shuffleServerInfoList.size > 1
-          val retryMax: Int = rssConf.getInteger(
-            RssClientConfig.RSS_CLIENT_RETRY_MAX,
-            RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE)
-          val retryIntervalMax: Long = rssConf.getLong(
-            RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
-            RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE)
-          val shuffleReadClient: ShuffleReadClient =
-            ShuffleClientFactory.getInstance.createShuffleReadClient(
-              ShuffleClientFactory.newReadBuilder
-                .appId(appId)
-                .shuffleId(shuffleId)
-                .partitionId(partition)
-                .basePath(basePath)
-                .partitionNumPerRange(1)
-                .partitionNum(partitionNum)
-                .blockIdBitmap(partitionToExpectBlocks.get(partition))
-                .taskIdBitmap(taskIdBitmap)
-                .shuffleServerInfoList(shuffleServerInfoList)
-                .hadoopConf(hadoopConf)
-                .shuffleDataDistributionType(dataDistributionType)
-                
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
-                .retryMax(retryMax)
-                .retryIntervalMax(retryIntervalMax)
-                .rssConf(rssConf))
-          val iterator: RssShuffleDataIterWrapper[K, C] =
-            new RssShuffleDataIterWrapper[K, C](
-              dependency,
-              shuffleReadClient,
-              readMetrics,
-              rssConf)
-          shuffleDataIterList.add(iterator)
+          if (shuffleServerInfoList == null || shuffleServerInfoList.isEmpty) {
+            logWarning(s"$partition has no shuffle servers, treat as empty 
partition")
+            emptyPartitionIds.add(partition)
+          } else {
+            // This mechanism of expectedTaskIdsBitmap filter is to filter out 
the most of data.
+            // especially for AQE skew optimization
+            val expectedTaskIdsBitmapFilterEnable: Boolean =
+              !(mapStartIndex == 0 && mapEndIndex == Integer.MAX_VALUE) || 
shuffleServerInfoList.size > 1
+            val retryMax: Int = rssConf.getInteger(
+              RssClientConfig.RSS_CLIENT_RETRY_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE)
+            val retryIntervalMax: Long = rssConf.getLong(
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX,
+              RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE)
+            val shuffleReadClient: ShuffleReadClient =
+              ShuffleClientFactory.getInstance.createShuffleReadClient(
+                ShuffleClientFactory.newReadBuilder
+                  .appId(appId)
+                  .shuffleId(shuffleId)
+                  .partitionId(partition)
+                  .basePath(basePath)
+                  .partitionNumPerRange(1)
+                  .partitionNum(partitionNum)
+                  .blockIdBitmap(expectedBlocks)
+                  .taskIdBitmap(taskIdBitmap)
+                  .shuffleServerInfoList(shuffleServerInfoList)
+                  .hadoopConf(hadoopConf)
+                  .shuffleDataDistributionType(dataDistributionType)
+                  
.expectedTaskIdsBitmapFilterEnable(expectedTaskIdsBitmapFilterEnable)
+                  .retryMax(retryMax)
+                  .retryIntervalMax(retryIntervalMax)
+                  .rssConf(rssConf))
+            val iterator: RssShuffleDataIterWrapper[K, C] =
+              new RssShuffleDataIterWrapper[K, C](
+                dependency,
+                shuffleReadClient,
+                readMetrics,
+                rssConf)
+            shuffleDataIterList.add(iterator)
+          }
         }
       }
       if (!emptyPartitionIds.isEmpty) {

Reply via email to