This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 0db919403 Revert "[CELEBORN-798] Add heartbeat from client to 
LifecycleManager to clean…"
0db919403 is described below

commit 0db919403ef78a73512794b879d3be9f77322933
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Jul 19 15:08:45 2023 +0800

    Revert "[CELEBORN-798] Add heartbeat from client to LifecycleManager to 
clean…"
    
    This reverts commit e56a8a8bed9c0c162b863ca3e08adb1731e4b7c1.
---
 .../flink/readclient/FlinkShuffleClientImpl.java   |  2 +-
 .../shuffle/celeborn/SparkShuffleManager.java      |  6 ++--
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  3 +-
 .../shuffle/celeborn/SparkShuffleManager.java      |  6 ++--
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  3 +-
 .../org/apache/celeborn/client/ShuffleClient.java  |  7 ++---
 .../apache/celeborn/client/ShuffleClientImpl.java  | 32 ++++------------------
 .../apache/celeborn/client/LifecycleManager.scala  | 15 ----------
 .../celeborn/client/ShuffleClientBaseSuiteJ.java   |  2 +-
 .../celeborn/client/ShuffleClientSuiteJ.java       |  2 +-
 .../celeborn/client/WithShuffleClientSuite.scala   |  2 +-
 common/src/main/proto/TransportMessages.proto      | 10 -------
 .../org/apache/celeborn/common/CelebornConf.scala  | 19 ++-----------
 .../common/protocol/message/ControlMessages.scala  | 29 --------------------
 docs/configuration/client.md                       |  1 -
 .../celeborn/tests/client/ShuffleClientSuite.scala |  2 +-
 .../celeborn/tests/spark/HeartbeatTest.scala       |  6 ++--
 .../service/deploy/cluster/ReadWriteTestBase.scala |  3 +-
 18 files changed, 25 insertions(+), 125 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
index 99c345a2d..f1d5cc5b9 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
@@ -125,7 +125,7 @@ public class FlinkShuffleClientImpl extends 
ShuffleClientImpl {
       long driverTimestamp,
       CelebornConf conf,
       UserIdentifier userIdentifier) {
-    super(appUniqueId, conf, userIdentifier, false);
+    super(appUniqueId, conf, userIdentifier);
     String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =
         Utils.fromCelebornConf(conf, module, conf.getInt("celeborn." + module 
+ ".io.threads", 8));
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index a69a7a9d5..95641a6fc 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -109,8 +109,7 @@ public class SparkShuffleManager implements ShuffleManager {
                   lifecycleManager.getHost(),
                   lifecycleManager.getPort(),
                   celebornConf,
-                  lifecycleManager.getUserIdentifier(),
-                  true);
+                  lifecycleManager.getUserIdentifier());
         }
       }
     }
@@ -187,8 +186,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 h.lifecycleManagerHost(),
                 h.lifecycleManagerPort(),
                 celebornConf,
-                h.userIdentifier(),
-                false);
+                h.userIdentifier());
         if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
           ExecutorService pushThread =
               celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 1927afb97..8d5c99e0a 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -44,8 +44,7 @@ class CelebornShuffleReader[K, C](
     handle.lifecycleManagerHost,
     handle.lifecycleManagerPort,
     conf,
-    handle.userIdentifier,
-    false)
+    handle.userIdentifier)
 
   override def read(): Iterator[Product2[K, C]] = {
 
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 815a1be7f..9695fd8ac 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -116,8 +116,7 @@ public class SparkShuffleManager implements ShuffleManager {
                   lifecycleManager.getHost(),
                   lifecycleManager.getPort(),
                   celebornConf,
-                  lifecycleManager.getUserIdentifier(),
-                  true);
+                  lifecycleManager.getUserIdentifier());
         }
       }
     }
@@ -198,8 +197,7 @@ public class SparkShuffleManager implements ShuffleManager {
                 h.lifecycleManagerHost(),
                 h.lifecycleManagerPort(),
                 celebornConf,
-                h.userIdentifier(),
-                false);
+                h.userIdentifier());
         if (ShuffleMode.SORT.equals(celebornConf.shuffleWriterMode())) {
           ExecutorService pushThread =
               celebornConf.clientPushSortPipelineEnabled() ? getPusherThread() 
: null;
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index d4c868a4b..5ce891907 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -46,8 +46,7 @@ class CelebornShuffleReader[K, C](
     handle.lifecycleManagerHost,
     handle.lifecycleManagerPort,
     conf,
-    handle.userIdentifier,
-    false)
+    handle.userIdentifier)
 
   override def read(): Iterator[Product2[K, C]] = {
 
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index d9e88e780..b77c82780 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -56,8 +56,7 @@ public abstract class ShuffleClient {
       String driverHost,
       int port,
       CelebornConf conf,
-      UserIdentifier userIdentifier,
-      boolean isDriver) {
+      UserIdentifier userIdentifier) {
     if (null == _instance || !initialized) {
       synchronized (ShuffleClient.class) {
         if (null == _instance) {
@@ -67,12 +66,12 @@ public abstract class ShuffleClient {
           // be
           // assigned. An Executor will only construct a ShuffleClient 
singleton once. At this time,
           // when communicating with LifecycleManager, it will cause a 
NullPointerException.
-          _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier, 
isDriver);
+          _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
           _instance.setupLifecycleManagerRef(driverHost, port);
           initialized = true;
         } else if (!initialized) {
           _instance.shutdown();
-          _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier, 
isDriver);
+          _instance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier);
           _instance.setupLifecycleManagerRef(driverHost, port);
           initialized = true;
         }
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index db4eb4f94..dada674cc 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -20,7 +20,10 @@ package org.apache.celeborn.client;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import scala.reflect.ClassTag$;
 
@@ -110,8 +113,6 @@ public class ShuffleClientImpl extends ShuffleClient {
 
   protected final String appUniqueId;
 
-  private ScheduledExecutorService heartbeater;
-
   private ThreadLocal<Compressor> compressorThreadLocal =
       new ThreadLocal<Compressor>() {
         @Override
@@ -153,8 +154,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   protected final Map<Integer, ReduceFileGroups> reduceFileGroupsMap =
       JavaUtils.newConcurrentHashMap();
 
-  public ShuffleClientImpl(
-      String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier, 
boolean isDriver) {
+  public ShuffleClientImpl(String appUniqueId, CelebornConf conf, 
UserIdentifier userIdentifier) {
     super();
     this.appUniqueId = appUniqueId;
     this.conf = conf;
@@ -193,25 +193,6 @@ public class ShuffleClientImpl extends ShuffleClient {
             "celeborn-shuffle-split", pushSplitPartitionThreads, 60);
     reviveManager = new ReviveManager(this, conf);
 
-    if (!isDriver) {
-      heartbeater =
-          ThreadUtils.newDaemonSingleThreadScheduledExecutor(
-              "celeborn-lifecyclemanager-heartbeater");
-      heartbeater.scheduleAtFixedRate(
-          () -> {
-            PbHeartbeatFromClientResponse resp =
-                lifecycleManagerRef.askSync(
-                    
HeartbeatFromClient$.MODULE$.apply(reducePartitionMap.keySet()),
-                    
ClassTag$.MODULE$.apply(PbHeartbeatFromClientResponse.class));
-            List<Integer> unknownShuffleIds = resp.getUnknownShuffleIdList();
-            for (int i = 0; i < unknownShuffleIds.size(); i++) {
-              unregisterShuffle(unknownShuffleIds.get(i), false);
-            }
-          },
-          conf.clientHeartbeatToLifecycleManagerInterval(),
-          conf.clientHeartbeatToLifecycleManagerInterval(),
-          TimeUnit.MILLISECONDS);
-    }
     logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
   }
 
@@ -1652,9 +1633,6 @@ public class ShuffleClientImpl extends ShuffleClient {
     if (null != lifecycleManagerRef) {
       lifecycleManagerRef = null;
     }
-    if (null != heartbeater) {
-      heartbeater.shutdown();
-    }
     pushExcludedWorkers.clear();
     fetchExcludedWorkers.clear();
     logger.warn("Shuffle client has been shutdown!");
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 5ef1ed13a..3b291c538 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -20,7 +20,6 @@ package org.apache.celeborn.client
 import java.util
 import java.util.{function, List => JList}
 import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
-import java.util.function.Predicate
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -301,9 +300,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       logDebug(s"Received GetShuffleFileGroup request," +
         s"shuffleId $shuffleId.")
       handleGetReducerFileGroup(context, shuffleId)
-
-    case pb: PbHeartbeatFromClient =>
-      handleHeartbeatFromClient(context, pb)
   }
 
   private def offerAndReserveSlots(
@@ -568,17 +564,6 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     commitManager.handleGetReducerFileGroup(context, shuffleId)
   }
 
-  private def handleHeartbeatFromClient(
-      context: RpcCallContext,
-      pb: PbHeartbeatFromClient): Unit = {
-    val shuffleIds = new util.ArrayList[Integer](pb.getShuffleIdList)
-    val pred = new Predicate[Integer] {
-      override def test(id: Integer): Boolean = registeredShuffle.contains(id)
-    }
-    shuffleIds.removeIf(pred)
-    context.reply(HeartbeatFromClientResponse(shuffleIds))
-  }
-
   private def handleStageEnd(shuffleId: Int): Unit = {
     // check whether shuffle has registered
     if (!registeredShuffle.contains(shuffleId)) {
diff --git 
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java 
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
index f44ce32ba..7a7706973 100644
--- 
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
+++ 
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientBaseSuiteJ.java
@@ -86,7 +86,7 @@ public abstract class ShuffleClientBaseSuiteJ {
     conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
     conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), "1K");
     shuffleClient =
-        new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"), false);
+        new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
     primaryLocation.setPeer(replicaLocation);
 
     when(endpointRef.askSync(
diff --git 
a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java 
b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
index dcf035374..9901855a8 100644
--- a/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
+++ b/client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java
@@ -172,7 +172,7 @@ public class ShuffleClientSuiteJ {
     conf.set(CelebornConf.CLIENT_PUSH_RETRY_THREADS().key(), "1");
     conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(), "1K");
     shuffleClient =
-        new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"), false);
+        new ShuffleClientImpl(TEST_APPLICATION_ID, conf, new 
UserIdentifier("mock", "mock"));
 
     primaryLocation.setPeer(replicaLocation);
     when(endpointRef.askSync(any(), any(), any()))
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala 
b/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
index 3d45e27de..aefef641d 100644
--- 
a/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
+++ 
b/client/src/test/scala/org/apache/celeborn/client/WithShuffleClientSuite.scala
@@ -151,7 +151,7 @@ trait WithShuffleClientSuite extends CelebornFunSuite {
 
   private def prepareService(): Unit = {
     lifecycleManager = new LifecycleManager(APP, celebornConf)
-    shuffleClient = new ShuffleClientImpl(APP, celebornConf, userIdentifier, 
false)
+    shuffleClient = new ShuffleClientImpl(APP, celebornConf, userIdentifier)
     shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
   }
 
diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index 0bad6f43a..0d9b67bcb 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -69,8 +69,6 @@ enum MessageType {
   REGISTER_MAP_PARTITION_TASK = 48;
   HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
   CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
-  HEARTBEAT_FROM_CLIENT = 51;
-  HEARTBEAT_FROM_CLIENT_RESPONSE = 52;
 }
 
 message PbStorageInfo {
@@ -175,14 +173,6 @@ message PbRegisterShuffleResponse {
   repeated PbPartitionLocation partitionLocations = 2;
 }
 
-message PbHeartbeatFromClient {
-  repeated int32 shuffleId = 1;
-}
-
-message PbHeartbeatFromClientResponse {
-  repeated int32 unknownShuffleId = 1;
-}
-
 message PbRequestSlots {
   string applicationId = 1;
   int32 shuffleId = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 56e1d3895..9aa927138 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -697,12 +697,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
   def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
   def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
+  def clientCheckedUseAllocatedWorkers: Boolean = 
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
+  def clientExcludedWorkerExpireTimeout: Long = 
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
   def clientExcludeReplicaOnFailureEnabled: Boolean =
     get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
-  def clientExcludedWorkerExpireTimeout: Long = 
get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
-  def clientCheckedUseAllocatedWorkers: Boolean = 
get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
-  def clientHeartbeatToLifecycleManagerInterval: Long =
-    get(CLIENT_HEARTBEAT_TO_LIFECYCLEMANAGER_INTERVAL)
 
   // //////////////////////////////////////////////////////
   //               Shuffle Compression                   //
@@ -998,10 +996,6 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def testPushReplicaDataTimeout: Boolean = 
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
   def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
-
-  // //////////////////////////////////////////////////////
-  //                      Flink                          //
-  // //////////////////////////////////////////////////////
   def clientFlinkMemoryPerResultPartitionMin: Long = 
get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
   def clientFlinkMemoryPerResultPartition: Long = 
get(CLIENT_MEMORY_PER_RESULT_PARTITION)
   def clientFlinkMemoryPerInputGateMin: Long = 
get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
@@ -2657,15 +2651,6 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val CLIENT_HEARTBEAT_TO_LIFECYCLEMANAGER_INTERVAL: ConfigEntry[Long] =
-    buildConf("celeborn.client.heartbeatToLifecycleManager.interval")
-      .categories("client")
-      .version("0.3.0")
-      .doc(
-        "Interval for client in Executor(for Spark) to send heartbeat message 
to lifecycle manager.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("30s")
-
   val TEST_CLIENT_RETRY_COMMIT_FILE: ConfigEntry[Boolean] =
     buildConf("celeborn.test.client.retryCommitFiles")
       .withAlternative("celeborn.test.retryCommitFiles")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index d99c8d3e2..1be450005 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -244,23 +244,6 @@ object ControlMessages extends Logging {
     }
   }
 
-  object HeartbeatFromClient {
-    def apply(
-        shuffleIds: util.Set[Integer]): PbHeartbeatFromClient = {
-      PbHeartbeatFromClient.newBuilder()
-        .addAllShuffleId(shuffleIds)
-        .build()
-    }
-  }
-
-  object HeartbeatFromClientResponse {
-    def apply(shuflfeIds: util.List[Integer]): PbHeartbeatFromClientResponse = 
{
-      PbHeartbeatFromClientResponse.newBuilder()
-        .addAllUnknownShuffleId(shuflfeIds)
-        .build()
-    }
-  }
-
   case class MapperEnd(
       shuffleId: Int,
       mapId: Int,
@@ -545,12 +528,6 @@ object ControlMessages extends Logging {
     case pb: PbChangeLocationResponse =>
       new TransportMessage(MessageType.CHANGE_LOCATION_RESPONSE, 
pb.toByteArray)
 
-    case pb: PbHeartbeatFromClient =>
-      new TransportMessage(MessageType.HEARTBEAT_FROM_CLIENT, pb.toByteArray)
-
-    case pb: PbHeartbeatFromClientResponse =>
-      new TransportMessage(MessageType.HEARTBEAT_FROM_CLIENT_RESPONSE, 
pb.toByteArray)
-
     case MapperEnd(shuffleId, mapId, attemptId, numMappers, partitionId) =>
       val payload = PbMapperEnd.newBuilder()
         .setShuffleId(shuffleId)
@@ -881,12 +858,6 @@ object ControlMessages extends Logging {
       case CHANGE_LOCATION_RESPONSE_VALUE =>
         PbChangeLocationResponse.parseFrom(message.getPayload)
 
-      case HEARTBEAT_FROM_CLIENT_VALUE =>
-        PbHeartbeatFromClient.parseFrom(message.getPayload)
-
-      case HEARTBEAT_FROM_CLIENT_RESPONSE_VALUE =>
-        PbHeartbeatFromClientResponse.parseFrom(message.getPayload)
-
       case MAPPER_END_VALUE =>
         val pbMapperEnd = PbMapperEnd.parseFrom(message.getPayload)
         MapperEnd(
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index df6799e33..0fac3020a 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -37,7 +37,6 @@ license: |
 | celeborn.client.flink.resultPartition.memory | 64m | Memory reserved for a 
result partition. | 0.3.0 | 
 | celeborn.client.flink.resultPartition.minMemory | 8m | Min memory reserved 
for a result partition. | 0.3.0 | 
 | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | Whether 
to support floating buffer for result partitions. | 0.3.0 | 
-| celeborn.client.heartbeatToLifecycleManager.interval | 30s | Interval for 
client in Executor(for Spark) to send heartbeat message to lifecycle manager. | 
0.3.0 | 
 | celeborn.client.push.buffer.initial.size | 8k |  | 0.3.0 | 
 | celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition 
buffer memory for shuffle hash writer. The pushed data will be buffered in 
memory before sending to Celeborn worker. For performance consideration keep 
this buffer size higher than 32K. Example: If reducer amount is 2000, buffer 
size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap 
memory. | 0.3.0 | 
 | celeborn.client.push.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side push exclude workers on failures. | 0.3.0 | 
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
index 6f7680c28..4946a3dae 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ShuffleClientSuite.scala
@@ -47,7 +47,7 @@ class ShuffleClientSuite extends WithShuffleClientSuite with 
MiniClusterFeature
 
     val lifecycleManager: LifecycleManager = new LifecycleManager(APP, 
celebornConf)
     val shuffleClient: ShuffleClientImpl = {
-      val client = new ShuffleClientImpl(APP, celebornConf, userIdentifier, 
false)
+      val client = new ShuffleClientImpl(APP, celebornConf, userIdentifier)
       client.setupLifecycleManagerRef(lifecycleManager.self)
       client
     }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index 826ce5de4..af6142022 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -39,21 +39,21 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   test("celeborn spark heartbeat test - client <- worker") {
     val (_, clientConf) = getTestHeartbeatFromWorker2ClientConf
     val shuffleClientImpl =
-      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"), 
false)
+      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     testHeartbeatFromWorker2Client(shuffleClientImpl.getDataClientFactory)
   }
 
   test("celeborn spark heartbeat test - client <- worker on heartbeat") {
     val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
     val shuffleClientImpl =
-      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"), 
false)
+      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     
testHeartbeatFromWorker2ClientWithNoHeartbeat(shuffleClientImpl.getDataClientFactory)
   }
 
   test("celeborn spark heartbeat test - client <- worker timeout") {
     val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
     val shuffleClientImpl =
-      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"), 
false)
+      new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
     
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
index c270152f9..3796975ae 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/cluster/ReadWriteTestBase.scala
@@ -61,8 +61,7 @@ trait ReadWriteTestBase extends AnyFunSuite
       .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
       .set("celeborn.data.io.numConnectionsPerPeer", "1")
     val lifecycleManager = new LifecycleManager(APP, clientConf)
-    val shuffleClient =
-      new ShuffleClientImpl(APP, clientConf, UserIdentifier("mock", "mock"), 
false)
+    val shuffleClient = new ShuffleClientImpl(APP, clientConf, 
UserIdentifier("mock", "mock"))
     shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
 
     val STR1 = RandomStringUtils.random(1024)

Reply via email to