This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 2fbc1a06f [CELEBORN-901] Add support for Scala 2.13
2fbc1a06f is described below

commit 2fbc1a06f3448bf3bb643fdafb766ac948a3dcc3
Author: Fu Chen <[email protected]>
AuthorDate: Tue Aug 22 20:35:05 2023 +0800

    [CELEBORN-901] Add support for Scala 2.13
    
    This PR introduces support for Scala 2.13
    
    1. Resolved a compilation issue specific to Scala 2.13
    2. Successfully validated compatibility with Scala 2.13 through the 
comprehensive suite of unit tests
    3. Enabled SBT CI for Scala 2.13 within the "server" module and the "spark 
client"
    
    For more detailed guidance on migrating to Scala 2.13, please consult the 
following resources:
    
    1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html
    2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html
    
    As title
    
    No
    
    Pass GA
    
    Closes #1825 from cfmcgrady/scala213.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit d6c4334a11ee3ba16ffe1dcbfbb31ed43ef8bd6c)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/scala/org/apache/celeborn/client/CommitManager.scala  |  2 +-
 .../scala/org/apache/celeborn/client/LifecycleManager.scala    |  4 ++--
 .../org/apache/celeborn/client/commit/CommitHandler.scala      |  2 +-
 .../scala/org/apache/celeborn/common/meta/DeviceInfo.scala     |  2 +-
 .../org/apache/celeborn/common/metrics/MetricsSystem.scala     |  2 +-
 .../celeborn/common/protocol/message/ControlMessages.scala     |  3 ++-
 .../scala/org/apache/celeborn/common/util/ThreadUtils.scala    | 10 +---------
 .../scala/org/apache/celeborn/common/util/UtilsSuite.scala     |  2 +-
 .../celeborn/service/deploy/worker/storage/DeviceMonitor.scala |  3 +--
 9 files changed, 11 insertions(+), 19 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index f211f9ba6..0baacd10e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
                         Math.min(workerToRequests.size, 
conf.clientRpcMaxParallelism)
                       try {
                         ThreadUtils.parmap(
-                          workerToRequests.to,
+                          workerToRequests,
                           "CommitFiles",
                           parallelism) {
                           case (worker, requests) =>
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 6e052a0b9..85748f4f8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
     // Second, for each worker, try to initialize the endpoint.
     val parallelism = Math.min(Math.max(1, slots.size()), 
conf.clientRpcMaxParallelism)
-    ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case 
(workerInfo, _) =>
+    ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case 
(workerInfo, _) =>
       try {
         workerInfo.endpoint =
           rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, 
workerInfo.rpcPort), WORKER_EP)
@@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty 
|| !p._2._2.isEmpty)
     val parallelism =
       Math.min(Math.max(1, workerPartitionLocations.size), 
conf.clientRpcMaxParallelism)
-    ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot", 
parallelism) {
+    ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
       case (workerInfo, (primaryLocations, replicaLocations)) =>
         val res = requestWorkerReserveSlots(
           workerInfo.endpoint,
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 6a011e670..5f17e4ef0 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -197,7 +197,7 @@ abstract class CommitHandler(
     val workerPartitionLocations = 
allocatedWorkers.asScala.filter(!_._2.isEmpty)
     val parallelism = Math.min(workerPartitionLocations.size, 
conf.clientRpcMaxParallelism)
     ThreadUtils.parmap(
-      workerPartitionLocations.to,
+      workerPartitionLocations,
       "CommitFiles",
       parallelism) { case (worker, partitionLocationInfo) =>
       val primaryParts =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index ab11a7952..6bd49a1a0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -131,7 +131,7 @@ class DiskInfo(
     val (emptyShuffles, nonEmptyShuffles) = 
shuffleAllocations.asScala.partition(_._2 == 0)
     s"DiskInfo(maxSlots: $maxSlots," +
       s" committed shuffles ${emptyShuffles.size}" +
-      s" shuffleAllocations: $nonEmptyShuffles," +
+      s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
       s" mountPoint: $mountPoint," +
       s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
       s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 0d20cf184..fdc1fef0b 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -81,7 +81,7 @@ class MetricsSystem(
   }
 
   def getSourcesByName(sourceName: String): Seq[Source] =
-    sources.filter(_.sourceName == sourceName)
+    sources.filter(_.sourceName == sourceName).toSeq
 
   def registerSource(source: Source) {
     sources += source
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index c358e17e2..0cbfe8cfc 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -828,7 +828,8 @@ object ControlMessages extends Logging {
         val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
         val userResourceConsumption = 
PbSerDeUtils.fromPbUserResourceConsumption(
           pbHeartbeatFromWorker.getUserResourceConsumptionMap)
-        val pbDisks = 
pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
+        val pbDisks =
+          
pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
         if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
           
estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
         }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index ca5066c34..417e3650b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -20,8 +20,6 @@ package org.apache.celeborn.common.util
 import java.util.concurrent._
 import java.util.concurrent.{ForkJoinPool => SForkJoinPool, 
ForkJoinWorkerThread => SForkJoinWorkerThread}
 
-import scala.collection.TraversableLike
-import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Awaitable, ExecutionContext, 
ExecutionContextExecutor, Future}
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.language.higherKinds
@@ -287,13 +285,7 @@ object ThreadUtils {
    * @return new collection in which each element was given from the input 
collection `in` by
    *         applying the lambda function `f`.
    */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]](
-      in: Col[I],
-      prefix: String,
-      maxThreads: Int)(f: I => O)(implicit
-      cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-      cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
-  ): Col[O] = {
+  def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I => 
O): Iterable[O] = {
     val pool = newForkJoinPool(prefix, maxThreads)
     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index ff9157448..bce77cc68 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite {
       GetReducerFileGroupResponse]
 
     assert(response.status == responseTrans.status)
-    assert(response.attempts.deep == responseTrans.attempts.deep)
+    assert(util.Arrays.equals(response.attempts, responseTrans.attempts))
     val set =
       (response.fileGroup.values().toArray diff 
responseTrans.fileGroup.values().toArray).toSet
     assert(set.size == 0)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index aeea6c232..fbd1dc1e9 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import java.io._
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, 
IOException}
 import java.nio.charset.Charset
 import java.util
 import java.util.concurrent.TimeUnit
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
-import org.slf4j.LoggerFactory
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging

Reply via email to