This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 52dcd3b5d [CELEBORN-777][BUG] CongestionControl
getPotentialConsumeSpeed throw /zero error
52dcd3b5d is described below
commit 52dcd3b5df9c6ee27261f776545b341ceeeabdcd
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sat Jul 8 21:46:37 2023 +0800
[CELEBORN-777][BUG] CongestionControl getPotentialConsumeSpeed throw /zero
error
### What changes were proposed in this pull request?
In `TimeSlidingHub.add()`, when the new node falls outside the whole sliding window, `_deque` is cleared and then the new pair is added:
```
if (nodesToAdd >= maxQueueSize) {
// The new node exceed existing sliding list, need to clear all old
nodes
// and create a new sliding list
_deque.clear();
_deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
sumNode = (N) newNode.clone();
return;
}
```
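Afterwards, a call to `BufferStatusHub.avgBytesPerSec()` can see `currentNumBytes > 0` while `getCurrentTimeWindowsInMills()` returns 0. This presumably happens because `avgBytesPerSec()` is not synchronized and can read `_deque` between `clear()` and `add()`. Dividing by that zero-length window throws `ArithmeticException: / by zero`: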
```
public long avgBytesPerSec() {
long currentNumBytes = sum().numBytes();
if (currentNumBytes > 0) {
return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
}
return 0L;
}
```
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1690 from AngersZhuuuu/CELEBORN-777.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../worker/congestcontrol/BufferStatusHub.java | 7 ++--
.../worker/congestcontrol/TimeSlidingHub.java | 38 +++++++++++-----------
.../worker/congestcontrol/TestTimeSlidingHub.java | 18 +++++-----
3 files changed, 33 insertions(+), 30 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
index 38c7282f4..14a0dfd5a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java
@@ -19,6 +19,8 @@ package
org.apache.celeborn.service.deploy.worker.congestcontrol;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.commons.lang3.tuple.Pair;
+
public class BufferStatusHub extends
TimeSlidingHub<BufferStatusHub.BufferStatusNode> {
public static class BufferStatusNode implements
TimeSlidingHub.TimeSlidingNode {
@@ -73,9 +75,10 @@ public class BufferStatusHub extends
TimeSlidingHub<BufferStatusHub.BufferStatus
}
public long avgBytesPerSec() {
- long currentNumBytes = sum().numBytes();
+ Pair<BufferStatusNode, Integer> sumInfo = sum();
+ long currentNumBytes = sumInfo.getKey().numBytes();
if (currentNumBytes > 0) {
- return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
+ return currentNumBytes * 1000 / ((long) Math.max(sumInfo.getRight(), 1) * intervalPerBucketInMills);
}
return 0L;
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
index ef34f07c6..6951b0d5d 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.tuple.Pair;
/**
* A time sliding list that group different {@link TimeSlidingNode} with
corresponding timestamp by
- * exact interval 1 second. Internally hold a {@link sumNode} to get the sum
of the nodes in the
+ * exact interval 1 second. Internally hold a {@link sumInfo} to get the sum
of the nodes in the
* list.
*
* <p>This list is thread-safe, but {@link TimeSlidingNode} returned by the
method {@link sum}
@@ -51,20 +51,20 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
}
// 1 second.
- private final int intervalPerBucketInMills = 1000;
+ protected final int intervalPerBucketInMills = 1000;
private final int maxQueueSize;
- private N sumNode;
+ private Pair<N, Integer> sumInfo;
private final LinkedBlockingDeque<Pair<Long, N>> _deque;
public TimeSlidingHub(int timeWindowsInSecs) {
this._deque = new LinkedBlockingDeque<>();
this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills;
- this.sumNode = newEmptyNode();
+ this.sumInfo = Pair.of(newEmptyNode(), 0);
}
- public N sum() {
- return sumNode;
+ public Pair<N, Integer> sum() {
+ return sumInfo;
}
public void add(N newNode) {
@@ -75,7 +75,7 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
public synchronized void add(long currentTimestamp, N newNode) {
if (_deque.size() == 0) {
_deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
- sumNode = (N) newNode.clone();
+ sumInfo = Pair.of((N) newNode.clone(), 1);
return;
}
@@ -87,29 +87,33 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
// The node doesn't belong to the lastNode, there might be 2 different
scenarios
// 1. All existing nodes are out of date, should be removed
// 2. some nodes are out of date, should be removed
- long nodesToAdd = timeDiff / intervalPerBucketInMills;
+ int nodesToAdd = (int) (timeDiff / intervalPerBucketInMills);
if (nodesToAdd >= maxQueueSize) {
// The new node exceed existing sliding list, need to clear all old
nodes
// and create a new sliding list
_deque.clear();
_deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
- sumNode = (N) newNode.clone();
+ sumInfo = Pair.of((N) newNode.clone(), 1);
return;
}
// Add new node at the end of the list, and deprecate nodes out of
timeInterval
- for (long i = 1; i < nodesToAdd; i++) {
+ for (int i = 1; i < nodesToAdd; i++) {
N toAdd = newEmptyNode();
lastNode = Pair.of(lastNode.getLeft() + intervalPerBucketInMills,
toAdd);
_deque.add(lastNode);
}
_deque.add(Pair.of(lastNode.getLeft() + intervalPerBucketInMills, (N)
newNode.clone()));
- sumNode.combineNode(newNode);
+ N nodeToCombine = sumInfo.getLeft();
+ nodeToCombine.combineNode(newNode);
+ sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd);
while (_deque.size() > maxQueueSize) {
Pair<Long, N> removed = _deque.removeFirst();
- sumNode.separateNode(removed.getRight());
+ N nodeToSeparate = sumInfo.getLeft();
+ nodeToSeparate.separateNode(removed.getRight());
+ sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
}
return;
}
@@ -121,7 +125,7 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
Pair<Long, N> curNode = iter.next();
if (currentTimestamp - curNode.getLeft() >= 0) {
curNode.getRight().combineNode(newNode);
- sumNode.combineNode(newNode);
+ sumInfo.getLeft().combineNode(newNode);
return;
}
}
@@ -132,22 +136,18 @@ public abstract class TimeSlidingHub<N extends
TimeSlidingHub.TimeSlidingNode> {
// Belong to last node
lastNode.getRight().combineNode(newNode);
- sumNode.combineNode(newNode);
+ sumInfo.getLeft().combineNode(newNode);
}
public void clear() {
synchronized (_deque) {
_deque.clear();
- sumNode = newEmptyNode();
+ sumInfo = Pair.of(newEmptyNode(), 0);
}
}
protected abstract N newEmptyNode();
- protected int getCurrentTimeWindowsInMills() {
- return _deque.size() * intervalPerBucketInMills;
- }
-
@VisibleForTesting
protected long currentTimeMillis() {
return System.currentTimeMillis();
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
index 683eafa5b..0bcc7af8b 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java
@@ -84,34 +84,34 @@ public class TestTimeSlidingHub {
hub.setDummyTimestamp(0L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
- Assert.assertEquals(1, hub.sum().getValue());
+ Assert.assertEquals(1, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(1000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
- Assert.assertEquals(3, hub.sum().getValue());
+ Assert.assertEquals(3, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(2200L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(3));
- Assert.assertEquals(6, hub.sum().getValue());
+ Assert.assertEquals(6, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(2400L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(4));
- Assert.assertEquals(10, hub.sum().getValue());
+ Assert.assertEquals(10, hub.sum().getLeft().getValue());
// Should remove the value 1
hub.setDummyTimestamp(3000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(5));
- Assert.assertEquals(14, hub.sum().getValue());
+ Assert.assertEquals(14, hub.sum().getLeft().getValue());
// Should remove the value 2
hub.setDummyTimestamp(4000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(6));
- Assert.assertEquals(18, hub.sum().getValue());
+ Assert.assertEquals(18, hub.sum().getLeft().getValue());
// Should remove the value 3 and 4
hub.setDummyTimestamp(5000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(7));
- Assert.assertEquals(18, hub.sum().getValue());
+ Assert.assertEquals(18, hub.sum().getLeft().getValue());
}
@Test
@@ -120,10 +120,10 @@ public class TestTimeSlidingHub {
hub.setDummyTimestamp(0L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
- Assert.assertEquals(1, hub.sum().getValue());
+ Assert.assertEquals(1, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(10000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
- Assert.assertEquals(2, hub.sum().getValue());
+ Assert.assertEquals(2, hub.sum().getLeft().getValue());
}
}