This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ec371c00 [CELEBORN-132] ShuffleClient should not implement Cloneable (#1077)
ec371c00 is described below
commit ec371c0026aa4da8c77cc3a0e839afb8412b039b
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 14 10:04:39 2022 +0800
[CELEBORN-132] ShuffleClient should not implement Cloneable (#1077)
---
.../spark/shuffle/celeborn/RssShuffleManager.java | 2 +-
.../shuffle/celeborn/RssShuffleWriterSuiteJ.java | 4 ++--
.../spark/shuffle/celeborn/RssShuffleManager.java | 2 +-
.../shuffle/celeborn/RssShuffleWriterSuiteJ.java | 2 +-
.../org/apache/celeborn/client/ShuffleClient.java | 28 +++++++++++-----------
.../apache/celeborn/client/ShuffleClientImpl.java | 12 +++++-----
.../apache/celeborn/client/DummyShuffleClient.java | 2 +-
.../service/deploy/cluster/ReadWriteTestBase.scala | 2 +-
8 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 82e31026..34feca61 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -141,7 +141,7 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public void stop() {
if (rssShuffleClient != null) {
- rssShuffleClient.shutDown();
+ rssShuffleClient.shutdown();
}
if (lifecycleManager != null) {
lifecycleManager.stop();
diff --git
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 6b00eb42..59334a3b 100644
---
a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++
b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -250,7 +250,7 @@ public class RssShuffleWriterSuiteJ {
writer.write(iterator);
Option<MapStatus> status = writer.stop(true);
- client.shutDown();
+ client.shutdown();
assertNotNull(status);
assertTrue(status.isDefined());
@@ -315,7 +315,7 @@ public class RssShuffleWriterSuiteJ {
writer.write(iterator);
Option<MapStatus> status = writer.stop(true);
- client.shutDown();
+ client.shutdown();
assertNotNull(status);
assertTrue(status.isDefined());
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
index 6f23c741..78a31562 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java
@@ -139,7 +139,7 @@ public class RssShuffleManager implements ShuffleManager {
@Override
public void stop() {
if (rssShuffleClient != null) {
- rssShuffleClient.shutDown();
+ rssShuffleClient.shutdown();
}
if (lifecycleManager != null) {
lifecycleManager.stop();
diff --git
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
index 56aed82d..c7e59ce9 100644
---
a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
+++
b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java
@@ -232,7 +232,7 @@ public class RssShuffleWriterSuiteJ {
writer.write(iterator);
Option<MapStatus> status = writer.stop(true);
- client.shutDown();
+ client.shutdown();
assertNotNull(status);
assertTrue(status.isDefined());
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 4beaf00c..f5ea4800 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -35,15 +35,15 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
* ShuffleClient may be a process singleton, the specific PartitionLocation
should be hidden in the
* implementation
*/
-public abstract class ShuffleClient implements Cloneable {
+public abstract class ShuffleClient {
private static volatile ShuffleClient _instance;
- private static volatile boolean initFinished = false;
+ private static volatile boolean initialized = false;
private static volatile FileSystem hdfsFs;
// for testing
public static void reset() {
_instance = null;
- initFinished = false;
+ initialized = false;
hdfsFs = null;
}
@@ -51,7 +51,7 @@ public abstract class ShuffleClient implements Cloneable {
public static ShuffleClient get(
RpcEndpointRef driverRef, CelebornConf conf, UserIdentifier
userIdentifier) {
- if (null == _instance || !initFinished) {
+ if (null == _instance || !initialized) {
synchronized (ShuffleClient.class) {
if (null == _instance) {
// During the execution of Spark tasks, each task may be interrupted
due to speculative
@@ -61,12 +61,12 @@ public abstract class ShuffleClient implements Cloneable {
// when communicating with MetaService, it will cause a
NullPointerException.
_instance = new ShuffleClientImpl(conf, userIdentifier);
_instance.setupMetaServiceRef(driverRef);
- initFinished = true;
- } else if (!initFinished) {
- _instance.shutDown();
+ initialized = true;
+ } else if (!initialized) {
+ _instance.shutdown();
_instance = new ShuffleClientImpl(conf, userIdentifier);
_instance.setupMetaServiceRef(driverRef);
- initFinished = true;
+ initialized = true;
}
}
}
@@ -75,7 +75,7 @@ public abstract class ShuffleClient implements Cloneable {
public static ShuffleClient get(
String driverHost, int port, CelebornConf conf, UserIdentifier
userIdentifier) {
- if (null == _instance || !initFinished) {
+ if (null == _instance || !initialized) {
synchronized (ShuffleClient.class) {
if (null == _instance) {
// During the execution of Spark tasks, each task may be interrupted
due to speculative
@@ -85,12 +85,12 @@ public abstract class ShuffleClient implements Cloneable {
// when communicating with MetaService, it will cause a
NullPointerException.
_instance = new ShuffleClientImpl(conf, userIdentifier);
_instance.setupMetaServiceRef(driverHost, port);
- initFinished = true;
- } else if (!initFinished) {
- _instance.shutDown();
+ initialized = true;
+ } else if (!initialized) {
+ _instance.shutdown();
_instance = new ShuffleClientImpl(conf, userIdentifier);
_instance.setupMetaServiceRef(driverHost, port);
- initFinished = true;
+ initialized = true;
}
}
}
@@ -185,7 +185,7 @@ public abstract class ShuffleClient implements Cloneable {
public abstract boolean unregisterShuffle(String applicationId, int
shuffleId, boolean isDriver);
- public abstract void shutDown();
+ public abstract void shutdown();
// Write data to a specific map partition, input data's type is Bytebuf.
// data's type is Bytebuf to avoid copy between application and netty
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 51243ee5..103eac13 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -75,7 +75,7 @@ public class ShuffleClientImpl extends ShuffleClient {
private static final byte MASTER_MODE = PartitionLocation.Mode.MASTER.mode();
- private static final Random rand = new Random();
+ private static final Random RND = new Random();
private final CelebornConf conf;
@@ -143,9 +143,9 @@ public class ShuffleClientImpl extends ShuffleClient {
// init rpc env and master endpointRef
rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf);
+ String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
- Utils.fromCelebornConf(
- conf, TransportModuleConstants.DATA_MODULE,
conf.getInt("celeborn.data.io.threads", 8));
+ Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module +
".io.threads", 8));
TransportContext context =
new TransportContext(dataTransportConf, new BaseMessageHandler(),
true);
dataClientFactory = context.createClientFactory();
@@ -402,7 +402,7 @@ public class ShuffleClientImpl extends ShuffleClient {
return true;
}
- long sleepTimeMs = rand.nextInt(50);
+ long sleepTimeMs = RND.nextInt(50);
if (sleepTimeMs > 30) {
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
@@ -873,7 +873,7 @@ public class ShuffleClientImpl extends ShuffleClient {
new ArrayList<>(pushState.batchesMap.entrySet());
while (!batchesArr.isEmpty()) {
limitMaxInFlight(mapKey, pushState, currentMaxReqsInFlight);
- Map.Entry<String, DataBatches> entry =
batchesArr.get(rand.nextInt(batchesArr.size()));
+ Map.Entry<String, DataBatches> entry =
batchesArr.get(RND.nextInt(batchesArr.size()));
ArrayList<DataBatches.DataBatch> batches =
entry.getValue().requireBatches(pushBufferMaxSize);
if (entry.getValue().getTotalSize() == 0) {
batchesArr.remove(entry);
@@ -1233,7 +1233,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
@Override
- public void shutDown() {
+ public void shutdown() {
if (null != rpcEnv) {
rpcEnv.shutdown();
}
diff --git
a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
index d1911fa3..07f03038 100644
--- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -129,7 +129,7 @@ public class DummyShuffleClient extends ShuffleClient {
}
@Override
- public void shutDown() {
+ public void shutdown() {
try {
os.close();
} catch (IOException e) {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
index b2e3fc6c..ae50a499 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
@@ -88,7 +88,7 @@ trait ReadWriteTestBase extends Logging {
Assert.assertArrayEquals(targetArr, readBytes)
Thread.sleep(5000L)
- shuffleClient.shutDown()
+ shuffleClient.shutdown()
lifecycleManager.rpcEnv.shutdown()
}