This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 9dd6587d1 [CELEBORN-1912] Client should send heartbeat to worker for 
processing heartbeat to avoid reading idleness of worker which enables heartbeat
9dd6587d1 is described below

commit 9dd6587d151a77a53b1708211868dee2d122d34a
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 8 10:09:50 2025 +0800

    [CELEBORN-1912] Client should send heartbeat to worker for processing 
heartbeat to avoid reading idleness of worker which enables heartbeat
    
    ### What changes were proposed in this pull request?
    
    When a client receives a heartbeat from a worker that has heartbeat enabled, 
the client should send a heartbeat back, so that the worker's channel does not 
become read-idle and get closed.
    
    Follow up #1457.
    
    ### Why are the changes needed?
    
    In Flink batch jobs, the following exception is caused by a closed connection:
    ```
    2025-04-27 23:30:28
    java.io.IOException: Client /:9093 is lost, notify related stream 
805472050177
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
            at 
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
            at 
java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
            at 
org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
            at 
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
            at 
org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
            at 
org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
            at 
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
            at 
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
            at 
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
            at 
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
            at 
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
            at 
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
            at 
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:991)
    ```
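    The closed connection is caused by read idleness on the worker, which has 
heartbeat enabled. Debug-level Netty logging on the worker shows that it keeps 
writing 10-byte heartbeats to the client but reads nothing back, and it closes 
the channel exactly four minutes after the last READ: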
    ```
    2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader 
[headerLength: 17, bodyLength: 26]
    2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
    2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
    2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
    2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
    2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
    2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
    2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
    2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
    2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
    2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
    2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
    2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
    2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
    2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 
0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
    ```
    The worker becomes read-idle because heartbeat is one-way, from worker to 
client: it only keeps the client's side of the channel active. The client should 
handle a received heartbeat by sending a heartbeat back to the worker, which 
keeps the worker's side of the channel active as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `HeartbeatTest`
    
    Closes #3239 from SteNicholas/CELEBORN-1912.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/common/network/TransportContext.java  |  2 +-
 .../network/server/TransportChannelHandler.java    | 17 +++++++++--
 .../network/server/TransportRequestHandler.java    | 12 ++++++++
 .../common/network/util/TransportConf.java         |  4 +--
 .../org/apache/celeborn/common/CelebornConf.scala  | 19 +++++--------
 .../apache/celeborn/common/CelebornConfSuite.scala |  2 +-
 docs/configuration/network.md                      |  2 +-
 .../celeborn/tests/flink/HeartbeatTest.scala       |  4 +--
 .../celeborn/tests/spark/HeartbeatTest.scala       |  4 +--
 .../celeborn/service/deploy/HeartbeatFeature.scala | 33 +++++++++-------------
 10 files changed, 55 insertions(+), 44 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index e8a203be7..464e9d1e6 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -243,7 +243,7 @@ public class TransportContext implements Closeable {
         conf.connectionTimeoutMs(),
         closeIdleConnections,
         enableHeartbeat,
-        conf.clientHeartbeatInterval(),
+        conf.channelHeartbeatInterval(),
         this);
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
index dc633cac8..2bd0549fb 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
@@ -113,7 +113,8 @@ public class TransportChannelHandler extends 
ChannelInboundHandlerAdapter {
           ctx.executor()
               .scheduleWithFixedDelay(
                   () -> {
-                    logger.debug("send heartbeat");
+                    logger.debug(
+                        "Send heartbeat to {}.", 
NettyUtils.getRemoteAddress(ctx.channel()));
                     ctx.writeAndFlush(new Heartbeat());
                   },
                   0,
@@ -152,8 +153,18 @@ public class TransportChannelHandler extends 
ChannelInboundHandlerAdapter {
 
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object request) throws 
Exception {
-    if (request instanceof RequestMessage && !(request instanceof Heartbeat)) {
-      requestHandler.handle((RequestMessage) request);
+    if (request instanceof RequestMessage) {
+      if (request instanceof Heartbeat) {
+        logger.debug("Received heartbeat from {}.", 
NettyUtils.getRemoteAddress(ctx.channel()));
+        if (!enableHeartbeat) {
          // When heartbeat is disabled, we should still respond to a 
heartbeat if the peer has
          // heartbeat enabled, to prevent read idleness on the peer.
+          requestHandler.processHeartbeat();
+        }
+        ctx.fireChannelRead(request);
+      } else {
+        requestHandler.handle((RequestMessage) request);
+      }
     } else if (request instanceof ResponseMessage) {
       responseHandler.handle((ResponseMessage) request);
     } else {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index 1b9403e06..96d2f074a 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -95,6 +95,18 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
     }
   }
 
+  protected void processHeartbeat() {
+    try {
+      logger.trace("Process heartbeat from {}.", 
NettyUtils.getRemoteAddress(channel));
+      respond(new Heartbeat());
+    } catch (Exception e) {
+      logger.error(
+          "Error while invoking handler#respond() on heartbeat from {}.",
+          NettyUtils.getRemoteAddress(channel),
+          e);
+    }
+  }
+
   private void processRpcRequest(final RpcRequest req) {
     try {
       logger.trace("Process rpc request {}", req.requestId);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index effafee43..973064451 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -156,8 +156,8 @@ public class TransportConf {
     return celebornConf.fetchDataTimeoutCheckInterval(module);
   }
 
-  public long clientHeartbeatInterval() {
-    return celebornConf.clientHeartbeatInterval(module);
+  public long channelHeartbeatInterval() {
+    return celebornConf.channelHeartbeatInterval(module);
   }
 
   /** Timeout for a single round trip of sasl message exchange, in 
milliseconds. */
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 59f380fc9..1fcb5fe35 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -620,7 +620,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(MAX_CHUNKS_BEING_TRANSFERRED)
   }
 
-  def clientHeartbeatInterval(module: String): Long = {
+  def channelHeartbeatInterval(module: String): Long = {
     getTransportConfTimeAsMs(module, CHANNEL_HEARTBEAT_INTERVAL)
   }
 
@@ -2302,19 +2302,14 @@ object CelebornConf extends Logging {
       .categories("network")
       .version("0.3.0")
       .doc("The heartbeat interval between worker and client. " +
-        s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`, 
" +
-        s"works for shuffle client. " +
-        s"If setting <module> to 
`${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
-        s"works for master or worker. " +
-        s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
-        s"it works for shuffle client push and fetch data. " +
-        s"If setting <module> to 
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
-        s"it works for replicate client of worker replicating data to peer 
worker. " +
+        s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
+        s"it works for worker receiving push data. " +
+        s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, " 
+
+        s"it works for worker fetch server. " +
         "If you are using the \"celeborn.client.heartbeat.interval\", " +
         "please use the new configs for each module according to your needs or 
" +
-        "replace it with \"celeborn.rpc.heartbeat.interval\", " +
-        "\"celeborn.data.heartbeat.interval\" and " +
-        "\"celeborn.replicate.heartbeat.interval\". ")
+        "replace it with \"celeborn.push.heartbeat.interval\" and " +
+        "\"celeborn.fetch.heartbeat.interval\". ")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 87a6a1773..ef0922f8c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -361,7 +361,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     assert(transportTestNetworkIoRetryWait == 
conf.networkIoRetryWaitMs(module))
     assert(transportTestNetworkIoStorageMemoryMapThreshold == 
conf.networkIoMemoryMapBytes(module))
     assert(transportTestNetworkIoLazyFd == 
conf.networkIoLazyFileDescriptor(module))
-    assert(transportTestChannelHeartbeatInterval == 
conf.clientHeartbeatInterval(module))
+    assert(transportTestChannelHeartbeatInterval == 
conf.channelHeartbeatInterval(module))
     assert(transportTestPushTimeoutCheckThreads == 
conf.pushDataTimeoutCheckerThreads(module))
     assert(transportTestPushTimeoutCheckInterval == 
conf.pushDataTimeoutCheckInterval(module))
     assert(transportTestFetchTimeoutCheckThreads == 
conf.fetchDataTimeoutCheckerThreads(module))
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 199a9328f..b78c70ad6 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -21,7 +21,7 @@ license: |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
 | celeborn.&lt;module&gt;.fetch.timeoutCheck.interval | 5s | false | Interval 
for checking fetch data timeout. It only support setting <module> to `data` 
since it works for shuffle client fetch data. | 0.3.0 |  | 
 | celeborn.&lt;module&gt;.fetch.timeoutCheck.threads | 4 | false | Threads num 
for checking fetch data timeout. It only support setting <module> to `data` 
since it works for shuffle client fetch data. | 0.3.0 |  | 
-| celeborn.&lt;module&gt;.heartbeat.interval | 60s | false | The heartbeat 
interval between worker and client. If setting <module> to `rpc_app`, works for 
shuffle client. If setting <module> to `rpc_service`, works for master or 
worker. If setting <module> to `data`, it works for shuffle client push and 
fetch data. If setting <module> to `replicate`, it works for replicate client 
of worker replicating data to peer worker. If you are using the 
"celeborn.client.heartbeat.interval", please  [...]
+| celeborn.&lt;module&gt;.heartbeat.interval | 60s | false | The heartbeat 
interval between worker and client. If setting <module> to `push`, it works for 
worker receiving push data. If setting <module> to `fetch`, it works for worker 
fetch server. If you are using the "celeborn.client.heartbeat.interval", please 
use the new configs for each module according to your needs or replace it with 
"celeborn.push.heartbeat.interval" and "celeborn.fetch.heartbeat.interval".  | 
0.3.0 | celeborn.cl [...]
 | celeborn.&lt;module&gt;.io.backLog | 0 | false | Requested maximum length of 
the queue of incoming connections. Default 0 for no backlog. If setting 
<module> to `rpc_app`, works for shuffle client. If setting <module> to 
`rpc_service`, works for master or worker. If setting <module> to `push`, it 
works for worker receiving push data. If setting <module> to `replicate`, it 
works for replicate server of worker replicating data to peer worker. If 
setting <module> to `fetch`, it works for  [...]
 | celeborn.&lt;module&gt;.io.clientThreads | 0 | false | Number of threads 
used in the client thread pool. Default to 0, which is 2x#cores. If setting 
<module> to `rpc_app`, works for shuffle client. If setting <module> to 
`rpc_service`, works for master or worker. If setting <module> to `data`, it 
works for shuffle client push and fetch data. If setting <module> to 
`replicate`, it works for replicate client of worker replicating data to peer 
worker. |  |  | 
 | celeborn.&lt;module&gt;.io.conflictAvoidChooser.enable | false | false | 
Whether to use conflict avoid event executor chooser in the client thread pool. 
If setting <module> to `rpc_app`, works for shuffle client. If setting <module> 
to `rpc_service`, works for master or worker. If setting <module> to `data`, it 
works for shuffle client push and fetch data. If setting <module> to 
`replicate`, it works for replicate client of worker replicating data to peer 
worker. | 0.5.4 |  | 
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
index 2c1008871..878ab0b76 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
@@ -61,7 +61,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   }
 
   test("celeborn flink heartbeat test - client <- worker timeout") {
-    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
     val flinkShuffleClientImpl =
       new FlinkShuffleClientImpl(
         "",
@@ -73,6 +73,6 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
         -1) {
         override def setupLifecycleManagerRef(host: String, port: Int): Unit = 
{}
       }
-    
testHeartbeatFromWorker2ClientWithCloseChannel(flinkShuffleClientImpl.getDataClientFactory)
+    
testHeartbeatFromWorker2ClientWithWorkerTimeout(flinkShuffleClientImpl.getDataClientFactory)
   }
 }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index af6142022..8f12a563f 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -51,9 +51,9 @@ class HeartbeatTest extends AnyFunSuite with Logging with 
MiniClusterFeature wit
   }
 
   test("celeborn spark heartbeat test - client <- worker timeout") {
-    val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+    val (_, clientConf) = 
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
     val shuffleClientImpl =
       new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
-    
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
+    
testHeartbeatFromWorker2ClientWithWorkerTimeout(shuffleClientImpl.getDataClientFactory)
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index dc5616064..45874db79 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -66,21 +66,19 @@ trait HeartbeatFeature extends MiniClusterFeature {
       "celeborn.fetch.heartbeat.interval" -> "4s")
     val clientConf = new CelebornConf()
     clientConf.set("celeborn.data.io.connectionTimeout", "6s")
-    clientConf.set("celeborn.data.heartbeat.interval", "3s")
     (workerConf, clientConf)
   }
 
   def testHeartbeatFromWorker2Client(dataClientFactory: 
TransportClientFactory): Unit = {
     val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf
-    // client <- worker:default client do not send heartbeat to worker, and 
worker sends hearbeat to client
-    // client active: after connection timeout, the channel still be active
+    // client <- worker: Worker sends heartbeat to client to avoid client read 
idle, and client sends heartbeat after receiving heartbeat to avoid worker read 
idle.
+    // client active: After client connection timeout, the channel remains 
active because worker sends heartbeat to client.
     testCore(
       workerConf,
       dataClientFactory,
       (pushClient, fetchClient) => {
         Thread.sleep(7 * 1000)
         Assert.assertTrue(fetchClient.isActive)
-        // because worker don't send heartbeat when pushdata, so client's 
channel is false
         Assert.assertTrue(pushClient.isActive)
       })
   }
@@ -100,23 +98,23 @@ trait HeartbeatFeature extends MiniClusterFeature {
       : Unit = {
     val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
 
-    // client <- worker:default client do not send heartbeat to worker, and 
worker sends hearbeat to client
-    // client active: after connection timeout, the channel still be active
+    // client <- worker: Worker sends heartbeat to client to avoid client read 
idle, and client sends heartbeat after receiving heartbeat to avoid worker read 
idle.
+    // client inactive: After client connection timeout, the channel is 
inactive.
     testCore(
       workerConf,
       dataClientFactory,
       (pushClient, fetchClient) => {
         Thread.sleep(7 * 1000)
         Assert.assertFalse(fetchClient.isActive)
-        // because worker don't send heartbeat when pushdata, so client's 
channel is false
         Assert.assertFalse(pushClient.isActive)
       })
   }
 
-  def getTestHeartbeatFromWorker2ClientWithCloseChannelConf: (Map[String, 
String], CelebornConf) = {
+  def getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
+      : (Map[String, String], CelebornConf) = {
     val workerConf = Map(
-      "celeborn.fetch.io.connectionTimeout" -> "9s",
-      "celeborn.push.io.connectionTimeout" -> "9s",
+      "celeborn.fetch.io.connectionTimeout" -> "3s",
+      "celeborn.push.io.connectionTimeout" -> "3s",
       "celeborn.push.heartbeat.interval" -> "4s",
       "celeborn.fetch.heartbeat.interval" -> "4s",
       "celeborn.worker.push.heartbeat.enabled" -> "true",
@@ -127,22 +125,17 @@ trait HeartbeatFeature extends MiniClusterFeature {
     (workerConf, clientConf)
   }
 
-  def testHeartbeatFromWorker2ClientWithCloseChannel(dataClientFactory: 
TransportClientFactory)
+  def testHeartbeatFromWorker2ClientWithWorkerTimeout(dataClientFactory: 
TransportClientFactory)
       : Unit = {
-    val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+    val (workerConf, _) = 
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
 
-    // client <- worker:default client do not send heartbeat to worker, and 
worker sends hearbeat to client
-    // client inactive: after client connection timeout, client still be 
active(because worker send heartbeat to client);
-    // but after worker connectionTimeout, worker will close the channel, then 
client will be inactive
+    // client <- worker: Worker sends heartbeat to client to avoid client read 
idle, and client sends heartbeat after receiving heartbeat to avoid worker read 
idle.
+    // client inactive: Before client connection timeout, the channel is 
inactive because the worker's connection timeout (3s) is shorter than its heartbeat interval (4s), so the worker closes the channel.
     testCore(
       workerConf,
       dataClientFactory,
       (pushClient, fetchClient) => {
-        Thread.sleep(7 * 1000)
-        Assert.assertTrue(fetchClient.isActive)
-        Assert.assertTrue(pushClient.isActive)
-
-        Thread.sleep(4 * 1000)
+        Thread.sleep(5 * 1000)
         Assert.assertFalse(fetchClient.isActive)
         Assert.assertFalse(pushClient.isActive)
       })

Reply via email to