This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9dd6587d1 [CELEBORN-1912] Client should send heartbeat to worker for
processing heartbeat to avoid reading idleness of worker which enables heartbeat
9dd6587d1 is described below
commit 9dd6587d151a77a53b1708211868dee2d122d34a
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 8 10:09:50 2025 +0800
[CELEBORN-1912] Client should send heartbeat to worker for processing
heartbeat to avoid reading idleness of worker which enables heartbeat
### What changes were proposed in this pull request?
When the client receives a heartbeat from a worker, it should send a heartbeat
back to the worker. Otherwise, a worker that has heartbeat enabled detects read
idleness on the channel and closes it.
Follow-up to #1457.
### Why are the changes needed?
In Flink batch jobs, the following exception is caused by a closed connection:
```
2025-04-27 23:30:28
java.io.IOException: Client /:9093 is lost, notify related stream
805472050177
at
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
at
org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
at
java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at
org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
at
org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
at
org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
at
org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at
org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at
org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at
org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at
org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
```
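Troubleshooting with debug-level logging shows that the connection is closed by
the worker, which has heartbeat enabled, because it detects read idleness on the
channel. In the log below, the worker keeps writing 10-byte heartbeats every
minute but reads nothing after 23:30:32,380. It then closes the channel at
23:34:32,380, exactly four minutes after the last read.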
```
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader
[headerLength: 17, bodyLength: 26]
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id:
0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
```
The worker's read idleness is caused by the heartbeat being one-way. The worker
sends heartbeats to the client, which keeps the channel active only on the
client side. The client should respond to each heartbeat by sending a heartbeat
back to the worker, so that the channel also stays active on the worker side.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`HeartbeatTest`
Closes #3239 from SteNicholas/CELEBORN-1912.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/common/network/TransportContext.java | 2 +-
.../network/server/TransportChannelHandler.java | 17 +++++++++--
.../network/server/TransportRequestHandler.java | 12 ++++++++
.../common/network/util/TransportConf.java | 4 +--
.../org/apache/celeborn/common/CelebornConf.scala | 19 +++++--------
.../apache/celeborn/common/CelebornConfSuite.scala | 2 +-
docs/configuration/network.md | 2 +-
.../celeborn/tests/flink/HeartbeatTest.scala | 4 +--
.../celeborn/tests/spark/HeartbeatTest.scala | 4 +--
.../celeborn/service/deploy/HeartbeatFeature.scala | 33 +++++++++-------------
10 files changed, 55 insertions(+), 44 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index e8a203be7..464e9d1e6 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -243,7 +243,7 @@ public class TransportContext implements Closeable {
conf.connectionTimeoutMs(),
closeIdleConnections,
enableHeartbeat,
- conf.clientHeartbeatInterval(),
+ conf.channelHeartbeatInterval(),
this);
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
index dc633cac8..2bd0549fb 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
@@ -113,7 +113,8 @@ public class TransportChannelHandler extends
ChannelInboundHandlerAdapter {
ctx.executor()
.scheduleWithFixedDelay(
() -> {
- logger.debug("send heartbeat");
+ logger.debug(
+ "Send heartbeat to {}.",
NettyUtils.getRemoteAddress(ctx.channel()));
ctx.writeAndFlush(new Heartbeat());
},
0,
@@ -152,8 +153,18 @@ public class TransportChannelHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws
Exception {
- if (request instanceof RequestMessage && !(request instanceof Heartbeat)) {
- requestHandler.handle((RequestMessage) request);
+ if (request instanceof RequestMessage) {
+ if (request instanceof Heartbeat) {
+ logger.debug("Received heartbeat from {}.",
NettyUtils.getRemoteAddress(ctx.channel()));
+ if (!enableHeartbeat) {
+ // When heartbeat is disabled, we should still respond to a
heartbeat if peer has
+ // heartbeat enabled - to prevent reading idleness.
+ requestHandler.processHeartbeat();
+ }
+ ctx.fireChannelRead(request);
+ } else {
+ requestHandler.handle((RequestMessage) request);
+ }
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index 1b9403e06..96d2f074a 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -95,6 +95,18 @@ public class TransportRequestHandler extends
MessageHandler<RequestMessage> {
}
}
+ protected void processHeartbeat() {
+ try {
+ logger.trace("Process heartbeat from {}.",
NettyUtils.getRemoteAddress(channel));
+ respond(new Heartbeat());
+ } catch (Exception e) {
+ logger.error(
+ "Error while invoking handler#respond() on heartbeat from {}.",
+ NettyUtils.getRemoteAddress(channel),
+ e);
+ }
+ }
+
private void processRpcRequest(final RpcRequest req) {
try {
logger.trace("Process rpc request {}", req.requestId);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
index effafee43..973064451 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java
@@ -156,8 +156,8 @@ public class TransportConf {
return celebornConf.fetchDataTimeoutCheckInterval(module);
}
- public long clientHeartbeatInterval() {
- return celebornConf.clientHeartbeatInterval(module);
+ public long channelHeartbeatInterval() {
+ return celebornConf.channelHeartbeatInterval(module);
}
/** Timeout for a single round trip of sasl message exchange, in
milliseconds. */
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 59f380fc9..1fcb5fe35 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -620,7 +620,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(MAX_CHUNKS_BEING_TRANSFERRED)
}
- def clientHeartbeatInterval(module: String): Long = {
+ def channelHeartbeatInterval(module: String): Long = {
getTransportConfTimeAsMs(module, CHANNEL_HEARTBEAT_INTERVAL)
}
@@ -2302,19 +2302,14 @@ object CelebornConf extends Logging {
.categories("network")
.version("0.3.0")
.doc("The heartbeat interval between worker and client. " +
- s"If setting <module> to `${TransportModuleConstants.RPC_APP_MODULE}`,
" +
- s"works for shuffle client. " +
- s"If setting <module> to
`${TransportModuleConstants.RPC_SERVICE_MODULE}`, " +
- s"works for master or worker. " +
- s"If setting <module> to `${TransportModuleConstants.DATA_MODULE}`, " +
- s"it works for shuffle client push and fetch data. " +
- s"If setting <module> to
`${TransportModuleConstants.REPLICATE_MODULE}`, " +
- s"it works for replicate client of worker replicating data to peer
worker. " +
+ s"If setting <module> to `${TransportModuleConstants.PUSH_MODULE}`, " +
+ s"it works for worker receiving push data. " +
+ s"If setting <module> to `${TransportModuleConstants.FETCH_MODULE}`, "
+
+ s"it works for worker fetch server. " +
"If you are using the \"celeborn.client.heartbeat.interval\", " +
"please use the new configs for each module according to your needs or
" +
- "replace it with \"celeborn.rpc.heartbeat.interval\", " +
- "\"celeborn.data.heartbeat.interval\" and " +
- "\"celeborn.replicate.heartbeat.interval\". ")
+ "replace it with \"celeborn.push.heartbeat.interval\" and " +
+ "\"celeborn.fetch.heartbeat.interval\". ")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 87a6a1773..ef0922f8c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -361,7 +361,7 @@ class CelebornConfSuite extends CelebornFunSuite {
assert(transportTestNetworkIoRetryWait ==
conf.networkIoRetryWaitMs(module))
assert(transportTestNetworkIoStorageMemoryMapThreshold ==
conf.networkIoMemoryMapBytes(module))
assert(transportTestNetworkIoLazyFd ==
conf.networkIoLazyFileDescriptor(module))
- assert(transportTestChannelHeartbeatInterval ==
conf.clientHeartbeatInterval(module))
+ assert(transportTestChannelHeartbeatInterval ==
conf.channelHeartbeatInterval(module))
assert(transportTestPushTimeoutCheckThreads ==
conf.pushDataTimeoutCheckerThreads(module))
assert(transportTestPushTimeoutCheckInterval ==
conf.pushDataTimeoutCheckInterval(module))
assert(transportTestFetchTimeoutCheckThreads ==
conf.fetchDataTimeoutCheckerThreads(module))
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index 199a9328f..b78c70ad6 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -21,7 +21,7 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.<module>.fetch.timeoutCheck.interval | 5s | false | Interval
for checking fetch data timeout. It only support setting <module> to `data`
since it works for shuffle client fetch data. | 0.3.0 | |
| celeborn.<module>.fetch.timeoutCheck.threads | 4 | false | Threads num
for checking fetch data timeout. It only support setting <module> to `data`
since it works for shuffle client fetch data. | 0.3.0 | |
-| celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat
interval between worker and client. If setting <module> to `rpc_app`, works for
shuffle client. If setting <module> to `rpc_service`, works for master or
worker. If setting <module> to `data`, it works for shuffle client push and
fetch data. If setting <module> to `replicate`, it works for replicate client
of worker replicating data to peer worker. If you are using the
"celeborn.client.heartbeat.interval", please [...]
+| celeborn.<module>.heartbeat.interval | 60s | false | The heartbeat
interval between worker and client. If setting <module> to `push`, it works for
worker receiving push data. If setting <module> to `fetch`, it works for worker
fetch server. If you are using the "celeborn.client.heartbeat.interval", please
use the new configs for each module according to your needs or replace it with
"celeborn.push.heartbeat.interval" and "celeborn.fetch.heartbeat.interval". |
0.3.0 | celeborn.cl [...]
| celeborn.<module>.io.backLog | 0 | false | Requested maximum length of
the queue of incoming connections. Default 0 for no backlog. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `push`, it
works for worker receiving push data. If setting <module> to `replicate`, it
works for replicate server of worker replicating data to peer worker. If
setting <module> to `fetch`, it works for [...]
| celeborn.<module>.io.clientThreads | 0 | false | Number of threads
used in the client thread pool. Default to 0, which is 2x#cores. If setting
<module> to `rpc_app`, works for shuffle client. If setting <module> to
`rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to
`replicate`, it works for replicate client of worker replicating data to peer
worker. | | |
| celeborn.<module>.io.conflictAvoidChooser.enable | false | false |
Whether to use conflict avoid event executor chooser in the client thread pool.
If setting <module> to `rpc_app`, works for shuffle client. If setting <module>
to `rpc_service`, works for master or worker. If setting <module> to `data`, it
works for shuffle client push and fetch data. If setting <module> to
`replicate`, it works for replicate client of worker replicating data to peer
worker. | 0.5.4 | |
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
index 2c1008871..878ab0b76 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HeartbeatTest.scala
@@ -61,7 +61,7 @@ class HeartbeatTest extends AnyFunSuite with Logging with
MiniClusterFeature wit
}
test("celeborn flink heartbeat test - client <- worker timeout") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+ val (_, clientConf) =
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
val flinkShuffleClientImpl =
new FlinkShuffleClientImpl(
"",
@@ -73,6 +73,6 @@ class HeartbeatTest extends AnyFunSuite with Logging with
MiniClusterFeature wit
-1) {
override def setupLifecycleManagerRef(host: String, port: Int): Unit =
{}
}
-
testHeartbeatFromWorker2ClientWithCloseChannel(flinkShuffleClientImpl.getDataClientFactory)
+
testHeartbeatFromWorker2ClientWithWorkerTimeout(flinkShuffleClientImpl.getDataClientFactory)
}
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
index af6142022..8f12a563f 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/HeartbeatTest.scala
@@ -51,9 +51,9 @@ class HeartbeatTest extends AnyFunSuite with Logging with
MiniClusterFeature wit
}
test("celeborn spark heartbeat test - client <- worker timeout") {
- val (_, clientConf) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+ val (_, clientConf) =
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
val shuffleClientImpl =
new ShuffleClientImpl("APP", clientConf, new UserIdentifier("1", "1"))
-
testHeartbeatFromWorker2ClientWithCloseChannel(shuffleClientImpl.getDataClientFactory)
+
testHeartbeatFromWorker2ClientWithWorkerTimeout(shuffleClientImpl.getDataClientFactory)
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index dc5616064..45874db79 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -66,21 +66,19 @@ trait HeartbeatFeature extends MiniClusterFeature {
"celeborn.fetch.heartbeat.interval" -> "4s")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")
- clientConf.set("celeborn.data.heartbeat.interval", "3s")
(workerConf, clientConf)
}
def testHeartbeatFromWorker2Client(dataClientFactory:
TransportClientFactory): Unit = {
val (workerConf, _) = getTestHeartbeatFromWorker2ClientConf
- // client <- worker:default client do not send heartbeat to worker, and
worker sends hearbeat to client
- // client active: after connection timeout, the channel still be active
+ // client <- worker: Worker sends heartbeat to client to avoid client read
idle, and client sends heartbeat after receiving heartbeat to avoid worker read
idle.
+ // client active: After client connection timeout, the channel remains
active because worker send heartbeat to client.
testCore(
workerConf,
dataClientFactory,
(pushClient, fetchClient) => {
Thread.sleep(7 * 1000)
Assert.assertTrue(fetchClient.isActive)
- // because worker don't send heartbeat when pushdata, so client's
channel is false
Assert.assertTrue(pushClient.isActive)
})
}
@@ -100,23 +98,23 @@ trait HeartbeatFeature extends MiniClusterFeature {
: Unit = {
val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithNoHeartbeatConf
- // client <- worker:default client do not send heartbeat to worker, and
worker sends hearbeat to client
- // client active: after connection timeout, the channel still be active
+ // client <- worker: Worker sends heartbeat to client to avoid client read
idle, and client sends heartbeat after receiving heartbeat to avoid worker read
idle.
+ // client inactive: After client connection timeout, the channel is
inactive.
testCore(
workerConf,
dataClientFactory,
(pushClient, fetchClient) => {
Thread.sleep(7 * 1000)
Assert.assertFalse(fetchClient.isActive)
- // because worker don't send heartbeat when pushdata, so client's
channel is false
Assert.assertFalse(pushClient.isActive)
})
}
- def getTestHeartbeatFromWorker2ClientWithCloseChannelConf: (Map[String,
String], CelebornConf) = {
+ def getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
+ : (Map[String, String], CelebornConf) = {
val workerConf = Map(
- "celeborn.fetch.io.connectionTimeout" -> "9s",
- "celeborn.push.io.connectionTimeout" -> "9s",
+ "celeborn.fetch.io.connectionTimeout" -> "3s",
+ "celeborn.push.io.connectionTimeout" -> "3s",
"celeborn.push.heartbeat.interval" -> "4s",
"celeborn.fetch.heartbeat.interval" -> "4s",
"celeborn.worker.push.heartbeat.enabled" -> "true",
@@ -127,22 +125,17 @@ trait HeartbeatFeature extends MiniClusterFeature {
(workerConf, clientConf)
}
- def testHeartbeatFromWorker2ClientWithCloseChannel(dataClientFactory:
TransportClientFactory)
+ def testHeartbeatFromWorker2ClientWithWorkerTimeout(dataClientFactory:
TransportClientFactory)
: Unit = {
- val (workerConf, _) = getTestHeartbeatFromWorker2ClientWithCloseChannelConf
+ val (workerConf, _) =
getTestHeartbeatFromWorker2ClientWithWorkerTimeoutConf
- // client <- worker:default client do not send heartbeat to worker, and
worker sends hearbeat to client
- // client inactive: after client connection timeout, client still be
active(because worker send heartbeat to client);
- // but after worker connectionTimeout, worker will close the channel, then
client will be inactive
+ // client <- worker: Worker sends heartbeat to client to avoid client read
idle, and client sends heartbeat after receiving heartbeat to avoid worker read
idle.
+ // client inactive: Before client connection timeout, the channel is
inactive because worker closes the channel.
testCore(
workerConf,
dataClientFactory,
(pushClient, fetchClient) => {
- Thread.sleep(7 * 1000)
- Assert.assertTrue(fetchClient.isActive)
- Assert.assertTrue(pushClient.isActive)
-
- Thread.sleep(4 * 1000)
+ Thread.sleep(5 * 1000)
Assert.assertFalse(fetchClient.isActive)
Assert.assertFalse(pushClient.isActive)
})