This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e734ceb55 [MINOR] Cleanup code
e734ceb55 is described below

commit e734ceb55830c24d9201efdb2d4cc17a60aff90f
Author: sychen <[email protected]>
AuthorDate: Mon Jun 19 11:31:51 2023 +0800

    [MINOR] Cleanup code
    
    ### What changes were proposed in this pull request?
    1. Use `<arg>-Ywarn-unused-import</arg>` to remove some unused imports
    There is no way to use `<arg>-Ywarn-unused-import</arg>` at this stage
    Because we have the following code
    ```
    // Can Remove this if celeborn don't support scala211 in future
    import org.apache.celeborn.common.util.FunctionConverter._
    ```
    2. Fix scala case match not fully covered, avoid `scala.MatchError`
    3. Fixed some scala compilation warnings
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1600 from cxzl25/cleanup_code.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/RssShuffleReader.scala   |  2 +-
 .../scala/org/apache/celeborn/client/LifecycleManager.scala    | 10 ++++++++--
 .../org/apache/celeborn/client/ReleasePartitionManager.scala   |  2 +-
 .../org/apache/celeborn/client/commit/CommitHandler.scala      |  2 +-
 .../apache/celeborn/common/internal/config/ConfigBuilder.scala |  1 -
 .../apache/celeborn/common/internal/config/ConfigEntry.scala   |  2 --
 .../celeborn/common/meta/ShufflePartitionLocationInfo.scala    |  1 -
 .../scala/org/apache/celeborn/common/meta/WorkerInfo.scala     |  2 --
 .../org/apache/celeborn/common/metrics/MetricsSystem.scala     |  2 +-
 .../apache/celeborn/common/metrics/source/AbstractSource.scala |  2 +-
 .../org/apache/celeborn/common/metrics/source/JVMSource.scala  |  1 +
 .../celeborn/common/protocol/message/ControlMessages.scala     |  4 ++--
 .../org/apache/celeborn/common/util/FunctionConverter.scala    |  2 ++
 .../scala/org/apache/celeborn/common/util/ThreadUtils.scala    |  2 +-
 .../src/main/scala/org/apache/celeborn/common/util/Utils.scala |  2 +-
 .../org/apache/celeborn/common/meta/WorkerInfoSuite.scala      |  2 +-
 .../org/apache/celeborn/service/deploy/master/Master.scala     |  1 -
 .../celeborn/service/deploy/master/MasterArguments.scala       |  1 -
 .../service/deploy/master/clustermeta/ha/MasterNode.scala      |  2 --
 pom.xml                                                        |  9 +++++++++
 .../scala/org/apache/celeborn/tests/flink/WordCountTest.scala  |  2 --
 .../celeborn/service/deploy/worker/PushDataHandler.scala       |  8 +++++++-
 .../service/deploy/worker/storage/StorageManager.scala         |  2 ++
 23 files changed, 39 insertions(+), 25 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
index f647dd29e..e0cc31a06 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/RssShuffleReader.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{InterruptibleIterator, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.{ShuffleReader, ShuffleReadMetricsReporter}
 import org.apache.spark.sql.execution.UnsafeRowSerializer
-import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, 
RssColumnarBatchBuilder, RssColumnarBatchSerializer}
+import org.apache.spark.sql.execution.columnar.{RssBatchBuilder, 
RssColumnarBatchSerializer}
 import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 77dcfc559..779dec0ef 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.client
 
 import java.util
-import java.util.{function, HashSet => JHashSet, List => JList, Set => JSet}
+import java.util.{function, List => JList}
 import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -297,6 +297,8 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
             attemptId,
             partitionId,
             numMappers)
+        case _ =>
+          throw new UnsupportedOperationException(s"Not support $partitionType 
yet")
       }
 
     case GetReducerFileGroup(applicationId: String, shuffleId: Int) =>
@@ -340,6 +342,8 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
                 initialLocs)
             case PartitionType.REDUCE =>
               context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, 
initialLocs))
+            case _ =>
+              throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
           }
           return
         }
@@ -398,6 +402,8 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
                   context.reply(response)
                 }
               case PartitionType.REDUCE => context.reply(response)
+              case _ =>
+                throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
             }
           }))
         registeringShuffleRequest.remove(shuffleId)
@@ -406,7 +412,7 @@ class LifecycleManager(appId: String, val conf: 
CelebornConf) extends RpcEndpoin
 
     // First, request to get allocated slots from Master
     val ids = new util.ArrayList[Integer](numPartitions)
-    (0 until numPartitions).foreach(idx => ids.add(new Integer(idx)))
+    (0 until numPartitions).foreach(idx => ids.add(Integer.valueOf(idx)))
     val res = requestMasterRequestSlotsWithRetry(applicationId, shuffleId, ids)
 
     res.status match {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
 
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
index 6dc635078..f8ae855f2 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.client
 
 import java.util
-import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
ScheduledFuture, TimeUnit}
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, 
TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.DurationInt
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index b88af9723..1b9a9e5f2 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 
 import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
 import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
-import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers, ShuffleFileGroups}
+import org.apache.celeborn.client.LifecycleManager.{ShuffleFailedWorkers, 
ShuffleFileGroups}
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, 
WorkerInfo}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
 
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
index 76fd10f09..0dff64d4d 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigBuilder.scala
@@ -22,7 +22,6 @@ import java.util.regex.PatternSyntaxException
 
 import scala.util.matching.Regex
 
-import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.network.util.ByteUnit
 import org.apache.celeborn.common.util.{JavaUtils, Utils}
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
 
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
index ebd391399..9360688b4 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
@@ -17,8 +17,6 @@
 
 package org.apache.celeborn.common.internal.config
 
-import java.util.concurrent.ConcurrentHashMap
-
 import 
org.apache.celeborn.common.internal.config.ConfigHelpers.AlternativesTransfer
 import org.apache.celeborn.common.util.JavaUtils
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
 
b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
index b40bc2917..6e661ef8a 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
@@ -19,7 +19,6 @@ package org.apache.celeborn.common.meta
 
 import java.util
 import java.util.concurrent.ConcurrentHashMap
-import java.util.stream.Collectors
 
 import scala.collection.JavaConverters._
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 407bb960a..02f036bcb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -18,8 +18,6 @@
 package org.apache.celeborn.common.meta
 
 import java.util
-import java.util.Objects
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index 9f60c5cd1..0d20cf184 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -109,7 +109,7 @@ class MetricsSystem(
     sourceConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
       try {
-        val source = Utils.classForName(classPath).newInstance()
+        val source = 
Utils.classForName(classPath).getDeclaredConstructor().newInstance()
         registerSource(source.asInstanceOf[Source])
       } catch {
         case e: Exception => logError("Source class " + classPath + " cannot 
be instantiated", e)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index a445d74c7..d28986bb0 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -421,7 +421,7 @@ class TimerSupplier(val slidingWindowSize: Int)
 class GaugeSupplier[T](f: Unit => T) extends 
MetricRegistry.MetricSupplier[Gauge[_]] {
   override def newMetric(): Gauge[T] = {
     new Gauge[T] {
-      override def getValue: T = f()
+      override def getValue: T = f(())
     }
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
index a6e2af925..374024dd2 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/JVMSource.scala
@@ -37,6 +37,7 @@ class JVMSource(conf: CelebornConf, role: String) extends 
AbstractSource(conf, r
     .map { x =>
       x.getMetrics.asScala.map {
         case (name: String, metric: Gauge[_]) => addGauge(name, metric)
+        case (name, metric) => new IllegalArgumentException(s"Unknown metric 
type: $name: $metric")
       }
     }
   // start cleaner
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 438369a42..7e26561ce 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -574,7 +574,7 @@ object ControlMessages extends Logging {
             
PbFileGroup.newBuilder().addAllLocations(fileGroup.asScala.map(PbSerDeUtils
               .toPbPartitionLocation).toList.asJava).build())
         }.asJava)
-      builder.addAllAttempts(attempts.map(new Integer(_)).toIterable.asJava)
+      builder.addAllAttempts(attempts.map(Integer.valueOf).toIterable.asJava)
       builder.addAllPartitionIds(partitionIds)
       val payload = builder.build().toByteArray
       new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, 
payload)
@@ -734,7 +734,7 @@ object ControlMessages extends Logging {
         .setShuffleId(shuffleId)
         .addAllMasterIds(masterIds)
         .addAllSlaveIds(slaveIds)
-        .addAllMapAttempts(mapAttempts.map(new Integer(_)).toIterable.asJava)
+        .addAllMapAttempts(mapAttempts.map(Integer.valueOf).toIterable.asJava)
         .setEpoch(epoch)
         .build().toByteArray
       new TransportMessage(MessageType.COMMIT_FILES, payload)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
index 71c75cf28..8eef121f4 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/FunctionConverter.scala
@@ -17,6 +17,8 @@
 
 package org.apache.celeborn.common.util
 
+import scala.language.implicitConversions
+
 /**
  * Implicit conversion for scala(2.11) function to java function
  */
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index d4cb4417b..ca5066c34 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -18,12 +18,12 @@
 package org.apache.celeborn.common.util
 
 import java.util.concurrent._
+import java.util.concurrent.{ForkJoinPool => SForkJoinPool, 
ForkJoinWorkerThread => SForkJoinWorkerThread}
 
 import scala.collection.TraversableLike
 import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Awaitable, ExecutionContext, 
ExecutionContextExecutor, Future}
 import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, 
ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.language.higherKinds
 import scala.util.control.NonFatal
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index dd835c35b..f5fa1bfe3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -27,7 +27,7 @@ import java.nio.charset.StandardCharsets
 import java.text.SimpleDateFormat
 import java.util
 import java.util.{Locale, Properties, Random, UUID}
-import java.util.concurrent.{Callable, ConcurrentHashMap, ThreadPoolExecutor, 
TimeoutException, TimeUnit}
+import java.util.concurrent.{Callable, ThreadPoolExecutor, TimeoutException, 
TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index 2a8b41b0c..8ccf1d295 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -19,7 +19,7 @@ package org.apache.celeborn.common.meta
 
 import java.util
 import java.util.{Map => jMap}
-import java.util.concurrent.{ConcurrentHashMap, Future, ThreadLocalRandom}
+import java.util.concurrent.{Future, ThreadLocalRandom}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 386038019..563580101 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.util.Random
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.exception.CelebornRuntimeException
 import org.apache.celeborn.common.haclient.RssHARetryClient
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.internal.Logging
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
index 7b0d07e70..331e9a8b3 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterArguments.scala
@@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.master
 import scala.annotation.tailrec
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf._
 import org.apache.celeborn.common.util.{IntParam, Utils}
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ha.MasterClusterInfo
 
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
index 0ad315704..a1ac2b67e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterNode.scala
@@ -20,8 +20,6 @@ package 
org.apache.celeborn.service.deploy.master.clustermeta.ha
 import java.io.IOException
 import java.net.{InetAddress, InetSocketAddress}
 
-import scala.util.{Failure, Success}
-
 import org.apache.ratis.util.NetUtils
 
 import org.apache.celeborn.common.internal.Logging
diff --git a/pom.xml b/pom.xml
index 1ecf80ce8..f7d84964e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -653,6 +653,15 @@
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>${maven.plugin.scala.version}</version>
+          <configuration>
+            <args>
+              <arg>-unchecked</arg>
+              <arg>-deprecation</arg>
+              <arg>-feature</arg>
+              <arg>-explaintypes</arg>
+              <arg>-Xfatal-warnings</arg>
+            </args>
+          </configuration>
           <executions>
             <execution>
               <id>scala-compile-first</id>
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index 463380855..fbf97665b 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -60,7 +60,6 @@ class WordCountTest extends AnyFunSuite with Logging with 
MiniClusterFeature
       "org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory")
     configuration.setString("celeborn.master.endpoints", "localhost:9097")
     configuration.setString("execution.batch-shuffle-mode", 
"ALL_EXCHANGES_BLOCKING")
-    configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
     configuration.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH)
     configuration.setString("taskmanager.memory.network.min", "1024m")
     configuration.setString(RestOptions.BIND_PORT, "8081-8089")
@@ -70,7 +69,6 @@ class WordCountTest extends AnyFunSuite with Logging with 
MiniClusterFeature
     val env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
     env.getConfig.setExecutionMode(ExecutionMode.BATCH)
     env.getConfig.setParallelism(parallelism)
-    
env.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL)
     env.disableOperatorChaining()
     // make parameters available in the web interface
     WordCountHelper.execute(env, parallelism)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 91ddc16f3..3695ba20d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -94,13 +94,16 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
               client,
               pushData.requestId,
               pushData.shuffleKey)
-            shufflePartitionType.getOrDefault(pushData.shuffleKey, 
PartitionType.REDUCE) match {
+            val partitionType =
+              shufflePartitionType.getOrDefault(pushData.shuffleKey, 
PartitionType.REDUCE)
+            partitionType match {
               case PartitionType.REDUCE => handlePushData(
                   pushData,
                   callback)
               case PartitionType.MAP => handleMapPartitionPushData(
                   pushData,
                   callback)
+              case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
             }
           })
       case pushMergedData: PushMergedData =>
@@ -843,6 +846,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
           (WorkerSource.MasterRegionStartTime, 
WorkerSource.SlaveRegionStartTime)
         case Type.REGION_FINISH =>
           (WorkerSource.MasterRegionFinishTime, 
WorkerSource.SlaveRegionFinishTime)
+        case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
 
     val location =
@@ -891,6 +895,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
             message.asInstanceOf[RegionStart].isBroadcast)
         case Type.REGION_FINISH =>
           fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
+        case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
       // for master, send data to slave
       if (location.hasPeer && isMaster) {
@@ -1004,6 +1009,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
         case Type.REGION_FINISH => (
             StatusCode.REGION_FINISH_FAIL_MASTER,
             StatusCode.REGION_FINISH_FAIL_SLAVE)
+        case _ => throw new IllegalArgumentException(s"Not support 
$messageType yet")
       }
     callback.onFailure(new CelebornIOException(
       if (isMaster) messageMaster else messageSlave,
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3c0267fa1..62cf7a3b5 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -311,6 +311,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               splitThreshold,
               splitMode,
               rangeReadFilter)
+          case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
         }
 
         fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
@@ -354,6 +355,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
                 splitThreshold,
                 splitMode,
                 rangeReadFilter)
+            case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
           }
           deviceMonitor.registerFileWriter(fileWriter)
           val map = workingDirWriters.computeIfAbsent(dir, 
workingDirWriterListFunc)

Reply via email to