This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new ca2be1c63 [CELEBORN-777][BUG] CongestionControl 
getPotentialConsumeSpeed throw /zero error
ca2be1c63 is described below

commit ca2be1c6300342fd39fc7a40491fb5e4b417bceb
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Jul 8 21:46:37 2023 +0800

    [CELEBORN-777][BUG] CongestionControl getPotentialConsumeSpeed throw /zero 
error
    
    ### What changes were proposed in this pull request?
    In `TimeSlidingHub.add()`, when the new node lies beyond the whole sliding window, `_deque` is cleared first and only then is the new pair added:
    
    ```
          if (nodesToAdd >= maxQueueSize) {
            // The new node exceed existing sliding list, need to clear all old 
nodes
            // and create a new sliding list
            _deque.clear();
            _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
            sumNode = (N) newNode.clone();
            return;
          }
    
    ```
    
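    Then, when `BufferStatusHub.avgBytesPerSec()` is called, `currentNumBytes` can
    be `> 0` while `getCurrentTimeWindowsInMills()` returns 0: neither method takes
    the lock held by the `synchronized` `add()`, so they can observe `_deque` right
    after `clear()` and before the new pair is added. This causes the division by zero.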
    
    ```
      public long avgBytesPerSec() {
        long currentNumBytes = sum().numBytes();
        if (currentNumBytes > 0) {
          return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
        }
        return 0L;
      }
    ```
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1690 from AngersZhuuuu/CELEBORN-777.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
    (cherry picked from commit 52dcd3b5df9c6ee27261f776545b341ceeeabdcd)
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../worker/congestcontrol/BufferStatusHub.java     |  7 ++--
 .../worker/congestcontrol/TimeSlidingHub.java      | 38 +++++++++++-----------
 .../worker/congestcontrol/TestTimeSlidingHub.java  | 18 +++++-----
 3 files changed, 33 insertions(+), 30 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
index 38c7282f4..14a0dfd5a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
@@ -19,6 +19,8 @@ package 
org.apache.celeborn.service.deploy.worker.congestcontrol;
 
 import java.util.concurrent.atomic.LongAdder;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 public class BufferStatusHub extends 
TimeSlidingHub<BufferStatusHub.BufferStatusNode> {
 
   public static class BufferStatusNode implements 
TimeSlidingHub.TimeSlidingNode {
@@ -73,9 +75,10 @@ public class BufferStatusHub extends 
TimeSlidingHub<BufferStatusHub.BufferStatus
   }
 
   public long avgBytesPerSec() {
-    long currentNumBytes = sum().numBytes();
+    Pair<BufferStatusNode, Integer> sumInfo = sum();
+    long currentNumBytes = sumInfo.getKey().numBytes();
     if (currentNumBytes > 0) {
-      return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
+      return currentNumBytes * 1000 / ((long) sumInfo.getRight() * 
intervalPerBucketInMills);
     }
     return 0L;
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
index ef34f07c6..6951b0d5d 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.tuple.Pair;
 
 /**
  * A time sliding list that group different {@link TimeSlidingNode} with 
corresponding timestamp by
- * exact interval 1 second. Internally hold a {@link sumNode} to get the sum 
of the nodes in the
+ * exact interval 1 second. Internally hold a {@link sumInfo} to get the sum 
of the nodes in the
  * list.
  *
  * <p>This list is thread-safe, but {@link TimeSlidingNode} returned by the 
method {@link sum}
@@ -51,20 +51,20 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
   }
 
   // 1 second.
-  private final int intervalPerBucketInMills = 1000;
+  protected final int intervalPerBucketInMills = 1000;
   private final int maxQueueSize;
-  private N sumNode;
+  private Pair<N, Integer> sumInfo;
 
   private final LinkedBlockingDeque<Pair<Long, N>> _deque;
 
   public TimeSlidingHub(int timeWindowsInSecs) {
     this._deque = new LinkedBlockingDeque<>();
     this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills;
-    this.sumNode = newEmptyNode();
+    this.sumInfo = Pair.of(newEmptyNode(), 0);
   }
 
-  public N sum() {
-    return sumNode;
+  public Pair<N, Integer> sum() {
+    return sumInfo;
   }
 
   public void add(N newNode) {
@@ -75,7 +75,7 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
   public synchronized void add(long currentTimestamp, N newNode) {
     if (_deque.size() == 0) {
       _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
-      sumNode = (N) newNode.clone();
+      sumInfo = Pair.of((N) newNode.clone(), 1);
       return;
     }
 
@@ -87,29 +87,33 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
       // The node doesn't belong to the lastNode, there might be 2 different 
scenarios
       // 1. All existing nodes are out of date, should be removed
       // 2. some nodes are out of date, should be removed
-      long nodesToAdd = timeDiff / intervalPerBucketInMills;
+      int nodesToAdd = (int) (timeDiff / intervalPerBucketInMills);
       if (nodesToAdd >= maxQueueSize) {
         // The new node exceed existing sliding list, need to clear all old 
nodes
         // and create a new sliding list
         _deque.clear();
         _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
-        sumNode = (N) newNode.clone();
+        sumInfo = Pair.of((N) newNode.clone(), 1);
         return;
       }
 
       // Add new node at the end of the list, and deprecate nodes out of 
timeInterval
-      for (long i = 1; i < nodesToAdd; i++) {
+      for (int i = 1; i < nodesToAdd; i++) {
         N toAdd = newEmptyNode();
         lastNode = Pair.of(lastNode.getLeft() + intervalPerBucketInMills, 
toAdd);
         _deque.add(lastNode);
       }
 
       _deque.add(Pair.of(lastNode.getLeft() + intervalPerBucketInMills, (N) 
newNode.clone()));
-      sumNode.combineNode(newNode);
+      N nodeToCombine = sumInfo.getLeft();
+      nodeToCombine.combineNode(newNode);
+      sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd);
 
       while (_deque.size() > maxQueueSize) {
         Pair<Long, N> removed = _deque.removeFirst();
-        sumNode.separateNode(removed.getRight());
+        N nodeToSeparate = sumInfo.getLeft();
+        nodeToSeparate.separateNode(removed.getRight());
+        sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
       }
       return;
     }
@@ -121,7 +125,7 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
         Pair<Long, N> curNode = iter.next();
         if (currentTimestamp - curNode.getLeft() >= 0) {
           curNode.getRight().combineNode(newNode);
-          sumNode.combineNode(newNode);
+          sumInfo.getLeft().combineNode(newNode);
           return;
         }
       }
@@ -132,22 +136,18 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
 
     // Belong to last node
     lastNode.getRight().combineNode(newNode);
-    sumNode.combineNode(newNode);
+    sumInfo.getLeft().combineNode(newNode);
   }
 
   public void clear() {
     synchronized (_deque) {
       _deque.clear();
-      sumNode = newEmptyNode();
+      sumInfo = Pair.of(newEmptyNode(), 0);
     }
   }
 
   protected abstract N newEmptyNode();
 
-  protected int getCurrentTimeWindowsInMills() {
-    return _deque.size() * intervalPerBucketInMills;
-  }
-
   @VisibleForTesting
   protected long currentTimeMillis() {
     return System.currentTimeMillis();
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
index 683eafa5b..0bcc7af8b 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
@@ -84,34 +84,34 @@ public class TestTimeSlidingHub {
 
     hub.setDummyTimestamp(0L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
-    Assert.assertEquals(1, hub.sum().getValue());
+    Assert.assertEquals(1, hub.sum().getLeft().getValue());
 
     hub.setDummyTimestamp(1000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
-    Assert.assertEquals(3, hub.sum().getValue());
+    Assert.assertEquals(3, hub.sum().getLeft().getValue());
 
     hub.setDummyTimestamp(2200L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(3));
-    Assert.assertEquals(6, hub.sum().getValue());
+    Assert.assertEquals(6, hub.sum().getLeft().getValue());
 
     hub.setDummyTimestamp(2400L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(4));
-    Assert.assertEquals(10, hub.sum().getValue());
+    Assert.assertEquals(10, hub.sum().getLeft().getValue());
 
     // Should remove the value 1
     hub.setDummyTimestamp(3000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(5));
-    Assert.assertEquals(14, hub.sum().getValue());
+    Assert.assertEquals(14, hub.sum().getLeft().getValue());
 
     // Should remove the value 2
     hub.setDummyTimestamp(4000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(6));
-    Assert.assertEquals(18, hub.sum().getValue());
+    Assert.assertEquals(18, hub.sum().getLeft().getValue());
 
     // Should remove the value 3 and 4
     hub.setDummyTimestamp(5000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(7));
-    Assert.assertEquals(18, hub.sum().getValue());
+    Assert.assertEquals(18, hub.sum().getLeft().getValue());
   }
 
   @Test
@@ -120,10 +120,10 @@ public class TestTimeSlidingHub {
 
     hub.setDummyTimestamp(0L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
-    Assert.assertEquals(1, hub.sum().getValue());
+    Assert.assertEquals(1, hub.sum().getLeft().getValue());
 
     hub.setDummyTimestamp(10000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
-    Assert.assertEquals(2, hub.sum().getValue());
+    Assert.assertEquals(2, hub.sum().getLeft().getValue());
   }
 }

Reply via email to