This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new d6c4334a1 [CELEBORN-901] Add support for Scala 2.13
d6c4334a1 is described below

commit d6c4334a11ee3ba16ffe1dcbfbb31ed43ef8bd6c
Author: Fu Chen <[email protected]>
AuthorDate: Tue Aug 22 20:35:05 2023 +0800

    [CELEBORN-901] Add support for Scala 2.13
    
    ### What changes were proposed in this pull request?
    
    This PR introduces support for Scala 2.13
    
    1. Resolved compilation issues specific to Scala 2.13.
    2. Validated compatibility with Scala 2.13 by running the full unit test suite.
    3. Enabled SBT CI for Scala 2.13 for the "server" module and the "spark client".
    
    For more detailed guidance on migrating to Scala 2.13, please consult the following resources:
    
    1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html
    2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html
    
    ### Why are the changes needed?
    
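    Celeborn could previously only be built with Scala 2.12. Spark has supported Scala 2.13 since 3.2.0, so Celeborn needs to compile and pass its tests on Scala 2.13 as well, while keeping Scala 2.12 support.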
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passed the GitHub Actions (GA) CI workflows.
    
    Closes #1825 from cfmcgrady/scala213.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .github/workflows/sbt.yml                          | 95 +++++++++++++++++-----
 .../org/apache/celeborn/client/CommitManager.scala |  2 +-
 .../apache/celeborn/client/LifecycleManager.scala  |  4 +-
 .../celeborn/client/commit/CommitHandler.scala     |  2 +-
 .../apache/celeborn/common/meta/DeviceInfo.scala   |  2 +-
 .../celeborn/common/metrics/MetricsSystem.scala    |  2 +-
 .../common/protocol/message/ControlMessages.scala  |  3 +-
 .../apache/celeborn/common/util/ThreadUtils.scala  | 10 +--
 .../apache/celeborn/common/util/UtilsSuite.scala   |  2 +-
 .../deploy/worker/storage/DeviceMonitor.scala      |  3 +-
 10 files changed, 85 insertions(+), 40 deletions(-)

diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index 5967a48dd..c4ae29268 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -38,6 +38,9 @@ jobs:
           - 8
           - 11
           - 17
+        scala:
+          - '2.12.15'
+          - '2.13.5'
     steps:
     - uses: actions/checkout@v2
     - name: Setup JDK ${{ matrix.java }}
@@ -48,12 +51,12 @@ jobs:
         check-latest: false
     - name: Test Service with SBT
       run: |
-        build/sbt "clean; test"
+        build/sbt ++${{ matrix.scala }} "clean; test"
     - name: Upload test log
       if: failure()
       uses: actions/upload-artifact@v3
       with:
-          name: service-${{ matrix.java }}-unit-test-log
+          name: service-java-${{ matrix.java }}-scala-${{ matrix.scala 
}}-unit-test-log
           path: |
               **/target/test-reports/**
 
@@ -90,24 +93,74 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        java:
-          - 8
-          - 11
-          - 17
-        spark:
-          - '3.0'
-          - '3.1'
-          - '3.2'
-          - '3.3'
-          - '3.4'
-        exclude:
+        include:
+          # Spark 3.0
+          - spark: '3.0'
+            scala: '2.12.10'
+            java: 8
+          - spark: '3.0'
+            scala: '2.12.10'
+            java: 11
+          # Spark 3.1
+          - spark: '3.1'
+            scala: '2.12.10'
+            java: 8
+          - spark: '3.1'
+            scala: '2.12.10'
+            java: 11
+          # Spark supports scala 2.13 since 3.2.0
+          # Spark 3.2
+          - spark: '3.2'
+            scala: '2.12.15'
+            java: 8
+          - spark: '3.2'
+            scala: '2.12.15'
+            java: 11
+          - spark: '3.2'
+            scala: '2.13.5'
+            java: 8
+          - spark: '3.2'
+            scala: '2.13.5'
+            java: 11
           # SPARK-33772: Spark supports JDK 17 since 3.3.0
-          - java: 17
-            spark: '3.0'
-          - java: 17
-            spark: '3.1'
-          - java: 17
-            spark: '3.2'
+          # Spark 3.3
+          - spark: '3.3'
+            scala: '2.12.15'
+            java: 8
+          - spark: '3.3'
+            scala: '2.12.15'
+            java: 11
+          - spark: '3.3'
+            scala: '2.12.15'
+            java: 17
+          - spark: '3.3'
+            scala: '2.13.5'
+            java: 8
+          - spark: '3.3'
+            scala: '2.13.5'
+            java: 11
+          - spark: '3.3'
+            scala: '2.13.5'
+            java: 17
+          # Spark 3.4
+          - spark: '3.4'
+            scala: '2.12.17'
+            java: 8
+          - spark: '3.4'
+            scala: '2.12.17'
+            java: 11
+          - spark: '3.4'
+            scala: '2.12.17'
+            java: 17
+          - spark: '3.4'
+            scala: '2.13.5'
+            java: 8
+          - spark: '3.4'
+            scala: '2.13.5'
+            java: 11
+          - spark: '3.4'
+            scala: '2.13.5'
+            java: 17
     steps:
     - uses: actions/checkout@v2
     - name: Setup JDK ${{ matrix.java }}
@@ -118,12 +171,12 @@ jobs:
         check-latest: false
     - name: Test with SBT
       run: |
-        build/sbt -Pspark-${{ matrix.spark }} "clean; 
celeborn-spark-group/test"
+        build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean; 
celeborn-spark-group/test"
     - name: Upload test log
       if: failure()
       uses: actions/upload-artifact@v3
       with:
-          name: spark-${{ matrix.spark }}-unit-test-log
+          name: spark-${{ matrix.spark }}-scala-${{ matrix.scala 
}}-unit-test-log
           path: |
               **/target/test-reports/**
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index f211f9ba6..0baacd10e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
                         Math.min(workerToRequests.size, 
conf.clientRpcMaxParallelism)
                       try {
                         ThreadUtils.parmap(
-                          workerToRequests.to,
+                          workerToRequests,
                           "CommitFiles",
                           parallelism) {
                           case (worker, requests) =>
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 6e052a0b9..85748f4f8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
     // Second, for each worker, try to initialize the endpoint.
     val parallelism = Math.min(Math.max(1, slots.size()), 
conf.clientRpcMaxParallelism)
-    ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case 
(workerInfo, _) =>
+    ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case 
(workerInfo, _) =>
       try {
         workerInfo.endpoint =
           rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host, 
workerInfo.rpcPort), WORKER_EP)
@@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty 
|| !p._2._2.isEmpty)
     val parallelism =
       Math.min(Math.max(1, workerPartitionLocations.size), 
conf.clientRpcMaxParallelism)
-    ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot", 
parallelism) {
+    ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
       case (workerInfo, (primaryLocations, replicaLocations)) =>
         val res = requestWorkerReserveSlots(
           workerInfo.endpoint,
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 6a011e670..5f17e4ef0 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -197,7 +197,7 @@ abstract class CommitHandler(
     val workerPartitionLocations = 
allocatedWorkers.asScala.filter(!_._2.isEmpty)
     val parallelism = Math.min(workerPartitionLocations.size, 
conf.clientRpcMaxParallelism)
     ThreadUtils.parmap(
-      workerPartitionLocations.to,
+      workerPartitionLocations,
       "CommitFiles",
       parallelism) { case (worker, partitionLocationInfo) =>
       val primaryParts =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index ab11a7952..6bd49a1a0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -131,7 +131,7 @@ class DiskInfo(
     val (emptyShuffles, nonEmptyShuffles) = 
shuffleAllocations.asScala.partition(_._2 == 0)
     s"DiskInfo(maxSlots: $maxSlots," +
       s" committed shuffles ${emptyShuffles.size}" +
-      s" shuffleAllocations: $nonEmptyShuffles," +
+      s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
       s" mountPoint: $mountPoint," +
       s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
       s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 0d20cf184..fdc1fef0b 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -81,7 +81,7 @@ class MetricsSystem(
   }
 
   def getSourcesByName(sourceName: String): Seq[Source] =
-    sources.filter(_.sourceName == sourceName)
+    sources.filter(_.sourceName == sourceName).toSeq
 
   def registerSource(source: Source) {
     sources += source
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index d376832a3..89ef41a93 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -802,7 +802,8 @@ object ControlMessages extends Logging {
         val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
         val userResourceConsumption = 
PbSerDeUtils.fromPbUserResourceConsumption(
           pbHeartbeatFromWorker.getUserResourceConsumptionMap)
-        val pbDisks = 
pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
+        val pbDisks =
+          
pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
         if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
           
estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
         }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index ca5066c34..417e3650b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -20,8 +20,6 @@ package org.apache.celeborn.common.util
 import java.util.concurrent._
 import java.util.concurrent.{ForkJoinPool => SForkJoinPool, 
ForkJoinWorkerThread => SForkJoinWorkerThread}
 
-import scala.collection.TraversableLike
-import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Awaitable, ExecutionContext, 
ExecutionContextExecutor, Future}
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.language.higherKinds
@@ -287,13 +285,7 @@ object ThreadUtils {
    * @return new collection in which each element was given from the input 
collection `in` by
    *         applying the lambda function `f`.
    */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]](
-      in: Col[I],
-      prefix: String,
-      maxThreads: Int)(f: I => O)(implicit
-      cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-      cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
-  ): Col[O] = {
+  def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I => 
O): Iterable[O] = {
     val pool = newForkJoinPool(prefix, maxThreads)
     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index ff9157448..bce77cc68 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite {
       GetReducerFileGroupResponse]
 
     assert(response.status == responseTrans.status)
-    assert(response.attempts.deep == responseTrans.attempts.deep)
+    assert(util.Arrays.equals(response.attempts, responseTrans.attempts))
     val set =
       (response.fileGroup.values().toArray diff 
responseTrans.fileGroup.values().toArray).toSet
     assert(set.size == 0)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index aeea6c232..fbd1dc1e9 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import java.io._
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, 
IOException}
 import java.nio.charset.Charset
 import java.util
 import java.util.concurrent.TimeUnit
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
-import org.slf4j.LoggerFactory
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging

Reply via email to