This is an automated email from the ASF dual-hosted git repository.
zhongqiangchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 497b746f1 [CELEBORN-675] Fix decode heartbeat message
497b746f1 is described below
commit 497b746f1dd950a47fcef019d7afbc087cc4842b
Author: Shuang <[email protected]>
AuthorDate: Wed Jun 14 14:37:13 2023 +0800
[CELEBORN-675] Fix decode heartbeat message
### What changes were proposed in this pull request?
Give the Heartbeat message a one-byte payload and skip this byte when decoding.
### Why are the changes needed?
A Heartbeat message may be split across two Netty buffers. In that case the
`empty buffer` (which is not actually needed, but must be kept) is wrongly
removed, and decodeNext would then throw an NPE. See:
``` java
while (headerBuf.readableBytes() < HEADER_SIZE) {
ByteBuf next = buffers.getFirst();
int toRead = Math.min(next.readableBytes(), HEADER_SIZE -
headerBuf.readableBytes());
headerBuf.writeBytes(next, toRead);
if (!next.isReadable()) {
buffers.removeFirst().release();
}
}
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & MANUAL
Closes #1589 from RexXiong/CELEBORN-675.
Authored-by: Shuang <[email protected]>
Signed-off-by: zhongqiang.czq <[email protected]>
(cherry picked from commit da853473302d4df793f3d5d60a211b948c1b9997)
Signed-off-by: zhongqiang.czq <[email protected]>
---
.../org/apache/celeborn/common/network/protocol/Heartbeat.java | 7 +++++--
.../apache/celeborn/common/network/util/TransportFrameDecoder.java | 6 ++----
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 4 ++--
docs/configuration/worker.md | 4 ++--
.../org/apache/celeborn/service/deploy/HeartbeatFeature.scala | 4 ++++
5 files changed, 15 insertions(+), 10 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
b/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
index 8554f3a91..b9d2c14cc 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
@@ -23,13 +23,16 @@ public class Heartbeat extends RequestMessage {
@Override
public int encodedLength() {
- return 0;
+ return 1; // only one byte for Heartbeat
}
@Override
- public void encode(ByteBuf buf) {}
+ public void encode(ByteBuf buf) {
+ buf.writeByte(0);
+ }
public static Message decode(ByteBuf buffer) {
+ buffer.skipBytes(1);
return new Heartbeat();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
index a0a49d369..dec5db28b 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
@@ -99,9 +99,7 @@ public class TransportFrameDecoder extends
ChannelInboundHandlerAdapter implemen
bodySize = first.readInt();
nextFrameSize = msgSize + bodySize;
totalSize -= HEADER_SIZE;
- // The msgsize and bodysize of some messages e.g. HEARTBEAT are both 0,
so buffers can't be
- // released
- if (!first.isReadable() && nextFrameSize != 0) {
+ if (!first.isReadable()) {
buffers.removeFirst().release();
}
return nextFrameSize;
@@ -133,7 +131,7 @@ public class TransportFrameDecoder extends
ChannelInboundHandlerAdapter implemen
// Reset size for next frame.
nextFrameSize = UNKNOWN_FRAME_SIZE;
Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame:
%s", frameSize);
- Preconditions.checkArgument(frameSize >= 0, "Frame length should be >= 0:
%s", frameSize);
+ Preconditions.checkArgument(frameSize > 0, "Frame length should be
positive: %s", frameSize);
// If the first buffer holds the entire frame, return it.
int remaining = (int) frameSize;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index da334e859..a74699681 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2448,7 +2448,7 @@ object CelebornConf extends Logging {
.version("0.3.0")
.doc("enable the heartbeat from worker to client when pushing data")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.fetch.heartbeat.enabled")
@@ -2456,7 +2456,7 @@ object CelebornConf extends Logging {
.version("0.3.0")
.doc("enable the heartbeat from worker to client when fetching data")
.booleanConf
- .createWithDefault(true)
+ .createWithDefault(false)
val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 88736db5a..0e57dd5e1 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -36,7 +36,7 @@ license: |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory
usage reaches this limit, the worker will stop to receive data from Celeborn
shuffle clients. | 0.2.0 |
| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory
usage reaches this limit, the worker will stop to receive replication data from
other workers. | 0.2.0 |
| celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is
less than this limit, worker will resume. | 0.2.0 |
-| celeborn.worker.fetch.heartbeat.enabled | true | enable the heartbeat from
worker to client when fetching data | 0.3.0 |
+| celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from
worker to client when fetching data | 0.3.0 |
| celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread
number of worker to handle client fetch data. The default threads number is the
number of flush thread. | 0.2.0 |
| celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch
data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single
flusher. | 0.2.0 |
@@ -66,7 +66,7 @@ license: |
| celeborn.worker.partition.initial.readBuffersMax | 1024 | Max number of
initial read buffers | 0.3.0 |
| celeborn.worker.partition.initial.readBuffersMin | 1 | Min number of initial
read buffers | 0.3.0 |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio
of partition sorter's memory for sorting, when reserved memory is higher than
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
-| celeborn.worker.push.heartbeat.enabled | true | enable the heartbeat from
worker to client when pushing data | 0.3.0 |
+| celeborn.worker.push.heartbeat.enabled | false | enable the heartbeat from
worker to client when pushing data | 0.3.0 |
| celeborn.worker.push.io.threads | <undefined> | Netty IO thread number
of worker to handle client push data. The default threads number is the number
of flush thread. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data
request from ShuffleClient. | 0.2.0 |
| celeborn.worker.readBuffer.allocationWait | 50ms | The time to wait when
buffer dispatcher can not allocate a buffer. | 0.3.0 |
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index 13f409422..27112eef4 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -67,6 +67,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
val workerConf = Map(
CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
"celeborn.push.heartbeat.interval" -> "4s",
+ "celeborn.worker.push.heartbeat.enabled" -> "true",
+ "celeborn.worker.fetch.heartbeat.enabled" -> "true",
"celeborn.fetch.heartbeat.interval" -> "4s")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")
@@ -127,6 +129,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
"celeborn.push.io.connectionTimeout" -> "9s",
"celeborn.push.heartbeat.interval" -> "4s",
"celeborn.fetch.heartbeat.interval" -> "4s",
+ "celeborn.worker.push.heartbeat.enabled" -> "true",
+ "celeborn.worker.fetch.heartbeat.enabled" -> "true",
CelebornConf.WORKER_CLOSE_IDLE_CONNECTIONS.key -> "true")
val clientConf = new CelebornConf()
clientConf.set("celeborn.data.io.connectionTimeout", "6s")