waitinfuture commented on code in PR #2609:
URL: https://github.com/apache/celeborn/pull/2609#discussion_r1699329004
##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -136,6 +136,11 @@ public static int celebornShuffleId(
}
}
+ public static int getMapAttemptNumber(TaskContext context) {
+ assert (context.stageAttemptNumber() < (1 << 15) && context.attemptNumber() < (1 << 16));
+ return (context.stageAttemptNumber() << 16) | context.attemptNumber();
+ }
+
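The packed encoding in the hunk above can be sanity-checked with a small standalone sketch (not Celeborn code; the `Packing` class and `encode`/`decode` names are illustrative). Limiting the stage attempt to 2^15 while shifting by 16 keeps the sign bit clear, so the packed value is always a non-negative `int`:

```java
// Standalone sketch of the bit-packing used in getMapAttemptNumber above.
public class Packing {
    // Pack the stage attempt into the high bits and the task attempt into the
    // low 16 bits. stageAttempt < 2^15 guarantees the result is non-negative.
    static int encode(int stageAttempt, int taskAttempt) {
        assert stageAttempt < (1 << 15) && taskAttempt < (1 << 16);
        return (stageAttempt << 16) | taskAttempt;
    }

    // Unsigned shift recovers the stage attempt.
    static int decodeStageAttempt(int packed) {
        return packed >>> 16;
    }

    // Masking recovers the task attempt.
    static int decodeTaskAttempt(int packed) {
        return packed & 0xFFFF;
    }

    public static void main(String[] args) {
        int packed = encode(3, 7); // 3 * 65536 + 7 = 196615
        System.out.println(packed);
        System.out.println(decodeStageAttempt(packed)); // 3
        System.out.println(decodeTaskAttempt(packed));  // 7
    }
}
```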
Review Comment:
> Hi @mridulm , I have the same opinion as you. I also thought about using
`taskId`; in fact, I think it's perhaps doable.
>
> The first thing is compatibility: if the protocol is only used on the client
side then there is no harm, because an application always has the same client
version during its lifecycle. Currently the server side does read the
`attemptId` field, but it is only used for logging, like
>
> ```
> // TODO just info log for ended attempt
> logWarning(s"Append data failed for task(shuffle $shuffleKey,
map $mapId, attempt" +
> s" $attemptId), caused by AlreadyClosedException,
endedAttempt $endedAttempt, error message: ${e.getMessage}")
> ```
>
> Another thing is Flink (i.e. MapPartition): @RexXiong @FMX, can you help
check whether this is OK for MapPartition?
After further digging, I found that `PartitionFilesSorter` reads `compressedSize`
at a 12-byte offset
```
int compressedSize = Platform.getInt(batchHeader, Platform.BYTE_ARRAY_OFFSET + 12);
```
which prohibits us from changing `attemptId` from `int` to `long`. So,
considering backward compatibility, it seems we can't use `taskId`.
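The offset-12 constraint can be illustrated with a standalone sketch (not Celeborn code). The exact field order `(mapId, attemptId, batchId, compressedSize)` is my assumption inferred from the offset-12 read; the point is that each header field is a 4-byte `int`, so widening `attemptId` to a `long` would shift `compressedSize` off offset 12 and break existing readers:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch of a fixed 16-byte batch header with four 4-byte int fields.
public class BatchHeaderSketch {
    // Assumed field layout: mapId @0, attemptId @4, batchId @8, compressedSize @12.
    static byte[] writeHeader(int mapId, int attemptId, int batchId, int compressedSize) {
        // Platform.getInt reads in native byte order, so mirror that here.
        return ByteBuffer.allocate(16)
                .order(ByteOrder.nativeOrder())
                .putInt(mapId)
                .putInt(attemptId)
                .putInt(batchId)
                .putInt(compressedSize)
                .array();
    }

    // Mirrors Platform.getInt(batchHeader, Platform.BYTE_ARRAY_OFFSET + 12):
    // an absolute read at byte offset 12, which only works while every
    // preceding field stays 4 bytes wide.
    static int readCompressedSize(byte[] header) {
        return ByteBuffer.wrap(header).order(ByteOrder.nativeOrder()).getInt(12);
    }

    public static void main(String[] args) {
        byte[] header = writeHeader(1, 0, 42, 1024);
        System.out.println(readCompressedSize(header)); // 1024
    }
}
```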
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]