zuston commented on code in PR #1845:
URL:
https://github.com/apache/incubator-uniffle/pull/1845#discussion_r1862875054
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java:
##########
@@ -77,6 +77,31 @@ public void reportShuffleResult(
managerClientSupplier.get().reportShuffleResult(request);
}
+ @Override
+ public void reportShuffleResult(
+ Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds,
+ String appId,
+ int shuffleId,
+ long taskAttemptId,
+ int bitmapNum,
+ Set<ShuffleServerInfo> reportFailureServers,
+ boolean enableWriteFailureRetry) {
+ Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
+ for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
+ for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
+ int partitionId = entry.getKey();
+ partitionToBlockIds
+ .computeIfAbsent(partitionId, x -> new ArrayList<>())
+ .addAll(entry.getValue());
+ }
+ }
+
+ RssReportShuffleResultRequest request =
+ new RssReportShuffleResultRequest(
+ appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
+ managerClientSupplier.get().reportShuffleResult(request);
Review Comment:
This should also take the stage attempt id into account in the self-managed
blockId mode of Spark, so that block ids can then be deleted on stage retry.
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdManager.java:
##########
@@ -40,18 +40,22 @@ public BlockIdManager() {
}
public void add(int shuffleId, int partitionId, List<Long> ids) {
+
if (CollectionUtils.isEmpty(ids)) {
return;
}
Map<Integer, Roaring64NavigableMap> partitionedBlockIds =
- blockIds.computeIfAbsent(shuffleId, (k) ->
JavaUtils.newConcurrentMap());
- partitionedBlockIds.compute(
- partitionId,
- (id, bitmap) -> {
- Roaring64NavigableMap store = bitmap == null ?
Roaring64NavigableMap.bitmapOf() : bitmap;
- ids.stream().forEach(x -> store.add(x));
- return store;
- });
+ blockIds.computeIfAbsent(shuffleId, k -> JavaUtils.newConcurrentMap());
+ synchronized (partitionedBlockIds) {
Review Comment:
Why is this `synchronized` needed? From my point of view, the
ConcurrentHashMap already ensures thread safety.
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -75,43 +79,62 @@ public void reportShuffleWriteFailure(
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
- Map<String, AtomicInteger> shuffleServerInfoIntegerMap =
JavaUtils.newConcurrentMap();
+ Map<String, AtomicInteger> initServerFailures =
JavaUtils.newConcurrentMap();
List<ShuffleServerInfo> shuffleServerInfos =
ShuffleServerInfo.fromProto(shuffleServerIdsList);
shuffleServerInfos.forEach(
- shuffleServerInfo -> {
- shuffleServerInfoIntegerMap.put(shuffleServerInfo.getId(), new
AtomicInteger(0));
- });
- ShuffleServerFailureRecord shuffleServerFailureRecord =
+ shuffleServerInfo ->
+ initServerFailures.computeIfAbsent(
+ shuffleServerInfo.getId(), key -> new AtomicInteger(0)));
+ ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
shuffleWriteStatus.computeIfAbsent(
shuffleId,
- key ->
- new ShuffleServerFailureRecord(shuffleServerInfoIntegerMap,
stageAttemptNumber));
+ key -> new ShuffleServerWriterFailureRecord(stageAttemptNumber,
initServerFailures));
boolean resetflag =
-
shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
+
shuffleServerWriterFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
if (resetflag) {
msg =
String.format(
- "got an old stage(%d vs %d) shuffle write failure report,
which should be impossible.",
- shuffleServerFailureRecord.getStageAttempt(),
stageAttemptNumber);
+ "got an old stage(%d_%d) shuffle write failure report, which
should be impossible.",
+ stageAttemptId, stageAttemptNumber);
LOG.warn(msg);
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
code = RssProtos.StatusCode.SUCCESS;
// update the stage shuffleServer write failed count
- boolean fetchFailureflag =
- shuffleServerFailureRecord.incPartitionWriteFailure(
+ boolean isFetchFailed =
+ shuffleServerWriterFailureRecord.incWriteFailureForShuffleServer(
stageAttemptNumber, shuffleServerInfos, shuffleManager);
- if (fetchFailureflag) {
+ if (isFetchFailed) {
reSubmitWholeStage = true;
msg =
String.format(
- "report shuffle write failure as maximum number(%d) of
shuffle write is occurred",
+ "Report shuffle write failure as maximum number(%d) of
shuffle write is occurred.",
shuffleManager.getMaxFetchFailures());
+ if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) {
+ try {
+ // Clear the metadata of the completed task, otherwise some of
the stage's data will
+ // be lost.
+ shuffleManager.unregisterAllMapOutput(shuffleId);
Review Comment:
Why is `unregisterAllMapOutput` executed in the report phase rather than
in `getPartitionToShufflerServerWithStageRetry`?
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/BlockIdSelfManagedShuffleWriteClient.java:
##########
@@ -77,6 +77,31 @@ public void reportShuffleResult(
managerClientSupplier.get().reportShuffleResult(request);
}
+ @Override
+ public void reportShuffleResult(
+ Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds,
+ String appId,
+ int shuffleId,
+ long taskAttemptId,
+ int bitmapNum,
+ Set<ShuffleServerInfo> reportFailureServers,
+ boolean enableWriteFailureRetry) {
+ Map<Integer, List<Long>> partitionToBlockIds = new HashMap<>();
+ for (Map<Integer, Set<Long>> k : serverToPartitionToBlockIds.values()) {
+ for (Map.Entry<Integer, Set<Long>> entry : k.entrySet()) {
+ int partitionId = entry.getKey();
+ partitionToBlockIds
+ .computeIfAbsent(partitionId, x -> new ArrayList<>())
+ .addAll(entry.getValue());
+ }
+ }
+
+ RssReportShuffleResultRequest request =
+ new RssReportShuffleResultRequest(
+ appId, shuffleId, taskAttemptId, partitionToBlockIds, bitmapNum);
+ managerClientSupplier.get().reportShuffleResult(request);
Review Comment:
Maybe this could be supported in another PR. If you don't want to do it,
I'm willing to submit a PR.
##########
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java:
##########
@@ -353,7 +353,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
shuffleIdToPartitionNum.putIfAbsent(shuffleId,
dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
- if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
+ if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
Review Comment:
Why is the scope of stage retry reduced to write failures?
##########
client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java:
##########
@@ -75,43 +79,62 @@ public void reportShuffleWriteFailure(
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
- Map<String, AtomicInteger> shuffleServerInfoIntegerMap =
JavaUtils.newConcurrentMap();
+ Map<String, AtomicInteger> initServerFailures =
JavaUtils.newConcurrentMap();
List<ShuffleServerInfo> shuffleServerInfos =
ShuffleServerInfo.fromProto(shuffleServerIdsList);
shuffleServerInfos.forEach(
- shuffleServerInfo -> {
- shuffleServerInfoIntegerMap.put(shuffleServerInfo.getId(), new
AtomicInteger(0));
- });
- ShuffleServerFailureRecord shuffleServerFailureRecord =
+ shuffleServerInfo ->
+ initServerFailures.computeIfAbsent(
+ shuffleServerInfo.getId(), key -> new AtomicInteger(0)));
+ ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
shuffleWriteStatus.computeIfAbsent(
shuffleId,
- key ->
- new ShuffleServerFailureRecord(shuffleServerInfoIntegerMap,
stageAttemptNumber));
+ key -> new ShuffleServerWriterFailureRecord(stageAttemptNumber,
initServerFailures));
boolean resetflag =
-
shuffleServerFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
+
shuffleServerWriterFailureRecord.resetStageAttemptIfNecessary(stageAttemptNumber);
if (resetflag) {
msg =
String.format(
- "got an old stage(%d vs %d) shuffle write failure report,
which should be impossible.",
- shuffleServerFailureRecord.getStageAttempt(),
stageAttemptNumber);
+ "got an old stage(%d_%d) shuffle write failure report, which
should be impossible.",
+ stageAttemptId, stageAttemptNumber);
LOG.warn(msg);
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else {
code = RssProtos.StatusCode.SUCCESS;
// update the stage shuffleServer write failed count
- boolean fetchFailureflag =
- shuffleServerFailureRecord.incPartitionWriteFailure(
+ boolean isFetchFailed =
Review Comment:
Is this only valid for shuffle write failures, and not for shuffle read
failures?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]