This is an automated email from the ASF dual-hosted git repository.

zhongqiangchen pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 497b746f1 [CELEBORN-675] Fix decode heartbeat message
497b746f1 is described below

commit 497b746f1dd950a47fcef019d7afbc087cc4842b
Author: Shuang <[email protected]>
AuthorDate: Wed Jun 14 14:37:13 2023 +0800

    [CELEBORN-675] Fix decode heartbeat message
    
    ### What changes were proposed in this pull request?
    Give the Heartbeat message a one-byte body and skip this byte when decoding.
    
    ### Why are the changes needed?
    A Heartbeat message may be split across two Netty buffers; the `empty 
buffer` (which is not actually needed, but must be kept) is then wrongly 
removed, and decodeNext throws an NPE. See:
    ```  java
    while (headerBuf.readableBytes() < HEADER_SIZE) {
          ByteBuf next = buffers.getFirst();
          int toRead = Math.min(next.readableBytes(), HEADER_SIZE - 
headerBuf.readableBytes());
          headerBuf.writeBytes(next, toRead);
          if (!next.isReadable()) {
            buffers.removeFirst().release();
          }
        }
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests (UT) and manual testing.
    
    Closes #1589 from RexXiong/CELEBORN-675.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zhongqiang.czq <[email protected]>
    (cherry picked from commit da853473302d4df793f3d5d60a211b948c1b9997)
    Signed-off-by: zhongqiang.czq <[email protected]>
---
 .../org/apache/celeborn/common/network/protocol/Heartbeat.java     | 7 +++++--
 .../apache/celeborn/common/network/util/TransportFrameDecoder.java | 6 ++----
 .../src/main/scala/org/apache/celeborn/common/CelebornConf.scala   | 4 ++--
 docs/configuration/worker.md                                       | 4 ++--
 .../org/apache/celeborn/service/deploy/HeartbeatFeature.scala      | 4 ++++
 5 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
 
b/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
index 8554f3a91..b9d2c14cc 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/protocol/Heartbeat.java
@@ -23,13 +23,16 @@ public class Heartbeat extends RequestMessage {
 
   @Override
   public int encodedLength() {
-    return 0;
+    return 1; // only one byte for Heartbeat
   }
 
   @Override
-  public void encode(ByteBuf buf) {}
+  public void encode(ByteBuf buf) {
+    buf.writeByte(0);
+  }
 
   public static Message decode(ByteBuf buffer) {
+    buffer.skipBytes(1);
     return new Heartbeat();
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
index a0a49d369..dec5db28b 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/util/TransportFrameDecoder.java
@@ -99,9 +99,7 @@ public class TransportFrameDecoder extends 
ChannelInboundHandlerAdapter implemen
       bodySize = first.readInt();
       nextFrameSize = msgSize + bodySize;
       totalSize -= HEADER_SIZE;
-      // The msgsize and bodysize of some messages e.g. HEARTBEAT are both 0, 
so buffers can't be
-      // released
-      if (!first.isReadable() && nextFrameSize != 0) {
+      if (!first.isReadable()) {
         buffers.removeFirst().release();
       }
       return nextFrameSize;
@@ -133,7 +131,7 @@ public class TransportFrameDecoder extends 
ChannelInboundHandlerAdapter implemen
     // Reset size for next frame.
     nextFrameSize = UNKNOWN_FRAME_SIZE;
     Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: 
%s", frameSize);
-    Preconditions.checkArgument(frameSize >= 0, "Frame length should be >= 0: 
%s", frameSize);
+    Preconditions.checkArgument(frameSize > 0, "Frame length should be 
positive: %s", frameSize);
 
     // If the first buffer holds the entire frame, return it.
     int remaining = (int) frameSize;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index da334e859..a74699681 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2448,7 +2448,7 @@ object CelebornConf extends Logging {
       .version("0.3.0")
       .doc("enable the heartbeat from worker to client when pushing data")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.fetch.heartbeat.enabled")
@@ -2456,7 +2456,7 @@ object CelebornConf extends Logging {
       .version("0.3.0")
       .doc("enable the heartbeat from worker to client when fetching data")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
     buildConf("celeborn.client.application.heartbeatInterval")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 88736db5a..0e57dd5e1 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -36,7 +36,7 @@ license: |
 | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory 
usage reaches this limit, the worker will stop to receive data from Celeborn 
shuffle clients. | 0.2.0 | 
 | celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory 
usage reaches this limit, the worker will stop to receive replication data from 
other workers. | 0.2.0 | 
 | celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is 
less than this limit, worker will resume. | 0.2.0 | 
-| celeborn.worker.fetch.heartbeat.enabled | true | enable the heartbeat from 
worker to client when fetching data | 0.3.0 | 
+| celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from 
worker to client when fetching data | 0.3.0 | 
 | celeborn.worker.fetch.io.threads | &lt;undefined&gt; | Netty IO thread 
number of worker to handle client fetch data. The default threads number is the 
number of flush thread. | 0.2.0 | 
 | celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch 
data request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single 
flusher. | 0.2.0 | 
@@ -66,7 +66,7 @@ license: |
 | celeborn.worker.partition.initial.readBuffersMax | 1024 | Max number of 
initial read buffers | 0.3.0 | 
 | celeborn.worker.partition.initial.readBuffersMin | 1 | Min number of initial 
read buffers | 0.3.0 | 
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio 
of partition sorter's memory for sorting, when reserved memory is higher than 
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
-| celeborn.worker.push.heartbeat.enabled | true | enable the heartbeat from 
worker to client when pushing data | 0.3.0 | 
+| celeborn.worker.push.heartbeat.enabled | false | enable the heartbeat from 
worker to client when pushing data | 0.3.0 | 
 | celeborn.worker.push.io.threads | &lt;undefined&gt; | Netty IO thread number 
of worker to handle client push data. The default threads number is the number 
of flush thread. | 0.2.0 | 
 | celeborn.worker.push.port | 0 | Server port for Worker to receive push data 
request from ShuffleClient. | 0.2.0 | 
 | celeborn.worker.readBuffer.allocationWait | 50ms | The time to wait when 
buffer dispatcher can not allocate a buffer. | 0.3.0 | 
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
index 13f409422..27112eef4 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/HeartbeatFeature.scala
@@ -67,6 +67,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
     val workerConf = Map(
       CelebornConf.MASTER_ENDPOINTS.key -> "localhost:9097",
       "celeborn.push.heartbeat.interval" -> "4s",
+      "celeborn.worker.push.heartbeat.enabled" -> "true",
+      "celeborn.worker.fetch.heartbeat.enabled" -> "true",
       "celeborn.fetch.heartbeat.interval" -> "4s")
     val clientConf = new CelebornConf()
     clientConf.set("celeborn.data.io.connectionTimeout", "6s")
@@ -127,6 +129,8 @@ trait HeartbeatFeature extends MiniClusterFeature {
       "celeborn.push.io.connectionTimeout" -> "9s",
       "celeborn.push.heartbeat.interval" -> "4s",
       "celeborn.fetch.heartbeat.interval" -> "4s",
+      "celeborn.worker.push.heartbeat.enabled" -> "true",
+      "celeborn.worker.fetch.heartbeat.enabled" -> "true",
       CelebornConf.WORKER_CLOSE_IDLE_CONNECTIONS.key -> "true")
     val clientConf = new CelebornConf()
     clientConf.set("celeborn.data.io.connectionTimeout", "6s")

Reply via email to