RexXiong commented on code in PR #2373:
URL: https://github.com/apache/incubator-celeborn/pull/2373#discussion_r1529991243
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1393,7 +1414,13 @@ public void onSuccess(ByteBuffer response) {
Arrays.toString(partitionIds),
groupedBatchId,
Arrays.toString(batchIds));
-
+ if (dataPushFailureTrackingEnabled) {
Review Comment:
There is no need to do this for HARD_SPLIT, as the worker never writes the
batch when it returns HARD_SPLIT. cc @waitinfuture
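A minimal Java sketch of the suggested guard. Every name here (`PushStatus`, `onPushFailure`, the string key format) is hypothetical and not Celeborn's API. It assumes that failure tracking exists to record batches the worker may have written even though the push was reported as failed, so a HARD_SPLIT response, where the worker never writes the batch, is simply skipped.

```java
import java.util.HashSet;
import java.util.Set;

public class PushFailureTrackingSketch {
  // Hypothetical push outcomes; Celeborn's real status codes differ.
  enum PushStatus { SUCCESS, SOFT_SPLIT, HARD_SPLIT, PUSH_DATA_FAIL }

  private final boolean dataPushFailureTrackingEnabled;
  // Hypothetical "mapId-attemptId-batchId" keys for batches a reader may need to skip.
  private final Set<String> failedBatches = new HashSet<>();

  PushFailureTrackingSketch(boolean dataPushFailureTrackingEnabled) {
    this.dataPushFailureTrackingEnabled = dataPushFailureTrackingEnabled;
  }

  void onPushFailure(PushStatus status, int mapId, int attemptId, int batchId) {
    if (!dataPushFailureTrackingEnabled) {
      return;
    }
    // On HARD_SPLIT the worker rejects the batch without writing it,
    // so there is nothing a reader would ever need to filter out.
    if (status == PushStatus.HARD_SPLIT) {
      return;
    }
    failedBatches.add(mapId + "-" + attemptId + "-" + batchId);
  }

  Set<String> failedBatches() {
    return failedBatches;
  }

  public static void main(String[] args) {
    PushFailureTrackingSketch tracker = new PushFailureTrackingSketch(true);
    tracker.onPushFailure(PushStatus.HARD_SPLIT, 0, 0, 1);
    tracker.onPushFailure(PushStatus.PUSH_DATA_FAIL, 0, 0, 2);
    System.out.println(tracker.failedBatches()); // prints [0-0-2]
  }
}
```

Skipping HARD_SPLIT at the source keeps the tracked set smaller, which also shrinks whatever the client later reports to the commit handler.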
##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -244,6 +264,18 @@ class ReducePartitionCommitHandler(
val attempts = shuffleMapperAttempts.get(shuffleId)
if (attempts(mapId) < 0) {
attempts(mapId) = attemptId
+ if (null != pushFailedBatches && !pushFailedBatches.isEmpty) {
+ val pushFailedBatchesMap = shufflePushFailedBatches.computeIfAbsent(
+ shuffleId,
+ newShuffleId2PushFailedBatchMapFunc)
+ pushFailedBatches.forEach((k, v) => {
+ val partitionPushFailedBatches = pushFailedBatchesMap.computeIfAbsent(
+ k,
+ uniqueId2PushFailedBatchMapFunc)
+ partitionPushFailedBatches.addAll(v)
+ })
+ pushFailedBatchesMap.get(pushFailedBatches)
Review Comment:
It seems `pushFailedBatchesMap.get(pushFailedBatches)` is useless: its return
value is discarded, so the call has no effect.
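A hedged Java sketch of the same merge without the trailing lookup. It assumes the nesting suggested by the diff (shuffle id → partition unique id → failed batches) and simplifies each failed batch to a `Long` id; the class and method names are hypothetical. `computeIfAbsent` already hands back the per-shuffle map, so nothing needs to be read back after `addAll`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FailedBatchRegistrySketch {
  // shuffleId -> (partition unique id -> failed batch ids); ids simplified to Long.
  private final ConcurrentHashMap<Integer, ConcurrentHashMap<String, Set<Long>>>
      shufflePushFailedBatches = new ConcurrentHashMap<>();

  void merge(int shuffleId, Map<String, Set<Long>> pushFailedBatches) {
    if (pushFailedBatches == null || pushFailedBatches.isEmpty()) {
      return;
    }
    ConcurrentHashMap<String, Set<Long>> perShuffle =
        shufflePushFailedBatches.computeIfAbsent(shuffleId, id -> new ConcurrentHashMap<>());
    pushFailedBatches.forEach((uniqueId, batches) ->
        perShuffle.computeIfAbsent(uniqueId, k -> ConcurrentHashMap.newKeySet()).addAll(batches));
    // No trailing get(...): the merged state already lives in perShuffle.
  }

  Set<Long> failedBatches(int shuffleId, String uniqueId) {
    Map<String, Set<Long>> perShuffle = shufflePushFailedBatches.get(shuffleId);
    if (perShuffle == null) {
      return Collections.emptySet();
    }
    return perShuffle.getOrDefault(uniqueId, Collections.emptySet());
  }

  public static void main(String[] args) {
    FailedBatchRegistrySketch registry = new FailedBatchRegistrySketch();
    registry.merge(1, Map.of("p0", Set.of(1L, 2L)));
    registry.merge(1, Map.of("p0", Set.of(2L, 3L)));
    System.out.println(registry.failedBatches(1, "p0").size()); // prints 3
  }
}
```

Using `ConcurrentHashMap` with `newKeySet()` keeps reports from different map tasks safe without extra locking, assuming commit handling may run on more than one thread.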
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4671,4 +4671,13 @@ object CelebornConf extends Logging {
.version("0.5.0")
.intConf
.createWithDefault(10000)
+
+ val CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED: ConfigEntry[Boolean] =
Review Comment:
Maybe we can use a different configuration name that reflects enabling the
skew join optimization. `CLIENT_DATA_PUSH_FAILURE_TRACKING_ENABLED` doesn't
feel straightforward.
##########
client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java:
##########
@@ -615,6 +663,17 @@ private boolean fillBuffer() throws IOException {
// de-duplicate
if (attemptId == attempts[mapId]) {
+ if (splitSkewPartitionWithoutMapRange) {
Review Comment:
1. We can reuse one PushFailedBatch object and update its inner fields to
improve memory efficiency.
2. It would be better to check whether failedBatches is empty first. If it
is, we never need to check against failed batches at all.
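A minimal Java sketch of both suggestions, using a hypothetical mutable `PushFailedBatch` stand-in and a hypothetical `shouldSkip` helper (not the PR's actual code): the reader allocates one probe object and overwrites its fields for each lookup, and it returns early when there are no failed batches at all.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class FailedBatchFilterSketch {
  // Hypothetical stand-in for PushFailedBatch with mutable fields so it can act as a probe.
  static final class PushFailedBatch {
    private int mapId;
    private int attemptId;
    private int batchId;

    PushFailedBatch(int mapId, int attemptId, int batchId) {
      set(mapId, attemptId, batchId);
    }

    PushFailedBatch set(int mapId, int attemptId, int batchId) {
      this.mapId = mapId;
      this.attemptId = attemptId;
      this.batchId = batchId;
      return this;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof PushFailedBatch)) return false;
      PushFailedBatch other = (PushFailedBatch) o;
      return mapId == other.mapId && attemptId == other.attemptId && batchId == other.batchId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(mapId, attemptId, batchId);
    }
  }

  private final Set<PushFailedBatch> failedBatches;
  // One reusable probe instead of allocating a new object for every batch read.
  private final PushFailedBatch probe = new PushFailedBatch(-1, -1, -1);

  FailedBatchFilterSketch(Set<PushFailedBatch> failedBatches) {
    this.failedBatches = failedBatches;
  }

  /** Returns true if the reader should drop this batch. */
  boolean shouldSkip(int mapId, int attemptId, int batchId) {
    // Cheap early exit: in the common case nothing failed, so no lookup is needed.
    if (failedBatches.isEmpty()) {
      return false;
    }
    return failedBatches.contains(probe.set(mapId, attemptId, batchId));
  }

  public static void main(String[] args) {
    Set<PushFailedBatch> failed = new HashSet<>();
    failed.add(new PushFailedBatch(3, 0, 7));
    FailedBatchFilterSketch filter = new FailedBatchFilterSketch(failed);
    System.out.println(filter.shouldSkip(3, 0, 7)); // prints true
    System.out.println(filter.shouldSkip(3, 0, 8)); // prints false
  }
}
```

The shared probe makes the filter non-thread-safe, which is acceptable only under the assumption that a single thread drives `fillBuffer()`. The probe must also never be inserted into the set, because mutating an element after insertion would leave it in the wrong hash bucket.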