jerqi commented on code in PR #1258:
URL:
https://github.com/apache/incubator-uniffle/pull/1258#discussion_r1377183745
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -163,6 +239,90 @@ public void unregisterShuffle(int shuffleId) {
shuffleStatus.remove(shuffleId);
}
+ private static class ShuffleServerFailureRecord {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
+ private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
+ private int stageAttemptNumber;
+
+ private ShuffleServerFailureRecord(
+ Map<String, AtomicInteger> shuffleServerFailureRecordCount, int
stageAttemptNumber) {
+ this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+ this.stageAttemptNumber = stageAttemptNumber;
+ }
+
+ private <T> T withReadLock(Supplier<T> fn) {
+ readLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private <T> T withWriteLock(Supplier<T> fn) {
+ writeLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public int getStageAttempt() {
+ return withReadLock(() -> this.stageAttemptNumber);
+ }
+
+ public int resetStageAttemptIfNecessary(int stageAttemptNumber) {
+ return withWriteLock(
+ () -> {
+ if (this.stageAttemptNumber < stageAttemptNumber) {
+ // a new stage attempt is issued. Record the shuffleServer
status of the Map should be
+ // clear and reset.
+ shuffleServerFailureRecordCount.clear();
+ this.stageAttemptNumber = stageAttemptNumber;
+ return 1;
+ } else if (this.stageAttemptNumber > stageAttemptNumber) {
+ return -1;
+ }
+ return 0;
+ });
+ }
+
+ public boolean incPartitionWriteFailure(
+ int stageAttemptNumber,
+ List<ShuffleServerInfo> shuffleServerInfos,
+ RssShuffleManagerInterface shuffleManager) {
+ return withWriteLock(
+ () -> {
Review Comment:
```
if (this.stageAttemptNumber != stageAttemptNumber) {
// do nothing here
return false;
}
shuffleServerInfos.forEach(
shuffleServerInfo -> {
shuffleServerFailureRecordCount
.computeIfAbsent(shuffleServerInfo.getId(), k -> new
AtomicInteger())
.incrementAndGet();
});
List<Map.Entry<String, AtomicInteger>> list =
new ArrayList(shuffleServerFailureRecordCount.entrySet());
Collections.sort(list, (o1, o2) -> (o1.getValue().get() -
o2.getValue().get()));
Map.Entry<String, AtomicInteger> shuffleServerInfoIntegerEntry
= list.get(0);
if (shuffleServerInfoIntegerEntry.getValue().get()
> shuffleManager.getMaxFetchFailures()) {
shuffleManager.addFailuresShuffleServerInfos(
shuffleServerInfoIntegerEntry.getKey());
return true;
}
return false;
```
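This is clearer. One caveat: the comparator `(o1, o2) -> o1.getValue().get() - o2.getValue().get()` sorts in ascending order, so `list.get(0)` is the server with the fewest failures rather than the most. To pick the most-failed server, compare with `Integer.compare(o2.getValue().get(), o1.getValue().get())`, or use `Collections.max(...)` with `Comparator.comparingInt(e -> e.getValue().get())`.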
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -37,12 +40,85 @@
public class ShuffleManagerGrpcService extends ShuffleManagerImplBase {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleManagerGrpcService.class);
private final Map<Integer, RssShuffleStatus> shuffleStatus =
JavaUtils.newConcurrentMap();
+ // The shuffleId mapping records the number of ShuffleServer write failures
+ private final Map<Integer, ShuffleServerFailureRecord> shuffleWrtieStatus =
+ JavaUtils.newConcurrentMap();
private final RssShuffleManagerInterface shuffleManager;
public ShuffleManagerGrpcService(RssShuffleManagerInterface shuffleManager) {
this.shuffleManager = shuffleManager;
}
+ @Override
+ public void reportShuffleWriteFailure(
+ RssProtos.ReportShuffleWriteFailureRequest request,
+ StreamObserver<RssProtos.ReportShuffleWriteFailureResponse>
responseObserver) {
+ String appId = request.getAppId();
+ int shuffleId = request.getShuffleId();
+ int stageAttemptNumber = request.getStageAttemptNumber();
+ List<RssProtos.ShuffleServerId> shuffleServerIdsList =
request.getShuffleServerIdsList();
+ RssProtos.StatusCode code;
+ boolean reSubmitWholeStage;
+ String msg;
+ if (!appId.equals(shuffleManager.getAppId())) {
+ msg =
+ String.format(
+ "got a wrong shuffle write failure report from appId: %s,
expected appId: %s",
+ appId, shuffleManager.getAppId());
+ LOG.warn(msg);
+ code = RssProtos.StatusCode.INVALID_REQUEST;
+ reSubmitWholeStage = false;
+ } else {
+ Map<String, AtomicInteger> shuffleServerInfoIntegerMap =
JavaUtils.newConcurrentMap();
+ List<ShuffleServerInfo> shuffleServerInfos =
+ ShuffleServerInfo.fromProto(shuffleServerIdsList);
+ shuffleServerInfos.forEach(
+ shuffleServerInfo -> {
+ shuffleServerInfoIntegerMap.put(shuffleServerInfo.getId(), new
AtomicInteger(0));
+ });
+ ShuffleServerFailureRecord shuffleServerFailureRecord =
+ shuffleWrtieStatus.computeIfAbsent(
+ shuffleId,
+ key ->
+ new ShuffleServerFailureRecord(shuffleServerInfoIntegerMap,
stageAttemptNumber));
+ int c =
shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
Review Comment:
Could we give this variable (`c`) a more descriptive name?
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -163,6 +239,90 @@ public void unregisterShuffle(int shuffleId) {
shuffleStatus.remove(shuffleId);
}
+ private static class ShuffleServerFailureRecord {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
+ private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
+ private int stageAttemptNumber;
+
+ private ShuffleServerFailureRecord(
+ Map<String, AtomicInteger> shuffleServerFailureRecordCount, int
stageAttemptNumber) {
+ this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+ this.stageAttemptNumber = stageAttemptNumber;
+ }
+
+ private <T> T withReadLock(Supplier<T> fn) {
+ readLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private <T> T withWriteLock(Supplier<T> fn) {
+ writeLock.lock();
+ try {
+ return fn.get();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public int getStageAttempt() {
+ return withReadLock(() -> this.stageAttemptNumber);
+ }
+
+ public int resetStageAttemptIfNecessary(int stageAttemptNumber) {
Review Comment:
Could we return a `boolean` here?
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -163,6 +239,90 @@ public void unregisterShuffle(int shuffleId) {
shuffleStatus.remove(shuffleId);
}
+ private static class ShuffleServerFailureRecord {
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
+ private final Map<String, AtomicInteger> shuffleServerFailureRecordCount;
+ private int stageAttemptNumber;
+
+ private ShuffleServerFailureRecord(
+ Map<String, AtomicInteger> shuffleServerFailureRecordCount, int
stageAttemptNumber) {
+ this.shuffleServerFailureRecordCount = shuffleServerFailureRecordCount;
+ this.stageAttemptNumber = stageAttemptNumber;
+ }
+
+ private <T> T withReadLock(Supplier<T> fn) {
Review Comment:
These methods seem useful. Could we extract them into a shared utility so other classes can reuse them?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]