This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 47f66a87a [CELEBORN-678] ShuffleClientImpl::mapperEnded should not
consider attemptId
47f66a87a is described below
commit 47f66a87a1f6021839b7cde8420c63f1df6675e2
Author: onebox-li <[email protected]>
AuthorDate: Wed Jun 14 21:01:06 2023 +0800
[CELEBORN-678] ShuffleClientImpl::mapperEnded should not consider attemptId
### What changes were proposed in this pull request?
ShuffleClientImpl::mapperEnded should not consider attemptId, because
speculation tasks will update attemptId.
### Why are the changes needed?
Same as described in the previous section.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test
Closes #1591 from onebox-li/fix-mapend.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../flink/readclient/FlinkShuffleClientImpl.java | 57 ++--------------------
.../apache/celeborn/client/ShuffleClientImpl.java | 42 ++++++++--------
2 files changed, 28 insertions(+), 71 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index ca1479637..5d2d1a081 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -206,20 +206,6 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
// mapKey
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
- // return if shuffle stage already ended
- if (mapperEnded(shuffleId, mapId, attemptId)) {
- logger.info(
- "Push data byteBuf to location {} ignored because mapper already
ended for shuffle {} map {} attempt {}.",
- location.hostAndPushPort(),
- shuffleId,
- mapId,
- attemptId);
- PushState pushState = pushStates.get(mapKey);
- if (pushState != null) {
- pushState.cleanup();
- }
- return 0;
- }
PushState pushState = getPushState(mapKey);
@@ -258,14 +244,6 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
@Override
public void onSuccess(ByteBuffer response) {
pushState.removeBatch(nextBatchId, location.hostAndPushPort());
- if (response.remaining() > 0) {
- byte reason = response.get();
- if (reason == StatusCode.STAGE_ENDED.getValue()) {
- mapperEndMap
- .computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
- .add(mapKey);
- }
- }
logger.debug(
"Push data byteBuf to {} success for shuffle {} map {}
attemptId {} batch {}.",
location.hostAndPushPort(),
@@ -281,21 +259,11 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
if (pushState.exception.get() != null) {
return;
}
- if (!mapperEnded(shuffleId, mapId, attemptId)) {
- String errorMsg =
- String.format(
- "Push data byteBuf to %s failed for shuffle %d map %d
attempt %d batch %d.",
- location.hostAndPushPort(), shuffleId, mapId, attemptId,
nextBatchId);
- pushState.exception.compareAndSet(null, new
CelebornIOException(errorMsg, e));
- } else {
- logger.warn(
- "Push data to {} failed but mapper already ended for shuffle
{} map {} attempt {} batch {}.",
- location.hostAndPushPort(),
- shuffleId,
- mapId,
- attemptId,
- nextBatchId);
- }
+ String errorMsg =
+ String.format(
+ "Push data byteBuf to %s failed for shuffle %d map %d
attempt %d batch %d.",
+ location.hostAndPushPort(), shuffleId, mapId, attemptId,
nextBatchId);
+ pushState.exception.compareAndSet(null, new
CelebornIOException(errorMsg, e));
}
};
// do push data
@@ -431,11 +399,6 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
StatusCode respStatus = Utils.toStatusCode(response.getStatus());
if (StatusCode.SUCCESS.equals(respStatus)) {
return
Optional.of(PbSerDeUtils.fromPbPartitionLocation(response.getLocation()));
- } else if (StatusCode.MAP_ENDED.equals(respStatus)) {
- mapperEndMap
- .computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
- .add(mapKey);
- return Optional.empty();
} else {
// throw exception
logger.error(
@@ -492,16 +455,6 @@ public class FlinkShuffleClientImpl extends
ShuffleClientImpl {
try {
// mapKey
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
- // return if shuffle stage already ended
- if (mapperEnded(shuffleId, mapId, attemptId)) {
- logger.debug(
- "Send message to {} ignored because mapper already ended for
shuffle {} map {} attempt {}.",
- location.hostAndPushPort(),
- shuffleId,
- mapId,
- attemptId);
- return null;
- }
pushState = getPushState(mapKey);
// add inFlight requests
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a2a381dea..799ba6a75 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -89,9 +89,13 @@ public class ShuffleClientImpl extends ShuffleClient {
private final Map<Integer, ConcurrentHashMap<Integer, PartitionLocation>>
reducePartitionMap =
JavaUtils.newConcurrentHashMap();
- protected final ConcurrentHashMap<Integer, Set<String>> mapperEndMap =
+ // key: shuffleId, value: Set(mapId)
+ protected final ConcurrentHashMap<Integer, Set<Integer>> mapperEndMap =
JavaUtils.newConcurrentHashMap();
+ // shuffleIds which have finished all map tasks
+ protected final Set<Integer> stageEndShuffleSet =
ConcurrentHashMap.newKeySet();
+
// key: shuffleId-mapId-attemptId
protected final Map<String, PushState> pushStates =
JavaUtils.newConcurrentHashMap();
@@ -212,7 +216,7 @@ public class ShuffleClientImpl extends ShuffleClient {
applicationId, shuffleId, mapId, attemptId, partitionId,
loc.getEpoch(), loc, cause)) {
wrappedCallback.onFailure(
new CelebornIOException(cause + " then revive but " +
StatusCode.REVIVE_FAILED));
- } else if (mapperEnded(shuffleId, mapId, attemptId)) {
+ } else if (mapperEnded(shuffleId, mapId)) {
logger.debug(
"Revive for push data success, but the mapper already ended for
shuffle {} map {} attempt {} partition {} batch {} location {}.",
shuffleId,
@@ -304,7 +308,7 @@ public class ShuffleClientImpl extends ShuffleClient {
new CelebornIOException(cause + " then revive but " +
StatusCode.REVIVE_FAILED)));
return;
}
- } else if (mapperEnded(shuffleId, mapId, attemptId)) {
+ } else if (mapperEnded(shuffleId, mapId)) {
logger.debug(
"Revive for push merged data success, but the mapper already ended
for shuffle {} map {} attempt {} partition {} batch {}.",
shuffleId,
@@ -557,8 +561,7 @@ public class ShuffleClientImpl extends ShuffleClient {
epoch);
return true;
}
- String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
- if (mapperEnded(shuffleId, mapId, attemptId)) {
+ if (mapperEnded(shuffleId, mapId)) {
logger.debug(
"Revive success, but the mapper ended for shuffle {} map {} attempt
{} partition {}, just return true(Assume revive successfully).",
shuffleId,
@@ -594,7 +597,7 @@ public class ShuffleClientImpl extends ShuffleClient {
mapId,
attemptId,
partitionId);
- mapperEndMap.computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet()).add(mapKey);
+ mapperEndMap.computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet()).add(mapId);
return true;
} else {
return false;
@@ -629,7 +632,7 @@ public class ShuffleClientImpl extends ShuffleClient {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
final String shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId);
// return if shuffle stage already ended
- if (mapperEnded(shuffleId, mapId, attemptId)) {
+ if (mapperEnded(shuffleId, mapId)) {
logger.debug(
"Push or merge data ignored because mapper already ended for shuffle
{} map {} attempt {} partition {}.",
shuffleId,
@@ -668,7 +671,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
}
- if (mapperEnded(shuffleId, mapId, attemptId)) {
+ if (mapperEnded(shuffleId, mapId)) {
logger.debug(
"Push or merge data ignored because mapper already ended for shuffle
{} map {} attempt {} partition {}.",
shuffleId,
@@ -728,9 +731,7 @@ public class ShuffleClientImpl extends ShuffleClient {
// TODO Need to adjust maxReqsInFlight if server response is
congested, see
// CELEBORN-62
if (response.remaining() > 0 && response.get() ==
StatusCode.STAGE_ENDED.getValue()) {
- mapperEndMap
- .computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
- .add(mapKey);
+ stageEndShuffleSet.add(shuffleId);
}
logger.debug(
"Push data to {} success for shuffle {} map {} attempt {}
partition {} batch {}.",
@@ -857,7 +858,7 @@ public class ShuffleClientImpl extends ShuffleClient {
remainReviveTimes,
e);
// async retry push data
- if (!mapperEnded(shuffleId, mapId, attemptId)) {
+ if (!mapperEnded(shuffleId, mapId)) {
// For blacklisted partition location, Celeborn should not use
retry quota.
if (!pushStatusIsBlacklisted(cause)) {
remainReviveTimes = remainReviveTimes - 1;
@@ -1115,9 +1116,7 @@ public class ShuffleClientImpl extends ShuffleClient {
pushState.removeBatch(groupedBatchId, hostPort);
// TODO Need to adjust maxReqsInFlight if server response is
congested, see CELEBORN-62
if (response.remaining() > 0 && response.get() ==
StatusCode.STAGE_ENDED.getValue()) {
- mapperEndMap
- .computeIfAbsent(shuffleId, (id) ->
ConcurrentHashMap.newKeySet())
- .add(Utils.makeMapKey(shuffleId, mapId, attemptId));
+ stageEndShuffleSet.add(shuffleId);
}
}
@@ -1242,7 +1241,7 @@ public class ShuffleClientImpl extends ShuffleClient {
Arrays.toString(batchIds),
remainReviveTimes,
e);
- if (!mapperEnded(shuffleId, mapId, attemptId)) {
+ if (!mapperEnded(shuffleId, mapId)) {
int tmpRemainReviveTimes = remainReviveTimes;
// For blacklisted partition location, Celeborn should not use
retry quota.
if (!pushStatusIsBlacklisted(cause)) {
@@ -1379,6 +1378,7 @@ public class ShuffleClientImpl extends ShuffleClient {
reducePartitionMap.remove(shuffleId);
reduceFileGroupsMap.remove(shuffleId);
mapperEndMap.remove(shuffleId);
+ stageEndShuffleSet.remove(shuffleId);
splitting.remove(shuffleId);
logger.info("Unregistered shuffle {}.", shuffleId);
@@ -1525,9 +1525,13 @@ public class ShuffleClientImpl extends ShuffleClient {
driverRssMetaService = endpointRef;
}
- protected boolean mapperEnded(int shuffleId, int mapId, int attemptId) {
- return mapperEndMap.containsKey(shuffleId)
- && mapperEndMap.get(shuffleId).contains(Utils.makeMapKey(shuffleId,
mapId, attemptId));
+ protected boolean mapperEnded(int shuffleId, int mapId) {
+ return (mapperEndMap.containsKey(shuffleId) &&
mapperEndMap.get(shuffleId).contains(mapId))
+ || stageEnded(shuffleId);
+ }
+
+ protected boolean stageEnded(int shuffleId) {
+ return stageEndShuffleSet.contains(shuffleId);
}
private boolean pushStatusIsBlacklisted(StatusCode cause) {