This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 15a399ac2 [#1420] fix(client): reportShuffleWriteFailure failed
because of IndexOutOfBoundsException (#1421)
15a399ac2 is described below
commit 15a399ac2a67632913384863bf8457ed18c0da65
Author: RickyMa <[email protected]>
AuthorDate: Fri Jan 5 20:01:42 2024 +0800
[#1420] fix(client): reportShuffleWriteFailure failed because of
IndexOutOfBoundsException (#1421)
### What changes were proposed in this pull request?
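Fix the IndexOutOfBoundsException in reportShuffleWriteFailure by only sorting the shuffle server failure-count list and reading its first entry (`list.get(0)`) when the list is not empty.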
### Why are the changes needed?
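Without an emptiness check, `list.get(0)` is called on an empty list when no shuffle server failure counts are present, so reportShuffleWriteFailure fails with IndexOutOfBoundsException. See [#1420](https://github.com/apache/incubator-uniffle/issues/1420).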
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
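Added a case to ShuffleManagerGrpcServiceTest that calls reportShuffleWriteFailure with an empty list of shuffleServerIds and asserts that the response status is SUCCESS. Rerun successfully.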
---
.../shuffle/manager/ShuffleManagerGrpcService.java | 15 +++++++++------
.../shuffle/manager/ShuffleManagerGrpcServiceTest.java | 14 ++++++++++++++
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
index 444e82cfb..e95aad609 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java
@@ -330,12 +330,15 @@ public class ShuffleManagerGrpcService extends
ShuffleManagerImplBase {
});
List<Map.Entry<String, AtomicInteger>> list =
new ArrayList(shuffleServerFailureRecordCount.entrySet());
- Collections.sort(list, (o1, o2) -> (o1.getValue().get() -
o2.getValue().get()));
- Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry =
list.get(0);
- if (shuffleServerInfoIntegerEntry.getValue().get()
- > shuffleManager.getMaxFetchFailures()) {
-
shuffleManager.addFailuresShuffleServerInfos(shuffleServerInfoIntegerEntry.getKey());
- return true;
+ if (!list.isEmpty()) {
+ Collections.sort(list, (o1, o2) -> (o1.getValue().get() -
o2.getValue().get()));
+ Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry =
list.get(0);
+ if (shuffleServerInfoIntegerEntry.getValue().get()
+ > shuffleManager.getMaxFetchFailures()) {
+ shuffleManager.addFailuresShuffleServerInfos(
+ shuffleServerInfoIntegerEntry.getKey());
+ return true;
+ }
}
return false;
});
diff --git
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
index 16d4418f6..6dc2abbf6 100644
---
a/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
+++
b/client-spark/common/src/test/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcServiceTest.java
@@ -24,6 +24,8 @@ import org.mockito.Mockito;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureRequest;
+import org.apache.uniffle.proto.RssProtos.ReportShuffleWriteFailureResponse;
import org.apache.uniffle.proto.RssProtos.StatusCode;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -107,5 +109,17 @@ public class ShuffleManagerGrpcServiceTest {
service.reportShuffleFetchFailure(req, appIdResponseObserver);
assertEquals(StatusCode.INVALID_REQUEST,
appIdResponseObserver.value.getStatus());
assertTrue(appIdResponseObserver.value.getMsg().contains("old stage"));
+
+ // reportShuffleWriteFailure with an empty list of shuffleServerIds
+ MockedStreamObserver<ReportShuffleWriteFailureResponse>
+ reportShuffleWriteFailureResponseObserver = new
MockedStreamObserver<>();
+ ReportShuffleWriteFailureRequest reportShuffleWriteFailureRequest =
+ ReportShuffleWriteFailureRequest.newBuilder()
+ .setAppId(appId)
+ .setShuffleId(shuffleId)
+ .buildPartial();
+ service.reportShuffleWriteFailure(
+ reportShuffleWriteFailureRequest,
reportShuffleWriteFailureResponseObserver);
+ assertEquals(StatusCode.SUCCESS,
reportShuffleWriteFailureResponseObserver.value.getStatus());
}
}