This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ec371c00 [CELEBORN-132] ShuffleClient should not implement Cloneable (#1077)
ec371c00 is described below

commit ec371c0026aa4da8c77cc3a0e839afb8412b039b
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 14 10:04:39 2022 +0800

    [CELEBORN-132] ShuffleClient should not implement Cloneable (#1077)
---
 .../spark/shuffle/celeborn/RssShuffleManager.java  |  2 +-
 .../shuffle/celeborn/RssShuffleWriterSuiteJ.java   |  4 ++--
 .../spark/shuffle/celeborn/RssShuffleManager.java  |  2 +-
 .../shuffle/celeborn/RssShuffleWriterSuiteJ.java   |  2 +-
 .../org/apache/celeborn/client/ShuffleClient.java  | 28 +++++++++++-----------
 .../apache/celeborn/client/ShuffleClientImpl.java  | 12 +++++-----
 .../apache/celeborn/client/DummyShuffleClient.java |  2 +-
 .../service/deploy/cluster/ReadWriteTestBase.scala |  2 +-
 8 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 82e31026..34feca61 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -141,7 +141,7 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     if (rssShuffleClient != null) {
-      rssShuffleClient.shutDown();
+      rssShuffleClient.shutdown();
     }
     if (lifecycleManager != null) {
       lifecycleManager.stop();
diff --git a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 6b00eb42..59334a3b 100644
--- a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++ b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -250,7 +250,7 @@ public class RssShuffleWriterSuiteJ {
 
       writer.write(iterator);
       Option<MapStatus> status = writer.stop(true);
-      client.shutDown();
+      client.shutdown();
 
       assertNotNull(status);
       assertTrue(status.isDefined());
@@ -315,7 +315,7 @@ public class RssShuffleWriterSuiteJ {
 
       writer.write(iterator);
       Option<MapStatus> status = writer.stop(true);
-      client.shutDown();
+      client.shutdown();
 
       assertNotNull(status);
       assertTrue(status.isDefined());
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 6f23c741..78a31562 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -139,7 +139,7 @@ public class RssShuffleManager implements ShuffleManager {
   @Override
   public void stop() {
     if (rssShuffleClient != null) {
-      rssShuffleClient.shutDown();
+      rssShuffleClient.shutdown();
     }
     if (lifecycleManager != null) {
       lifecycleManager.stop();
diff --git a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 56aed82d..c7e59ce9 100644
--- a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++ b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -232,7 +232,7 @@ public class RssShuffleWriterSuiteJ {
 
     writer.write(iterator);
     Option<MapStatus> status = writer.stop(true);
-    client.shutDown();
+    client.shutdown();
 
     assertNotNull(status);
     assertTrue(status.isDefined());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 4beaf00c..f5ea4800 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -35,15 +35,15 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
  * ShuffleClient may be a process singleton, the specific PartitionLocation 
should be hidden in the
  * implementation
  */
-public abstract class ShuffleClient implements Cloneable {
+public abstract class ShuffleClient {
   private static volatile ShuffleClient _instance;
-  private static volatile boolean initFinished = false;
+  private static volatile boolean initialized = false;
   private static volatile FileSystem hdfsFs;
 
   // for testing
   public static void reset() {
     _instance = null;
-    initFinished = false;
+    initialized = false;
     hdfsFs = null;
   }
 
@@ -51,7 +51,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public static ShuffleClient get(
       RpcEndpointRef driverRef, CelebornConf conf, UserIdentifier 
userIdentifier) {
-    if (null == _instance || !initFinished) {
+    if (null == _instance || !initialized) {
       synchronized (ShuffleClient.class) {
         if (null == _instance) {
           // During the execution of Spark tasks, each task may be interrupted 
due to speculative
@@ -61,12 +61,12 @@ public abstract class ShuffleClient implements Cloneable {
           // when communicating with MetaService, it will cause a 
NullPointerException.
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverRef);
-          initFinished = true;
-        } else if (!initFinished) {
-          _instance.shutDown();
+          initialized = true;
+        } else if (!initialized) {
+          _instance.shutdown();
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverRef);
-          initFinished = true;
+          initialized = true;
         }
       }
     }
@@ -75,7 +75,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public static ShuffleClient get(
       String driverHost, int port, CelebornConf conf, UserIdentifier 
userIdentifier) {
-    if (null == _instance || !initFinished) {
+    if (null == _instance || !initialized) {
       synchronized (ShuffleClient.class) {
         if (null == _instance) {
           // During the execution of Spark tasks, each task may be interrupted 
due to speculative
@@ -85,12 +85,12 @@ public abstract class ShuffleClient implements Cloneable {
           // when communicating with MetaService, it will cause a 
NullPointerException.
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverHost, port);
-          initFinished = true;
-        } else if (!initFinished) {
-          _instance.shutDown();
+          initialized = true;
+        } else if (!initialized) {
+          _instance.shutdown();
           _instance = new ShuffleClientImpl(conf, userIdentifier);
           _instance.setupMetaServiceRef(driverHost, port);
-          initFinished = true;
+          initialized = true;
         }
       }
     }
@@ -185,7 +185,7 @@ public abstract class ShuffleClient implements Cloneable {
 
   public abstract boolean unregisterShuffle(String applicationId, int 
shuffleId, boolean isDriver);
 
-  public abstract void shutDown();
+  public abstract void shutdown();
 
   // Write data to a specific map partition, input data's type is Bytebuf.
   // data's type is Bytebuf to avoid copy between application and netty
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 51243ee5..103eac13 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -75,7 +75,7 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   private static final byte MASTER_MODE = PartitionLocation.Mode.MASTER.mode();
 
-  private static final Random rand = new Random();
+  private static final Random RND = new Random();
 
   private final CelebornConf conf;
 
@@ -143,9 +143,9 @@ public class ShuffleClientImpl extends ShuffleClient {
     // init rpc env and master endpointRef
     rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf);
 
+    String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =
-        Utils.fromCelebornConf(
-            conf, TransportModuleConstants.DATA_MODULE, conf.getInt("celeborn.data.io.threads", 8));
+        Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module + ".io.threads", 8));
     TransportContext context =
         new TransportContext(dataTransportConf, new BaseMessageHandler(), 
true);
     dataClientFactory = context.createClientFactory();
@@ -402,7 +402,7 @@ public class ShuffleClientImpl extends ShuffleClient {
       return true;
     }
 
-    long sleepTimeMs = rand.nextInt(50);
+    long sleepTimeMs = RND.nextInt(50);
     if (sleepTimeMs > 30) {
       try {
         TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
@@ -873,7 +873,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         new ArrayList<>(pushState.batchesMap.entrySet());
     while (!batchesArr.isEmpty()) {
       limitMaxInFlight(mapKey, pushState, currentMaxReqsInFlight);
-      Map.Entry<String, DataBatches> entry = 
batchesArr.get(rand.nextInt(batchesArr.size()));
+      Map.Entry<String, DataBatches> entry = 
batchesArr.get(RND.nextInt(batchesArr.size()));
       ArrayList<DataBatches.DataBatch> batches = 
entry.getValue().requireBatches(pushBufferMaxSize);
       if (entry.getValue().getTotalSize() == 0) {
         batchesArr.remove(entry);
@@ -1233,7 +1233,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   }
 
   @Override
-  public void shutDown() {
+  public void shutdown() {
     if (null != rpcEnv) {
       rpcEnv.shutdown();
     }
diff --git a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
index d1911fa3..07f03038 100644
--- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -129,7 +129,7 @@ public class DummyShuffleClient extends ShuffleClient {
   }
 
   @Override
-  public void shutDown() {
+  public void shutdown() {
     try {
       os.close();
     } catch (IOException e) {
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
index b2e3fc6c..ae50a499 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
@@ -88,7 +88,7 @@ trait ReadWriteTestBase extends Logging {
     Assert.assertArrayEquals(targetArr, readBytes)
 
     Thread.sleep(5000L)
-    shuffleClient.shutDown()
+    shuffleClient.shutdown()
     lifecycleManager.rpcEnv.shutdown()
 
   }

Reply via email to