This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 0fb12d77aec7175e8dbdb8b97e34bebebefcdbee
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 14 10:04:39 2022 +0800

    [CELEBORN-132] ShuffleClient should not implement Cloneable (#1077)
---
 .../spark/shuffle/celeborn/RssShuffleManager.java  |  2 +-
 .../shuffle/celeborn/RssShuffleWriterSuiteJ.java   |  4 ++--
 .../spark/shuffle/celeborn/RssShuffleManager.java  |  2 +-
 .../shuffle/celeborn/RssShuffleWriterSuiteJ.java   |  2 +-
 .../org/apache/celeborn/client/ShuffleClient.java  | 28 +++++++++++-----------
 .../apache/celeborn/client/ShuffleClientImpl.java  | 12 +++++-----
 .../apache/celeborn/client/DummyShuffleClient.java |  2 +-
 .../service/deploy/cluster/ReadWriteTestBase.scala |  2 +-
 8 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 82e31026..34feca61 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -141,7 +141,7 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     if (rssShuffleClient != null) {
-      rssShuffleClient.shutDown();
+      rssShuffleClient.shutdown();
     }
     if (lifecycleManager != null) {
       lifecycleManager.stop();
diff --git a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 6b00eb42..59334a3b 100644
--- a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++ b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -250,7 +250,7 @@ public class RssShuffleWriterSuiteJ {
 
       writer.write(iterator);
       Option<MapStatus> status = writer.stop(true);
-      client.shutDown();
+      client.shutdown();
 
       assertNotNull(status);
       assertTrue(status.isDefined());
@@ -315,7 +315,7 @@ public class RssShuffleWriterSuiteJ {
 
       writer.write(iterator);
       Option<MapStatus> status = writer.stop(true);
-      client.shutDown();
+      client.shutdown();
 
       assertNotNull(status);
       assertTrue(status.isDefined());
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 6f23c741..78a31562 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -139,7 +139,7 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     if (rssShuffleClient != null) {
-      rssShuffleClient.shutDown();
+      rssShuffleClient.shutdown();
     }
     if (lifecycleManager != null) {
       lifecycleManager.stop();
diff --git a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 56aed82d..c7e59ce9 100644
--- a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++ b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -232,7 +232,7 @@ public class RssShuffleWriterSuiteJ {
 
     writer.write(iterator);
     Option<MapStatus> status = writer.stop(true);
-    client.shutDown();
+    client.shutdown();
 
     assertNotNull(status);
     assertTrue(status.isDefined());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b9fcb9e1..dac22fb3 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -33,15 +33,15 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
  * ShuffleClient may be a process singleton, the specific PartitionLocation should be hidden in the
  * implementation
  */
-public abstract class ShuffleClient implements Cloneable {
+public abstract class ShuffleClient {
   private static volatile ShuffleClient _instance;
-  private static volatile boolean initFinished = false;
+  private static volatile boolean initialized = false;
   private static volatile FileSystem hdfsFs;
 
   // for testing
   public static void reset() {
     _instance = null;
-    initFinished = false;
+    initialized = false;
     hdfsFs = null;
   }
 
@@ -49,7 +49,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public static ShuffleClient get(
       RpcEndpointRef driverRef, CelebornConf conf, UserIdentifier userIdentifier) {
-    if (null == _instance || !initFinished) {
+    if (null == _instance || !initialized) {
       synchronized (ShuffleClient.class) {
         if (null == _instance) {
           // During the execution of Spark tasks, each task may be interrupted due to speculative
@@ -59,12 +59,12 @@ public abstract class ShuffleClient implements Cloneable {
           // when communicating with MetaService, it will cause a NullPointerException.
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverRef);
-          initFinished = true;
-        } else if (!initFinished) {
-          _instance.shutDown();
+          initialized = true;
+        } else if (!initialized) {
+          _instance.shutdown();
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverRef);
-          initFinished = true;
+          initialized = true;
         }
       }
     }
@@ -73,7 +73,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public static ShuffleClient get(
       String driverHost, int port, CelebornConf conf, UserIdentifier userIdentifier) {
-    if (null == _instance || !initFinished) {
+    if (null == _instance || !initialized) {
       synchronized (ShuffleClient.class) {
         if (null == _instance) {
           // During the execution of Spark tasks, each task may be interrupted due to speculative
@@ -83,12 +83,12 @@ public abstract class ShuffleClient implements Cloneable {
           // when communicating with MetaService, it will cause a NullPointerException.
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverHost, port);
-          initFinished = true;
-        } else if (!initFinished) {
-          _instance.shutDown();
+          initialized = true;
+        } else if (!initialized) {
+          _instance.shutdown();
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverHost, port);
-          initFinished = true;
+          initialized = true;
         }
       }
     }
@@ -173,7 +173,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public abstract boolean unregisterShuffle(String applicationId, int shuffleId, boolean isDriver);
 
-  public abstract void shutDown();
+  public abstract void shutdown();
 
   public abstract Optional<PartitionLocation> regionStart(
       String applicationId,
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 67a6ee4e..217e225f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -73,7 +73,7 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private static final byte MASTER_MODE = PartitionLocation.Mode.MASTER.mode();
 
-  private static final Random rand = new Random();
+  private static final Random RND = new Random();
 
   private final CelebornConf conf;
 
@@ -137,9 +137,9 @@ public class ShuffleClientImpl extends ShuffleClient {
     // init rpc env and master endpointRef
     rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf);
 
+    String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =
-        Utils.fromCelebornConf(
-            conf, TransportModuleConstants.DATA_MODULE, conf.getInt("celeborn.data.io.threads", 8));
+        Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module + ".io.threads", 8));
     TransportContext context =
         new TransportContext(dataTransportConf, new BaseMessageHandler(), true);
     dataClientFactory = context.createClientFactory();
@@ -396,7 +396,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       return true;
     }
 
-    long sleepTimeMs = rand.nextInt(50);
+    long sleepTimeMs = RND.nextInt(50);
     if (sleepTimeMs > 30) {
       try {
         TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
@@ -850,7 +850,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         new ArrayList<>(pushState.batchesMap.entrySet());
     while (!batchesArr.isEmpty()) {
       limitMaxInFlight(mapKey, pushState, maxInFlight);
-      Map.Entry<String, DataBatches> entry = batchesArr.get(rand.nextInt(batchesArr.size()));
+      Map.Entry<String, DataBatches> entry = batchesArr.get(RND.nextInt(batchesArr.size()));
       ArrayList<DataBatches.DataBatch> batches = entry.getValue().requireBatches(pushBufferMaxSize);
       if (entry.getValue().getTotalSize() == 0) {
         batchesArr.remove(entry);
@@ -1164,7 +1164,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   @Override
-  public void shutDown() {
+  public void shutdown() {
     if (null != rpcEnv) {
       rpcEnv.shutdown();
     }
diff --git a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
index 2fb60718..83393ed1 100644
--- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -117,7 +117,7 @@ public class DummyShuffleClient extends ShuffleClient {
   }
 
   @Override
-  public void shutDown() {
+  public void shutdown() {
     try {
       os.close();
     } catch (IOException e) {
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
index b2e3fc6c..ae50a499 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
@@ -88,7 +88,7 @@ trait ReadWriteTestBase extends Logging {
     Assert.assertArrayEquals(targetArr, readBytes)
 
     Thread.sleep(5000L)
-    shuffleClient.shutDown()
+    shuffleClient.shutdown()
     lifecycleManager.rpcEnv.shutdown()
 
   }

Reply via email to