This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a399ac2 [#1420] fix(client): reportShuffleWriteFailure failed 
because of IndexOutOfBoundsException (#1421)
15a399ac2 is described below

commit 15a399ac2a67632913384863bf8457ed18c0da65
Author: RickyMa <[email protected]>
AuthorDate: Fri Jan 5 20:01:42 2024 +0800

    [#1420] fix(client): reportShuffleWriteFailure failed because of 
IndexOutOfBoundsException (#1421)
    
    ### What changes were proposed in this pull request?
    
    To fix IndexOutOfBoundsException.
    
    ### Why are the changes needed?
    
    For [#1420](https://github.com/apache/incubator-uniffle/issues/1420)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Rerun successfully.
---
 .../shuffle/manager/ShuffleManagerGrpcService.java        | 15 +++++++++------
 .../shuffle/manager/ShuffleManagerGrpcServiceTest.java    | 14 ++++++++++++++
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 444e82cfb..e95aad609 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -330,12 +330,15 @@ public class ShuffleManagerGrpcService extends 
ShuffleManagerImplBase {
                 });
             List<Map.Entry<String, AtomicInteger>> list =
                 new ArrayList(shuffleServerFailureRecordCount.entrySet());
-            Collections.sort(list, (o1, o2) -> (o1.getValue().get() - 
o2.getValue().get()));
-            Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry = 
list.get(0);
-            if (shuffleServerInfoIntegerEntry.getValue().get()
-                > shuffleManager.getMaxFetchFailures()) {
-              
shuffleManager.addFailuresShuffleServerInfos(shuffleServerInfoIntegerEntry.getKey());
-              return true;
+            if (!list.isEmpty()) {
+              Collections.sort(list, (o1, o2) -> (o1.getValue().get() - 
o2.getValue().get()));
+              Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry = 
list.get(0);
+              if (shuffleServerInfoIntegerEntry.getValue().get()
+                  > shuffleManager.getMaxFetchFailures()) {
+                shuffleManager.addFailuresShuffleServerInfos(
+                    shuffleServerInfoIntegerEntry.getKey());
+                return true;
+              }
             }
             return false;
           });
diff --git 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
index 16d4418f6..6dc2abbf6 100644
--- 
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
@@ -24,6 +24,8 @@ import org.mockito.Mockito;
 
 import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
 import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureResponse;
 import org.apache.uniffle.proto.RssProtos.StatusCode;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -107,5 +109,17 @@ public class ShuffleManagerGrpcServiceTest {
     service.reportShuffleFetchFailure(req, appIdResponseObserver);
     assertEquals(StatusCode.INVALID_REQUEST, 
appIdResponseObserver.value.getStatus());
     assertTrue(appIdResponseObserver.value.getMsg().contains("old stage"));
+
+    // reportShuffleWriteFailure with an empty list of shuffleServerIds
+    MockedStreamObserver<ReportShuffleWriteFailureResponse>
+        reportShuffleWriteFailureResponseObserver = new 
MockedStreamObserver<>();
+    ReportShuffleWriteFailureRequest reportShuffleWriteFailureRequest =
+        ReportShuffleWriteFailureRequest.newBuilder()
+            .setAppId(appId)
+            .setShuffleId(shuffleId)
+            .buildPartial();
+    service.reportShuffleWriteFailure(
+        reportShuffleWriteFailureRequest, 
reportShuffleWriteFailureResponseObserver);
+    assertEquals(StatusCode.SUCCESS, 
reportShuffleWriteFailureResponseObserver.value.getStatus());
   }
 }

Reply via email to