Copilot commented on code in PR #3586:
URL: https://github.com/apache/celeborn/pull/3586#discussion_r2731674192


##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -314,7 +317,8 @@ class ReducePartitionCommitHandler(
       numPartitions: Int,
       crc32PerPartition: Array[Int],
       bytesWrittenPerPartition: Array[Long]): (Boolean, Boolean) = {
-    val (mapperAttemptFinishedSuccess, allMapperFinished) = shuffleMapperAttempts.synchronized {
+    val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())

Review Comment:
   `shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())` relies on
SAM conversion of a Scala lambda to `java.util.function.Function`. Scala 2.11
does not perform that conversion by default, so the call compiles there only
with the `FunctionConverter` implicits in scope. Since this project still
cross-builds for 2.11 (see `CelebornBuild.scala`), this line will fail to
compile for that Scala version. To keep 2.11 compatibility, either import
`org.apache.celeborn.common.util.FunctionConverter._` in this file (as done in
`CommitManager.scala`) or replace the lambda with an explicit
`java.util.function.Function[Int, Object]` implementation.
   ```suggestion
       val shuffleLock = shuffleIdLocks.computeIfAbsent(
         shuffleId,
         new java.util.function.Function[Int, Object] {
           override def apply(id: Int): Object = new Object()
         })
   ```
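
   For the import-based alternative, here is a minimal, self-contained sketch
   of the idea. `FunctionConverterSketch` and `PerShuffleLockDemo` are
   hypothetical names used only for illustration; the project's real
   `FunctionConverter` may be defined differently. The lambda's parameter type
   is written out so the conversion does not depend on type inference.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.language.implicitConversions

   // Hypothetical stand-in for the project's FunctionConverter (assumption).
   object FunctionConverterSketch {
     // Wraps a Scala function in a java.util.function.Function.
     implicit def scalaFunctionToJava[K, V](f: K => V): java.util.function.Function[K, V] =
       new java.util.function.Function[K, V] {
         override def apply(key: K): V = f(key)
       }
   }

   object PerShuffleLockDemo {
     import FunctionConverterSketch._

     private val shuffleIdLocks = new ConcurrentHashMap[Int, Object]()

     // Returns the lock object for a shuffle, creating it on first use.
     def lockFor(shuffleId: Int): Object =
       shuffleIdLocks.computeIfAbsent(shuffleId, (id: Int) => new Object())
   }
   ```

   Repeated calls to `lockFor` with the same `shuffleId` return the same lock
   object, while different shuffle ids get distinct locks.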



##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -428,7 +432,8 @@ class ReducePartitionCommitHandler(
   }
 
   private def initMapperAttempts(shuffleId: Int, numMappers: Int, numPartitions: Int): Unit = {
-    shuffleMapperAttempts.synchronized {
+    val shuffleLock = shuffleIdLocks.computeIfAbsent(shuffleId, _ => new Object())

Review Comment:
   Same issue as in `finishMapperAttempt`: passing the Scala lambda
`_ => new Object()` as the second argument to `shuffleIdLocks.computeIfAbsent`
depends on SAM conversion, which is not available under Scala 2.11 without the
`FunctionConverter` implicits, so this fails to compile for that cross-build.
Please either add `import org.apache.celeborn.common.util.FunctionConverter._`
to this file or construct an explicit
`java.util.function.Function[Int, Object]` instance here.
   ```suggestion
       val shuffleLock = shuffleIdLocks.computeIfAbsent(
         shuffleId,
         new java.util.function.Function[Int, Object] {
           override def apply(key: Int): Object = new Object()
         })
   ```


