This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 822701ffc [CELEBORN-1575] TimeSlidingHub should remove expire node 
when reading
822701ffc is described below

commit 822701ffc074f078bc935903b09e4431f9498b75
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Aug 27 16:02:26 2024 +0800

    [CELEBORN-1575] TimeSlidingHub should remove expire node when reading
    
    ### What changes were proposed in this pull request?
    TimeSlidingHub remove expire node when reading.
    
    ### Why are the changes needed?
    Metrics **UserProduceSpeed** is not correct.
    
    You can see that if a user no longer writes data, UserProduceSpeed ​​still 
retains the last value until user expires.
    
![image](https://github.com/user-attachments/assets/885f5b56-3b08-4909-8045-3c7ee3d03771)
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UTs.
    
    Closes #2702 from leixm/issue_1575.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 7188b34257d5dae6a4ff780199517fa1005a6a52)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../worker/congestcontrol/TimeSlidingHub.java      | 22 +++++++++++++++-------
 .../worker/congestcontrol/TestTimeSlidingHub.java  |  3 +++
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
index 64ce466cc..d744f583e 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
@@ -52,6 +52,7 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
 
   // 1 second.
   protected final int intervalPerBucketInMills = 1000;
+  protected final int timeWindowsInMills;
   private final int maxQueueSize;
   private Pair<N, Integer> sumInfo;
 
@@ -60,10 +61,22 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
   public TimeSlidingHub(int timeWindowsInSecs) {
     this._deque = new LinkedBlockingDeque<>();
     this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills;
+    this.timeWindowsInMills = maxQueueSize * intervalPerBucketInMills;
     this.sumInfo = Pair.of(newEmptyNode(), 0);
   }
 
-  public Pair<N, Integer> sum() {
+  private void removeExpiredNodes() {
+    long currentTime = currentTimeMillis();
+    while (!_deque.isEmpty() && currentTime - _deque.getFirst().getLeft() >= 
timeWindowsInMills) {
+      Pair<Long, N> removed = _deque.removeFirst();
+      N nodeToSeparate = sumInfo.getLeft();
+      nodeToSeparate.separateNode(removed.getRight());
+      sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
+    }
+  }
+
+  public synchronized Pair<N, Integer> sum() {
+    removeExpiredNodes();
     return sumInfo;
   }
 
@@ -108,12 +121,7 @@ public abstract class TimeSlidingHub<N extends 
TimeSlidingHub.TimeSlidingNode> {
       nodeToCombine.combineNode(newNode);
       sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd);
 
-      while (_deque.size() > maxQueueSize) {
-        Pair<Long, N> removed = _deque.removeFirst();
-        N nodeToSeparate = sumInfo.getLeft();
-        nodeToSeparate.separateNode(removed.getRight());
-        sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
-      }
+      removeExpiredNodes();
       return;
     }
 
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
index 0bcc7af8b..2bca305f1 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
@@ -125,5 +125,8 @@ public class TestTimeSlidingHub {
     hub.setDummyTimestamp(10000L);
     hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
     Assert.assertEquals(2, hub.sum().getLeft().getValue());
+
+    hub.setDummyTimestamp(13000L);
+    Assert.assertEquals(0, hub.sum().getLeft().getValue());
   }
 }

Reply via email to