Copilot commented on code in PR #3586:
URL: https://github.com/apache/celeborn/pull/3586#discussion_r2731674192
##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -314,7 +317,8 @@ class ReducePartitionCommitHandler(
numPartitions: Int,
crc32PerPartition: Array[Int],
bytesWrittenPerPartition: Array[Long]): (Boolean, Boolean) = {
- val (mapperAttemptFinishedSuccess, allMapperFinished) =
shuffleMapperAttempts.synchronized {
+ val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())
Review Comment:
`shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())` relies on SAM
conversion of a Scala lambda to `java.util.function.Function`. That conversion
is not available in Scala 2.11 without the `FunctionConverter` implicits. Since
this project still cross-builds for 2.11 (see `CelebornBuild.scala`), this line
will fail to compile for that Scala version. To keep 2.11 compatibility, either
import `org.apache.celeborn.common.util.FunctionConverter._` in this file (as
done in `CommitManager.scala`) or replace the lambda with an explicit
`java.util.function.Function[Int, Object]` implementation.
```suggestion
    val shuffleLock = shuffleIdLocks.computeIfAbsent(
      shuffleId,
      new java.util.function.Function[Int, Object] {
        override def apply(id: Int): Object = new Object()
      })
```
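For reference, a minimal self-contained sketch of the per-shuffle lock pattern
with an explicit `java.util.function.Function`. `LockRegistry`, `newLock`, and
`lockFor` are hypothetical names used only for illustration, and the map type
is assumed to be `ConcurrentHashMap[Int, Object]`, since the declaration of
`shuffleIdLocks` is not shown in this diff.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical holder; stands in for the handler's `shuffleIdLocks` field.
object LockRegistry {
  // Assumed type: the real declaration is not part of this diff.
  private val shuffleIdLocks = new ConcurrentHashMap[Int, Object]()

  // Explicit anonymous class, so no lambda-to-SAM conversion is required.
  private val newLock = new java.util.function.Function[Int, Object] {
    override def apply(id: Int): Object = new Object()
  }

  // Returns the same lock object for every call with the same shuffleId.
  def lockFor(shuffleId: Int): Object =
    shuffleIdLocks.computeIfAbsent(shuffleId, newLock)
}
```

Because the function instance holds no state, it can be created once and
reused across calls instead of being allocated on every lookup.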
##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -428,7 +432,8 @@ class ReducePartitionCommitHandler(
}
private def initMapperAttempts(shuffleId: Int, numMappers: Int,
numPartitions: Int): Unit = {
- shuffleMapperAttempts.synchronized {
+ val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())
Review Comment:
Same issue as in `finishMapperAttempt`: passing the Scala lambda
`_ => new Object()` as the second argument to `shuffleIdLocks.computeIfAbsent`
depends on SAM conversion, which is not available under Scala 2.11 without the
`FunctionConverter` implicits, so that cross-build fails to compile. Please
either add `import org.apache.celeborn.common.util.FunctionConverter._` to this
file or construct an explicit `java.util.function.Function[Int, Object]`
instance here.
```suggestion
    val shuffleLock = shuffleIdLocks.computeIfAbsent(
      shuffleId,
      new java.util.function.Function[Int, Object] {
        override def apply(key: Int): Object = new Object()
      })
```
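The import-based alternative works through an implicit conversion from a Scala
function to `java.util.function.Function`. The sketch below shows the general
shape of such a converter under stated assumptions: `LambdaToJava`,
`toJavaFunction`, and `ImplicitLockExample` are hypothetical names, not the
contents of Celeborn's `FunctionConverter`, and the lambda's parameter type is
annotated explicitly so the example does not depend on the compiler inferring
it.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.language.implicitConversions

// Hypothetical converter, for illustration only.
object LambdaToJava {
  // Wraps a Scala function in an explicit java.util.function.Function.
  implicit def toJavaFunction[A, B](f: A => B): java.util.function.Function[A, B] =
    new java.util.function.Function[A, B] {
      override def apply(a: A): B = f(a)
    }
}

object ImplicitLockExample {
  import LambdaToJava._

  private val locks = new ConcurrentHashMap[Int, Object]()

  // Where the compiler has no built-in SAM conversion, the implicit above
  // adapts the lambda to the Java functional interface.
  def lockFor(shuffleId: Int): Object =
    locks.computeIfAbsent(shuffleId, (_: Int) => new Object())
}
```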