This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d6c4334a1 [CELEBORN-901] Add support for Scala 2.13
d6c4334a1 is described below
commit d6c4334a11ee3ba16ffe1dcbfbb31ed43ef8bd6c
Author: Fu Chen <[email protected]>
AuthorDate: Tue Aug 22 20:35:05 2023 +0800
[CELEBORN-901] Add support for Scala 2.13
### What changes were proposed in this pull request?
This PR introduces support for Scala 2.13:
1. Resolved compilation issues specific to Scala 2.13
2. Successfully validated compatibility with Scala 2.13 through the
comprehensive suite of unit tests
3. Enabled SBT CI for Scala 2.13 within the "server" module and the "spark
client"
For more detailed guidance on migrating to Scala 2.13, please consult the
following resources:
1. https://www.scala-lang.org/blog/2017/02/28/collections-rework.html
2. https://docs.scala-lang.org/overviews/core/collections-migration-213.html
### Why are the changes needed?
As the title says: to add support for Scala 2.13.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes the GitHub Actions (GA) CI.
Closes #1825 from cfmcgrady/scala213.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.github/workflows/sbt.yml | 95 +++++++++++++++++-----
.../org/apache/celeborn/client/CommitManager.scala | 2 +-
.../apache/celeborn/client/LifecycleManager.scala | 4 +-
.../celeborn/client/commit/CommitHandler.scala | 2 +-
.../apache/celeborn/common/meta/DeviceInfo.scala | 2 +-
.../celeborn/common/metrics/MetricsSystem.scala | 2 +-
.../common/protocol/message/ControlMessages.scala | 3 +-
.../apache/celeborn/common/util/ThreadUtils.scala | 10 +--
.../apache/celeborn/common/util/UtilsSuite.scala | 2 +-
.../deploy/worker/storage/DeviceMonitor.scala | 3 +-
10 files changed, 85 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index 5967a48dd..c4ae29268 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -38,6 +38,9 @@ jobs:
- 8
- 11
- 17
+ scala:
+ - '2.12.15'
+ - '2.13.5'
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
@@ -48,12 +51,12 @@ jobs:
check-latest: false
- name: Test Service with SBT
run: |
- build/sbt "clean; test"
+ build/sbt ++${{ matrix.scala }} "clean; test"
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
- name: service-${{ matrix.java }}-unit-test-log
+ name: service-java-${{ matrix.java }}-scala-${{ matrix.scala
}}-unit-test-log
path: |
**/target/test-reports/**
@@ -90,24 +93,74 @@ jobs:
strategy:
fail-fast: false
matrix:
- java:
- - 8
- - 11
- - 17
- spark:
- - '3.0'
- - '3.1'
- - '3.2'
- - '3.3'
- - '3.4'
- exclude:
+ include:
+ # Spark 3.0
+ - spark: '3.0'
+ scala: '2.12.10'
+ java: 8
+ - spark: '3.0'
+ scala: '2.12.10'
+ java: 11
+ # Spark 3.1
+ - spark: '3.1'
+ scala: '2.12.10'
+ java: 8
+ - spark: '3.1'
+ scala: '2.12.10'
+ java: 11
+ # Spark supports scala 2.13 since 3.2.0
+ # Spark 3.2
+ - spark: '3.2'
+ scala: '2.12.15'
+ java: 8
+ - spark: '3.2'
+ scala: '2.12.15'
+ java: 11
+ - spark: '3.2'
+ scala: '2.13.5'
+ java: 8
+ - spark: '3.2'
+ scala: '2.13.5'
+ java: 11
# SPARK-33772: Spark supports JDK 17 since 3.3.0
- - java: 17
- spark: '3.0'
- - java: 17
- spark: '3.1'
- - java: 17
- spark: '3.2'
+ # Spark 3.3
+ - spark: '3.3'
+ scala: '2.12.15'
+ java: 8
+ - spark: '3.3'
+ scala: '2.12.15'
+ java: 11
+ - spark: '3.3'
+ scala: '2.12.15'
+ java: 17
+ - spark: '3.3'
+ scala: '2.13.5'
+ java: 8
+ - spark: '3.3'
+ scala: '2.13.5'
+ java: 11
+ - spark: '3.3'
+ scala: '2.13.5'
+ java: 17
+ # Spark 3.4
+ - spark: '3.4'
+ scala: '2.12.17'
+ java: 8
+ - spark: '3.4'
+ scala: '2.12.17'
+ java: 11
+ - spark: '3.4'
+ scala: '2.12.17'
+ java: 17
+ - spark: '3.4'
+ scala: '2.13.5'
+ java: 8
+ - spark: '3.4'
+ scala: '2.13.5'
+ java: 11
+ - spark: '3.4'
+ scala: '2.13.5'
+ java: 17
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
@@ -118,12 +171,12 @@ jobs:
check-latest: false
- name: Test with SBT
run: |
- build/sbt -Pspark-${{ matrix.spark }} "clean;
celeborn-spark-group/test"
+ build/sbt -Pspark-${{ matrix.spark }} ++${{ matrix.scala }} "clean;
celeborn-spark-group/test"
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
- name: spark-${{ matrix.spark }}-unit-test-log
+ name: spark-${{ matrix.spark }}-scala-${{ matrix.scala
}}-unit-test-log
path: |
**/target/test-reports/**
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index f211f9ba6..0baacd10e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -117,7 +117,7 @@ class CommitManager(appUniqueId: String, val conf:
CelebornConf, lifecycleManage
Math.min(workerToRequests.size,
conf.clientRpcMaxParallelism)
try {
ThreadUtils.parmap(
- workerToRequests.to,
+ workerToRequests,
"CommitFiles",
parallelism) {
case (worker, requests) =>
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 6e052a0b9..85748f4f8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -424,7 +424,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
// Second, for each worker, try to initialize the endpoint.
val parallelism = Math.min(Math.max(1, slots.size()),
conf.clientRpcMaxParallelism)
- ThreadUtils.parmap(slots.asScala.to, "InitWorkerRef", parallelism) { case
(workerInfo, _) =>
+ ThreadUtils.parmap(slots.asScala, "InitWorkerRef", parallelism) { case
(workerInfo, _) =>
try {
workerInfo.endpoint =
rpcEnv.setupEndpointRef(RpcAddress.apply(workerInfo.host,
workerInfo.rpcPort), WORKER_EP)
@@ -648,7 +648,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
val workerPartitionLocations = slots.asScala.filter(p => !p._2._1.isEmpty
|| !p._2._2.isEmpty)
val parallelism =
Math.min(Math.max(1, workerPartitionLocations.size),
conf.clientRpcMaxParallelism)
- ThreadUtils.parmap(workerPartitionLocations.to, "ReserveSlot",
parallelism) {
+ ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
case (workerInfo, (primaryLocations, replicaLocations)) =>
val res = requestWorkerReserveSlots(
workerInfo.endpoint,
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 6a011e670..5f17e4ef0 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -197,7 +197,7 @@ abstract class CommitHandler(
val workerPartitionLocations =
allocatedWorkers.asScala.filter(!_._2.isEmpty)
val parallelism = Math.min(workerPartitionLocations.size,
conf.clientRpcMaxParallelism)
ThreadUtils.parmap(
- workerPartitionLocations.to,
+ workerPartitionLocations,
"CommitFiles",
parallelism) { case (worker, partitionLocationInfo) =>
val primaryParts =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index ab11a7952..6bd49a1a0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -131,7 +131,7 @@ class DiskInfo(
val (emptyShuffles, nonEmptyShuffles) =
shuffleAllocations.asScala.partition(_._2 == 0)
s"DiskInfo(maxSlots: $maxSlots," +
s" committed shuffles ${emptyShuffles.size}" +
- s" shuffleAllocations: $nonEmptyShuffles," +
+ s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
s" mountPoint: $mountPoint," +
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 0d20cf184..fdc1fef0b 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -81,7 +81,7 @@ class MetricsSystem(
}
def getSourcesByName(sourceName: String): Seq[Source] =
- sources.filter(_.sourceName == sourceName)
+ sources.filter(_.sourceName == sourceName).toSeq
def registerSource(source: Source) {
sources += source
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index d376832a3..89ef41a93 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -802,7 +802,8 @@ object ControlMessages extends Logging {
val estimatedAppDiskUsage = new util.HashMap[String, java.lang.Long]()
val userResourceConsumption =
PbSerDeUtils.fromPbUserResourceConsumption(
pbHeartbeatFromWorker.getUserResourceConsumptionMap)
- val pbDisks =
pbHeartbeatFromWorker.getDisksList.asScala.map(PbSerDeUtils.fromPbDiskInfo)
+ val pbDisks =
+
pbHeartbeatFromWorker.getDisksList.asScala.toSeq.map(PbSerDeUtils.fromPbDiskInfo)
if (!pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap.isEmpty) {
estimatedAppDiskUsage.putAll(pbHeartbeatFromWorker.getEstimatedAppDiskUsageMap)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index ca5066c34..417e3650b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -20,8 +20,6 @@ package org.apache.celeborn.common.util
import java.util.concurrent._
import java.util.concurrent.{ForkJoinPool => SForkJoinPool,
ForkJoinWorkerThread => SForkJoinWorkerThread}
-import scala.collection.TraversableLike
-import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Awaitable, ExecutionContext,
ExecutionContextExecutor, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.language.higherKinds
@@ -287,13 +285,7 @@ object ThreadUtils {
* @return new collection in which each element was given from the input
collection `in` by
* applying the lambda function `f`.
*/
- def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]](
- in: Col[I],
- prefix: String,
- maxThreads: Int)(f: I => O)(implicit
- cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
- cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
- ): Col[O] = {
+ def parmap[I, O](in: Iterable[I], prefix: String, maxThreads: Int)(f: I =>
O): Iterable[O] = {
val pool = newForkJoinPool(prefix, maxThreads)
try {
implicit val ec = ExecutionContext.fromExecutor(pool)
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index ff9157448..bce77cc68 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -162,7 +162,7 @@ class UtilsSuite extends CelebornFunSuite {
GetReducerFileGroupResponse]
assert(response.status == responseTrans.status)
- assert(response.attempts.deep == responseTrans.attempts.deep)
+ assert(util.Arrays.equals(response.attempts, responseTrans.attempts))
val set =
(response.fileGroup.values().toArray diff
responseTrans.fileGroup.values().toArray).toSet
assert(set.size == 0)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index aeea6c232..fbd1dc1e9 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.service.deploy.worker.storage
-import java.io._
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader,
IOException}
import java.nio.charset.Charset
import java.util
import java.util.concurrent.TimeUnit
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
-import org.slf4j.LoggerFactory
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging