This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new f10e6f0f5 Revert "[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean…"
f10e6f0f5 is described below
commit f10e6f0f5120aa6a22188e62dc21c700f6ec8dcb
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jul 19 15:09:17 2023 +0800
Revert "[CELEBORN-798] Add heartbeat from client to LifecycleManager to clean…"
This reverts commit 20b60aba6af855ce3563e41a1227eb903b916146.
---
.../flink/readclient/FlinkShuffleClientImpl.java | 2 +-
.../shuffle/celeborn/SparkShuffleManager.java | 6 ++--
.../shuffle/celeborn/CelebornShuffleReader.scala | 3 +-
.../shuffle/celeborn/SparkShuffleManager.java | 6 ++--
.../shuffle/celeborn/CelebornShuffleReader.scala | 3 +-
.../org/apache/celeborn/client/ShuffleClient.java | 7 ++---
.../apache/celeborn/client/ShuffleClientImpl.java | 32 ++++------------------
.../apache/celeborn/client/LifecycleManager.scala | 15 ----------
.../celeborn/client/ShuffleClientBaseSuiteJ.java | 2 +-
.../celeborn/client/ShuffleClientSuiteJ.java | 2 +-
.../celeborn/client/WithShuffleClientSuite.scala | 2 +-
common/src/main/proto/TransportMessages.proto | 10 -------
.../org/apache/celeborn/common/CelebornConf.scala | 19 ++-----------
.../common/protocol/message/ControlMessages.scala | 29 --------------------
docs/configuration/client.md | 1 -
.../celeborn/tests/client/ShuffleClientSuite.scala | 2 +-
.../celeborn/tests/spark/HeartbeatTest.scala | 6 ++--
.../service/deploy/cluster/ReadWriteTestBase.scala | 3 +-
18 files changed, 25 insertions(+), 125 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 99c345a2d..f1d5cc5b9 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -125,7 +125,7 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
long driverTimestamp,
CelebornConf conf,
UserIdentifier userIdentifier) {
- super(appUniqueId, conf, userIdentifier, false);
+ super(appUniqueId, conf, userIdentifier);
String module = TransportModuleConstants.DATA_MODULE;
TransportConf dataTransportConf =
Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module
+ ".io.threads", 8));
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a69a7a9d5..95641a6fc 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -109,8 +109,7 @@ public class SparkShuffleManager implements ShuffleManager {
lifecycleManager.getHost(),
lifecycleManager.getPort(),
celebornConf,
- lifecycleManager.getUserIdentifier(),
- true);
+ lifecycleManager.getUserIdentifier());
}
}
}
@@ -187,8 +186,7 @@ public class SparkShuffleManager implements ShuffleManager {
h.lifecycleManagerHost(),
h.lifecycleManagerPort(),
celebornConf,
- h.userIdentifier(),
- false);
+ h.userIdentifier());
if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
ExecutorService pushThread =
celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 1927afb97..8d5c99e0a 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -44,8 +44,7 @@ class CelebornShuffleReader[K, C](
handle.lifecycleManagerHost,
handle.lifecycleManagerPort,
conf,
- handle.userIdentifier,
- false)
+ handle.userIdentifier)
override def read(): Iterator[Product2[K, C]] = {
diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 815a1be7f..9695fd8ac 100644
--- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -116,8 +116,7 @@ public class SparkShuffleManager implements ShuffleManager {
lifecycleManager.getHost(),
lifecycleManager.getPort(),
celebornConf,
- lifecycleManager.getUserIdentifier(),
- true);
+ lifecycleManager.getUserIdentifier());
}
}
}
@@ -198,8 +197,7 @@ public class SparkShuffleManager implements ShuffleManager {
h.lifecycleManagerHost(),
h.lifecycleManagerPort(),
celebornConf,
- h.userIdentifier(),
- false);
+ h.userIdentifier());
if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
ExecutorService pushThread =
celebornConf.clientPushSortPipelineEnabled() ? getPusherThread()
: null;
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index d4c868a4b..5ce891907 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -46,8 +46,7 @@ class CelebornShuffleReader[K, C](
handle.lifecycleManagerHost,
handle.lifecycleManagerPort,
conf,
- handle.userIdentifier,
- false)
+ handle.userIdentifier)
override def read(): Iterator[Product2[K, C]] = {
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index d9e88e780..b77c82780 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -56,8 +56,7 @@ public abstract class ShuffleClient {
String driverHost,
int port,
CelebornConf conf,
- UserIdentifier userIdentifier,
- boolean isDriver) {
+ UserIdentifier userIdentifier) {
if (null == _instance || !initialized) {
synchronized (ShuffleClient.class) {
if (null == _instance) {
@@ -67,12 +66,12 @@ public abstract class ShuffleClient {
// be
// assigned. An Executor will only construct a ShuffleClient
singleton once. At this time,
// when communicating with LifecycleManager, it will cause a
NullPointerException.
- _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier,
isDriver);
+ _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
_instance.setupLifecycleManagerRef(driverHost, port);
initialized = true;
} else if (!initialized) {
_instance.shutdown();
- _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier,
isDriver);
+ _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
_instance.setupLifecycleManagerRef(driverHost, port);
initialized = true;
}
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index db4eb4f94..dada674cc 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -20,7 +20,10 @@ package org.apache.celeborn.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import scala.reflect.ClassTag$;
@@ -110,8 +113,6 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final String appUniqueId;
- private ScheduledExecutorService heartbeater;
-
private ThreadLocal<Compressor> compressorThreadLocal =
new ThreadLocal<Compressor>() {
@Override
@@ -153,8 +154,7 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
JavaUtils.newConcurrentHashMap();
- public ShuffleClientImpl(
- String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier,
boolean isDriver) {
+ public ShuffleClientImpl(String appUniqueId, CelebornConf conf,
UserIdentifier userIdentifier) {
super();
this.appUniqueId = appUniqueId;
this.conf = conf;
@@ -193,25 +193,6 @@ public class ShuffleClientImpl extends ShuffleClient {
"celeborn-shuffle-split", pushSplitPartitionThreads, 60);
reviveManager = new ReviveManager(this, conf);
- if (!isDriver) {
- heartbeater =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor(
- "celeborn-lifecyclemanager-heartbeater");
- heartbeater.scheduleAtFixedRate(
- () -> {
- PbHeartbeatFromClientResponse resp =
- lifecycleManagerRef.askSync(
-
HeartbeatFromClient$.MODULE$.apply(reducePartitionMap.keySet()),
-
ClassTag$.MODULE$.apply(PbHeartbeatFromClientResponse.class));
- List<Integer> unknownShuffleIds = resp.getUnknownShuffleIdList();
- for (int i = 0; i < unknownShuffleIds.size(); i++) {
- unregisterShuffle(unknownShuffleIds.get(i), false);
- }
- },
- conf.clientHeartbeatToLifecycleManagerInterval(),
- conf.clientHeartbeatToLifecycleManagerInterval(),
- TimeUnit.MILLISECONDS);
- }
logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
}
@@ -1652,9 +1633,6 @@ public class ShuffleClientImpl extends ShuffleClient {
if (null != lifecycleManagerRef) {
lifecycleManagerRef = null;
}
- if (null != heartbeater) {
- heartbeater.shutdown();
- }
pushExcludedWorkers.clear();
fetchExcludedWorkers.clear();
logger.warn("Shuffle client has been shutdown!");
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 5ef1ed13a..3b291c538 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -20,7 +20,6 @@ package org.apache.celeborn.client
import java.util
import java.util.{function, List => JList}
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
-import java.util.function.Predicate
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -301,9 +300,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logDebug(s"Received GetShuffleFileGroup request," +
s"shuffleId $shuffleId.")
handleGetReducerFileGroup(context, shuffleId)
-
- case pb: PbHeartbeatFromClient =>
- handleHeartbeatFromClient(context, pb)
}
private def offerAndReserveSlots(
@@ -568,17 +564,6 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
commitManager.handleGetReducerFileGroup(context, shuffleId)
}
- private def handleHeartbeatFromClient(
- context: RpcCallContext,
- pb: PbHeartbeatFromClient): Unit = {
- val shuffleIds = new util.ArrayList[Integer](pb.getShuffleIdList)
- val pred = new Predicate[Integer] {
- override def test(id: Integer): Boolean = registeredShuffle.contains(id)
- }
- shuffleIds.removeIf(pred)
- context.reply(HeartbeatFromClientResponse(shuffleIds))
- }
-
private def handleStageEnd(shuffleId: Int): Unit = {
// check whether shuffle has registered
if (!registeredShuffle.contains(shuffleId)) {
diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
index f44ce32ba..7a7706973 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
@@ -86,7 +86,7 @@ public abstract class ShuffleClientBaseSuiteJ {
conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), "1K");
shuffleClient =
- new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new
UserIdentifier("mock", "mock"), false);
+ new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new
UserIdentifier("mock", "mock"));
primaryLocation.setPeer(replicaLocation);
when(endpointRef.askSync(
diff --git a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index dcf035374..9901855a8 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -172,7 +172,7 @@ public class ShuffleClientSuiteJ {
conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), "1K");
shuffleClient =
- new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new
UserIdentifier("mock", "mock"), false);
+ new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new
UserIdentifier("mock", "mock"));
primaryLocation.setPeer(replicaLocation);
when(endpointRef.askSync(any(), any(), any()))
diff --git a/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
index 3d45e27de..aefef641d 100644
--- a/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
+++ b/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
@@ -151,7 +151,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
private def prepareService(): Unit = {
lifecycleManager = new LifecycleManager(APP, celebornConf)
- shuffleClient = new ShuffleClientImpl(APP, celebornConf, userIdentifier,
false)
+ shuffleClient = new ShuffleClientImpl(APP, celebornConf, userIdentifier)
shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
}
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 969520c5d..50dcdc7a1 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -69,8 +69,6 @@ enum MessageType {
REGISTER_MAP_PARTITION_TASK = 48;
HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
- HEARTBEAT_FROM_CLIENT = 51;
- HEARTBEAT_FROM_CLIENT_RESPONSE = 52;
}
message PbStorageInfo {
@@ -175,14 +173,6 @@ message PbRegisterShuffleResponse {
repeated PbPartitionLocation partitionLocations = 2;
}
-message PbHeartbeatFromClient {
- repeated int32 shuffleId = 1;
-}
-
-message PbHeartbeatFromClientResponse {
- repeated int32 unknownShuffleId = 1;
-}
-
message PbRequestSlots {
string applicationId = 1;
int32 shuffleId = 2;
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 507ff22f3..212dddd0d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -697,12 +697,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
+ def clientCheckedUseAllocatedWorkers: Boolean =
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
+ def clientExcludedWorkerExpireTimeout: Long =
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
def clientExcludeReplicaOnFailureEnabled: Boolean =
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
- def clientExcludedWorkerExpireTimeout: Long =
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
- def clientCheckedUseAllocatedWorkers: Boolean =
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
- def clientHeartbeatToLifecycleManagerInterval: Long =
- get(CLIENT_HEARTBEAT_TO_LIFECYCLEMANAGER_INTERVAL)
// //////////////////////////////////////////////////////
// Shuffle Compression //
@@ -998,10 +996,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def testPushReplicaDataTimeout: Boolean =
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
-
- // //////////////////////////////////////////////////////
- // Flink //
- // //////////////////////////////////////////////////////
def clientFlinkMemoryPerResultPartitionMin: Long =
get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
def clientFlinkMemoryPerResultPartition: Long =
get(CLIENT_MEMORY_PER_RESULT_PARTITION)
def clientFlinkMemoryPerInputGateMin: Long =
get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
@@ -2657,15 +2651,6 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- val CLIENT_HEARTBEAT_TO_LIFECYCLEMANAGER_INTERVAL: ConfigEntry[Long] =
- buildConf("celeborn.client.heartbeatToLifecycleManager.interval")
- .categories("client")
- .version("0.3.0")
- .doc(
- "Interval for client in Executor(for Spark) to send heartbeat message
to lifecycle manager.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("30s")
-
val TEST_CLIENT_RETRY_COMMIT_FILE: ConfigEntry[Boolean] =
buildConf("celeborn.test.client.retryCommitFiles")
.withAlternative("celeborn.test.retryCommitFiles")
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index a7a098bed..a0ba251a8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -244,23 +244,6 @@ object ControlMessages extends Logging {
}
}
- object HeartbeatFromClient {
- def apply(
- shuffleIds: util.Set[Integer]): PbHeartbeatFromClient = {
- PbHeartbeatFromClient.newBuilder()
- .addAllShuffleId(shuffleIds)
- .build()
- }
- }
-
- object HeartbeatFromClientResponse {
- def apply(shuflfeIds: util.List[Integer]): PbHeartbeatFromClientResponse =
{
- PbHeartbeatFromClientResponse.newBuilder()
- .addAllUnknownShuffleId(shuflfeIds)
- .build()
- }
- }
-
case class MapperEnd(
shuffleId: Int,
mapId: Int,
@@ -552,12 +535,6 @@ object ControlMessages extends Logging {
case pb: PbChangeLocationResponse =>
new TransportMessage(MessageType.CHANGE_LOCATION_RESPONSE,
pb.toByteArray)
- case pb: PbHeartbeatFromClient =>
- new TransportMessage(MessageType.HEARTBEAT_FROM_CLIENT, pb.toByteArray)
-
- case pb: PbHeartbeatFromClientResponse =>
- new TransportMessage(MessageType.HEARTBEAT_FROM_CLIENT_RESPONSE,
pb.toByteArray)
-
case MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId) =>
val payload = PbMapperEnd.newBuilder()
.setShuffleId(shuffleId)
@@ -907,12 +884,6 @@ object ControlMessages extends Logging {
case CHANGE_LOCATION_RESPONSE_VALUE =>
PbChangeLocationResponse.parseFrom(message.getPayload)
- case HEARTBEAT_FROM_CLIENT_VALUE =>
- PbHeartbeatFromClient.parseFrom(message.getPayload)
-
- case HEARTBEAT_FROM_CLIENT_RESPONSE_VALUE =>
- PbHeartbeatFromClientResponse.parseFrom(message.getPayload)
-
case MAPPER_END_VALUE =>
val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
MapperEnd(
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index a3d1cca42..94f5771c8 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -37,7 +37,6 @@ license: |
| celeborn.client.flink.resultPartition.memory | 64m | Memory reserved for a
result partition. | 0.3.0 |
| celeborn.client.flink.resultPartition.minMemory | 8m | Min memory reserved
for a result partition. | 0.3.0 |
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | Whether
to support floating buffer for result partitions. | 0.3.0 |
-| celeborn.client.heartbeatToLifecycleManager.interval | 30s | Interval for
client in Executor(for Spark) to send heartbeat message to lifecycle manager. |
0.3.0 |
| celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 |
| celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition
buffer memory for shuffle hash writer. The pushed data will be buffered in
memory before sending to Celeborn worker. For performance consideration keep
this buffer size higher than 32K. Example: If reducer amount is 2000, buffer
size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap
memory. | 0.3.0 |
| celeborn.client.push.excludeWorkerOnFailure.enabled | false | Whether to
enable shuffle client-side push exclude workers on failures. | 0.3.0 |
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index 6f7680c28..4946a3dae 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -47,7 +47,7 @@ class ShuffleClientSuite extends WithShuffleClientSuite with MiniClusterFeature
val lifecycleManager: LifecycleManager = new LifecycleManager(APP,
celebornConf)
val shuffleClient: ShuffleClientImpl = {
- val client = new ShuffleClientImpl(APP, celebornConf, userIdentifier,
false)
+ val client = new ShuffleClientImpl(APP, celebornConf, userIdentifier)
client.setupLifecycleManagerRef(lifecycleManager.self)
client
}
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index 826ce5de4..af6142022 100644
--- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -39,21 +39,21 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
test("celeborn spark heartbeat test - client <- worker") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
val shuffleClientImpl =
- new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"),
false)
+ new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2Client(shuffleClientImpl.getDataClientFactory)
}
test("celeborn spark heartbeat test - client <- worker on heartbeat") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
val shuffleClientImpl =
- new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"),
false)
+ new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2ClientWithNoHeartbeat(shuffleClientImpl.getDataClientFactory)
}
test("celeborn spark heartbeat test - client <- worker timeout") {
val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
val shuffleClientImpl =
- new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"),
false)
+ new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
}
}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
index c270152f9..3796975ae 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
@@ -61,8 +61,7 @@ trait ReadWriteTestBase extends AnyFunSuite
.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
.set("celeborn.data.io.numConnectionsPerPeer", "1")
val lifecycleManager = new LifecycleManager(APP, clientConf)
- val shuffleClient =
- new ShuffleClientImpl(APP, clientConf, UserIdentifier("mock", "mock"),
false)
+ val shuffleClient = new ShuffleClientImpl(APP, clientConf,
UserIdentifier("mock", "mock"))
shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
val STR1 = RandomStringUtils.random(1024)