This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 67f01d081 [CELEBORN-1575] TimeSlidingHub should remove expire node
when reading
67f01d081 is described below
commit 67f01d08106f758c7a8d38a8d269a6c0a1291dd4
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Aug 27 16:02:26 2024 +0800
[CELEBORN-1575] TimeSlidingHub should remove expire node when reading
### What changes were proposed in this pull request?
TimeSlidingHub remove expire node when reading.
### Why are the changes needed?
Metrics **UserProduceSpeed** is not correct.
You can see that if a user no longer writes data, UserProduceSpeed still
retains the last value until user expires.

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs.
Closes #2702 from leixm/issue_1575.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 7188b34257d5dae6a4ff780199517fa1005a6a52)
Signed-off-by: SteNicholas <[email protected]>
---
.../worker/congestcontrol/TimeSlidingHub.java | 22 +++++++++++++++-------
.../worker/congestcontrol/TestTimeSlidingHub.java | 3 +++
2 files changed, 18 insertions(+), 7 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
index 64ce466cc..d744f583e 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
@@ -52,6 +52,7 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
// 1 second.
protected final int intervalPerBucketInMills = 1000;
+ protected final int timeWindowsInMills;
private final int maxQueueSize;
private Pair<N, Integer> sumInfo;
@@ -60,10 +61,22 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
public TimeSlidingHub(int timeWindowsInSecs) {
this._deque = new LinkedBlockingDeque<>();
this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills;
+ this.timeWindowsInMills = maxQueueSize * intervalPerBucketInMills;
this.sumInfo = Pair.of(newEmptyNode(), 0);
}
- public Pair<N, Integer> sum() {
+ private void removeExpiredNodes() {
+ long currentTime = currentTimeMillis();
+ while (!_deque.isEmpty() && currentTime - _deque.getFirst().getLeft() >=
timeWindowsInMills) {
+ Pair<Long, N> removed = _deque.removeFirst();
+ N nodeToSeparate = sumInfo.getLeft();
+ nodeToSeparate.separateNode(removed.getRight());
+ sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
+ }
+ }
+
+ public synchronized Pair<N, Integer> sum() {
+ removeExpiredNodes();
return sumInfo;
}
@@ -108,12 +121,7 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
nodeToCombine.combineNode(newNode);
sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd);
- while (_deque.size() > maxQueueSize) {
- Pair<Long, N> removed = _deque.removeFirst();
- N nodeToSeparate = sumInfo.getLeft();
- nodeToSeparate.separateNode(removed.getRight());
- sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
- }
+ removeExpiredNodes();
return;
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
index 0bcc7af8b..2bca305f1 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
@@ -125,5 +125,8 @@ public class TestTimeSlidingHub {
hub.setDummyTimestamp(10000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
Assert.assertEquals(2, hub.sum().getLeft().getValue());
+
+ hub.setDummyTimestamp(13000L);
+ Assert.assertEquals(0, hub.sum().getLeft().getValue());
}
}