This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch branch-0.2 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 0fb12d77aec7175e8dbdb8b97e34bebebefcdbee Author: Cheng Pan <[email protected]> AuthorDate: Wed Dec 14 10:04:39 2022 +0800 [CELEBORN-132] ShuffleClient should not implement Cloneable (#1077) --- .../spark/shuffle/celeborn/RssShuffleManager.java | 2 +- .../shuffle/celeborn/RssShuffleWriterSuiteJ.java | 4 ++-- .../spark/shuffle/celeborn/RssShuffleManager.java | 2 +- .../shuffle/celeborn/RssShuffleWriterSuiteJ.java | 2 +- .../org/apache/celeborn/client/ShuffleClient.java | 28 +++++++++++----------- .../apache/celeborn/client/ShuffleClientImpl.java | 12 +++++----- .../apache/celeborn/client/DummyShuffleClient.java | 2 +- .../service/deploy/cluster/ReadWriteTestBase.scala | 2 +- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java index 82e31026..34feca61 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java @@ -141,7 +141,7 @@ public class RssShuffleManager implements ShuffleManager { @Override public void stop() { if (rssShuffleClient != null) { - rssShuffleClient.shutDown(); + rssShuffleClient.shutdown(); } if (lifecycleManager != null) { lifecycleManager.stop(); diff --git a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java index 6b00eb42..59334a3b 100644 --- a/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java +++ b/client-spark/spark-2/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java @@ -250,7 +250,7 @@ public class RssShuffleWriterSuiteJ { writer.write(iterator); Option<MapStatus> status = writer.stop(true); - client.shutDown(); + client.shutdown(); assertNotNull(status); assertTrue(status.isDefined()); @@ -315,7 +315,7 @@ public class RssShuffleWriterSuiteJ { writer.write(iterator); Option<MapStatus> status = writer.stop(true); - client.shutDown(); + client.shutdown(); assertNotNull(status); assertTrue(status.isDefined()); diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java index 6f23c741..78a31562 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/RssShuffleManager.java @@ -139,7 +139,7 @@ public class RssShuffleManager implements ShuffleManager { @Override public void stop() { if (rssShuffleClient != null) { - rssShuffleClient.shutDown(); + rssShuffleClient.shutdown(); } if (lifecycleManager != null) { lifecycleManager.stop(); diff --git a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java index 56aed82d..c7e59ce9 100644 --- a/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java +++ b/client-spark/spark-3/src/test/java/org/apache/spark/shuffle/celeborn/RssShuffleWriterSuiteJ.java @@ -232,7 +232,7 @@ public class RssShuffleWriterSuiteJ { writer.write(iterator); Option<MapStatus> status = writer.stop(true); - client.shutDown(); + client.shutdown(); assertNotNull(status); assertTrue(status.isDefined()); diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index b9fcb9e1..dac22fb3 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -33,15 +33,15 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef; * ShuffleClient may be a process singleton, the specific PartitionLocation should be hidden in the * implementation */ -public abstract class ShuffleClient implements Cloneable { +public abstract class ShuffleClient { private static volatile ShuffleClient _instance; - private static volatile boolean initFinished = false; + private static volatile boolean initialized = false; private static volatile FileSystem hdfsFs; // for testing public static void reset() { _instance = null; - initFinished = false; + initialized = false; hdfsFs = null; } @@ -49,7 +49,7 @@ public abstract class ShuffleClient implements Cloneable { public static ShuffleClient get( RpcEndpointRef driverRef, CelebornConf conf, UserIdentifier userIdentifier) { - if (null == _instance || !initFinished) { + if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { // During the execution of Spark tasks, each task may be interrupted due to speculative @@ -59,12 +59,12 @@ public abstract class ShuffleClient implements Cloneable { // when communicating with MetaService, it will cause a NullPointerException. _instance = new ShuffleClientImpl(conf, userIdentifier); _instance.setupMetaServiceRef(driverRef); - initFinished = true; - } else if (!initFinished) { - _instance.shutDown(); + initialized = true; + } else if (!initialized) { + _instance.shutdown(); _instance = new ShuffleClientImpl(conf, userIdentifier); _instance.setupMetaServiceRef(driverRef); - initFinished = true; + initialized = true; } } } @@ -73,7 +73,7 @@ public abstract class ShuffleClient implements Cloneable { public static ShuffleClient get( String driverHost, int port, CelebornConf conf, UserIdentifier userIdentifier) { - if (null == _instance || !initFinished) { + if (null == _instance || !initialized) { synchronized (ShuffleClient.class) { if (null == _instance) { // During the execution of Spark tasks, each task may be interrupted due to speculative @@ -83,12 +83,12 @@ public abstract class ShuffleClient implements Cloneable { // when communicating with MetaService, it will cause a NullPointerException. _instance = new ShuffleClientImpl(conf, userIdentifier); _instance.setupMetaServiceRef(driverHost, port); - initFinished = true; - } else if (!initFinished) { - _instance.shutDown(); + initialized = true; + } else if (!initialized) { + _instance.shutdown(); _instance = new ShuffleClientImpl(conf, userIdentifier); _instance.setupMetaServiceRef(driverHost, port); - initFinished = true; + initialized = true; } } } @@ -173,7 +173,7 @@ public abstract class ShuffleClient implements Cloneable { public abstract boolean unregisterShuffle(String applicationId, int shuffleId, boolean isDriver); - public abstract void shutDown(); + public abstract void shutdown(); public abstract Optional<PartitionLocation> regionStart( String applicationId, diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 67a6ee4e..217e225f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -73,7 +73,7 @@ public class ShuffleClientImpl extends ShuffleClient { private static final byte MASTER_MODE = PartitionLocation.Mode.MASTER.mode(); - private static final Random rand = new Random(); + private static final Random RND = new Random(); private final CelebornConf conf; @@ -137,9 +137,9 @@ public class ShuffleClientImpl extends ShuffleClient { // init rpc env and master endpointRef rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(), 0, conf); + String module = TransportModuleConstants.DATA_MODULE; TransportConf dataTransportConf = - Utils.fromCelebornConf( - conf, TransportModuleConstants.DATA_MODULE, conf.getInt("celeborn.data.io.threads", 8)); + Utils.fromCelebornConf(conf, module, conf.getInt("celeborn" + module + ".io.threads", 8)); TransportContext context = new TransportContext(dataTransportConf, new BaseMessageHandler(), true); dataClientFactory = context.createClientFactory(); @@ -396,7 +396,7 @@ public class ShuffleClientImpl extends ShuffleClient { return true; } - long sleepTimeMs = rand.nextInt(50); + long sleepTimeMs = RND.nextInt(50); if (sleepTimeMs > 30) { try { TimeUnit.MILLISECONDS.sleep(sleepTimeMs); @@ -850,7 +850,7 @@ public class ShuffleClientImpl extends ShuffleClient { new ArrayList<>(pushState.batchesMap.entrySet()); while (!batchesArr.isEmpty()) { limitMaxInFlight(mapKey, pushState, maxInFlight); - Map.Entry<String, DataBatches> entry = batchesArr.get(rand.nextInt(batchesArr.size())); + Map.Entry<String, DataBatches> entry = batchesArr.get(RND.nextInt(batchesArr.size())); ArrayList<DataBatches.DataBatch> batches = entry.getValue().requireBatches(pushBufferMaxSize); if (entry.getValue().getTotalSize() == 0) { batchesArr.remove(entry); @@ -1164,7 +1164,7 @@ public class ShuffleClientImpl extends ShuffleClient { } @Override - public void shutDown() { + public void shutdown() { if (null != rpcEnv) { rpcEnv.shutdown(); } diff --git a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java index 2fb60718..83393ed1 100644 --- a/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java +++ b/client/src/test/java/org/apache/celeborn/client/DummyShuffleClient.java @@ -117,7 +117,7 @@ public class DummyShuffleClient extends ShuffleClient { } @Override - public void shutDown() { + public void shutdown() { try { os.close(); } catch (IOException e) { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala index b2e3fc6c..ae50a499 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala @@ -88,7 +88,7 @@ trait ReadWriteTestBase extends Logging { Assert.assertArrayEquals(targetArr, readBytes) Thread.sleep(5000L) - shuffleClient.shutDown() + shuffleClient.shutdown() lifecycleManager.rpcEnv.shutdown() }
