This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1d44e5fbf [CELEBORN-1487][PHASE1] CongestionController support control
traffic by user/worker traffic speed
1d44e5fbf is described below
commit 1d44e5fbfe0244a035ae4fa7cbf83837875323ef
Author: Xianming Lei <[email protected]>
AuthorDate: Sat Oct 12 10:17:33 2024 +0800
[CELEBORN-1487][PHASE1] CongestionController support control traffic by
user/worker traffic speed
### What changes were proposed in this pull request?
Add support for controlling traffic based on per-user and per-worker produce speed.
### Why are the changes needed?
Currently, Celeborn only supports quota management based on disk file
bytes/count. This kind of quota management cannot cope with sudden traffic
spikes, which can overload and destabilize the cluster.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
UTs.
Closes #2797 from leixm/issue_1487_1.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 73 ++++++--
docs/configuration/worker.md | 8 +-
docs/migration.md | 4 +
.../congestcontrol/CongestionController.java | 135 ++++++++++++---
.../UserCongestionControlContext.java | 77 +++++++++
.../deploy/worker/storage/PartitionDataWriter.java | 20 ++-
.../service/deploy/worker/PushDataHandler.scala | 8 +-
.../celeborn/service/deploy/worker/Worker.scala | 15 +-
.../congestcontrol/TestCongestionController.java | 187 ++++++++++++++++++---
9 files changed, 448 insertions(+), 79 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 49e669c56..33ebb3c07 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1269,10 +1269,19 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// TODO related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`and
`WORKER_DIRECT_MEMORY_RATIO_RESUME`,
// we'd better refine the logic among them
- def workerCongestionControlLowWatermark: Option[Long] =
- get(WORKER_CONGESTION_CONTROL_LOW_WATERMARK)
- def workerCongestionControlHighWatermark: Option[Long] =
- get(WORKER_CONGESTION_CONTROL_HIGH_WATERMARK)
+ def workerCongestionControlDiskBufferLowWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
+ def workerCongestionControlDiskBufferHighWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)
+ def workerCongestionControlUserProduceSpeedLowWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
+ def workerCongestionControlUserProduceSpeedHighWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)
+ def workerCongestionControlWorkerProduceSpeedLowWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
+ def workerCongestionControlWorkerProduceSpeedHighWatermark: Option[Long] =
+ get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)
+
def workerCongestionControlUserInactiveIntervalMs: Long =
get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)
def workerCongestionControlCheckIntervalMs: Long =
get(WORKER_CONGESTION_CONTROL_CHECK_INTERVAL)
@@ -1540,7 +1549,15 @@ object CelebornConf extends Logging {
DeprecatedConfig(
"celeborn.shuffle.forceFallback.enabled",
"0.5.0",
- "Please use celeborn.client.spark.shuffle.fallback.policy"))
+ "Please use celeborn.client.spark.shuffle.fallback.policy"),
+ DeprecatedConfig(
+ "celeborn.worker.congestionControl.low.watermark",
+ "0.6.0",
+ "Please use
celeborn.worker.congestionControl.diskBuffer.low.watermark"),
+ DeprecatedConfig(
+ "celeborn.worker.congestionControl.high.watermark",
+ "0.6.0",
+ "Please use
celeborn.worker.congestionControl.diskBuffer.high.watermark"))
Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}
@@ -3775,8 +3792,9 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")
- val WORKER_CONGESTION_CONTROL_LOW_WATERMARK: OptionalConfigEntry[Long] =
- buildConf("celeborn.worker.congestionControl.low.watermark")
+ val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK:
OptionalConfigEntry[Long] =
+ buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark")
+ .withAlternative("celeborn.worker.congestionControl.low.watermark")
.categories("worker")
.doc("Will stop congest users if the total pending bytes of disk buffer
is lower than " +
"this configuration")
@@ -3784,18 +3802,53 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createOptional
- val WORKER_CONGESTION_CONTROL_HIGH_WATERMARK: OptionalConfigEntry[Long] =
- buildConf("celeborn.worker.congestionControl.high.watermark")
+ val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK:
OptionalConfigEntry[Long] =
+ buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark")
+ .withAlternative("celeborn.worker.congestionControl.high.watermark")
.categories("worker")
.doc("If the total bytes in disk buffer exceeds this configure, will
start to congest" +
"users whose produce rate is higher than the potential average consume
rate. " +
"The congestion will stop if the produce rate is lower or equal to the
" +
"average consume rate, or the total pending bytes lower than " +
- s"${WORKER_CONGESTION_CONTROL_LOW_WATERMARK.key}")
+ s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createOptional
+ val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK:
OptionalConfigEntry[Long] =
+
buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark")
+ .categories("worker")
+ .doc("For those users that produce byte speeds less than this
configuration, " +
+ "stop congestion for these users")
+ .version("0.6.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
+ val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK:
OptionalConfigEntry[Long] =
+
buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark")
+ .categories("worker")
+ .doc("For those users that produce byte speeds greater than this
configuration, " +
+ "start congestion for these users")
+ .version("0.6.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
+ val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK:
OptionalConfigEntry[Long] =
+
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark")
+ .categories("worker")
+ .doc("Stop congestion If worker total produce speed less than this
configuration")
+ .version("0.6.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
+ val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK:
OptionalConfigEntry[Long] =
+
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark")
+ .categories("worker")
+ .doc("Start congestion If worker total produce speed greater than this
configuration")
+ .version("0.6.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createOptional
+
val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.user.inactive.interval")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 637c58231..1e204a67b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -58,11 +58,15 @@ license: |
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn
worker to commit files of a shuffle. It's recommended to set at least `240s`
when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 |
celeborn.worker.shuffle.commit.timeout |
| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of
worker to wait for commit shuffle data files to finish. | 0.5.0 | |
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval
of worker checks congestion if celeborn.worker.congestionControl.enabled is
true. | 0.3.2 | |
+| celeborn.worker.congestionControl.diskBuffer.high.watermark |
<undefined> | false | If the total bytes in disk buffer exceeds this
configure, will start to congestusers whose produce rate is higher than the
potential average consume rate. The congestion will stop if the produce rate is
lower or equal to the average consume rate, or the total pending bytes lower
than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 |
celeborn.worker.congestionControl.high.watermark |
+| celeborn.worker.congestionControl.diskBuffer.low.watermark |
<undefined> | false | Will stop congest users if the total pending bytes
of disk buffer is lower than this configuration | 0.3.0 |
celeborn.worker.congestionControl.low.watermark |
| celeborn.worker.congestionControl.enabled | false | false | Whether to
enable congestion control or not. | 0.3.0 | |
-| celeborn.worker.congestionControl.high.watermark | <undefined> | false
| If the total bytes in disk buffer exceeds this configure, will start to
congestusers whose produce rate is higher than the potential average consume
rate. The congestion will stop if the produce rate is lower or equal to the
average consume rate, or the total pending bytes lower than
celeborn.worker.congestionControl.low.watermark | 0.3.0 | |
-| celeborn.worker.congestionControl.low.watermark | <undefined> | false
| Will stop congest users if the total pending bytes of disk buffer is lower
than this configuration | 0.3.0 | |
| celeborn.worker.congestionControl.sample.time.window | 10s | false | The
worker holds a time sliding list to calculate users' produce/consume rate |
0.3.0 | |
| celeborn.worker.congestionControl.user.inactive.interval | 10min | false |
How long will consider this user is inactive if it doesn't send data | 0.3.0 |
|
+| celeborn.worker.congestionControl.userProduceSpeed.high.watermark |
<undefined> | false | For those users that produce byte speeds greater
than this configuration, start congestion for these users | 0.6.0 | |
+| celeborn.worker.congestionControl.userProduceSpeed.low.watermark |
<undefined> | false | For those users that produce byte speeds less than
this configuration, stop congestion for these users | 0.6.0 | |
+| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark |
<undefined> | false | Start congestion If worker total produce speed
greater than this configuration | 0.6.0 | |
+| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark |
<undefined> | false | Stop congestion If worker total produce speed less
than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval
of checking whether all the shuffle expired during worker decommission | 0.4.0
| |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time
of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max
ratio of direct memory to store shuffle data. This feature is experimental and
disabled by default. | 0.5.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index 29a237c47..13a4592b1 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -22,6 +22,10 @@ license: |
# Migration Guide
# Upgrading from 0.5 to 0.6
+- Since 0.6.0, Celeborn deprecates
`celeborn.worker.congestionControl.low.watermark`. Please use
`celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
+
+- Since 0.6.0, Celeborn deprecates
`celeborn.worker.congestionControl.high.watermark`. Please use
`celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.
+
- Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1,
which uses the application/json media type for requests and responses.
The `celeborn-openapi-client` SDK is also available to help users interact
with the new RESTful APIs.
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index d05ddd582..b0c3a2ca1 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -40,36 +40,57 @@ public class CongestionController {
private final WorkerSource workerSource;
private final int sampleTimeWindowSeconds;
- private final long highWatermark;
- private final long lowWatermark;
+ private final long diskBufferHighWatermark;
+ private final long diskBufferLowWatermark;
+ private final long userProduceSpeedHighWatermark;
+ private final long userProduceSpeedLowWatermark;
+ private final long workerProduceSpeedHighWatermark;
+ private final long workerProduceSpeedLowWatermark;
private final long userInactiveTimeMills;
private final AtomicBoolean overHighWatermark = new AtomicBoolean(false);
private final BufferStatusHub consumedBufferStatusHub;
+ private final BufferStatusHub producedBufferStatusHub;
+
private final ConcurrentHashMap<UserIdentifier, UserBufferInfo>
userBufferStatuses;
private final ScheduledExecutorService removeUserExecutorService;
private final ScheduledExecutorService checkService;
+ private final ConcurrentHashMap<UserIdentifier, UserCongestionControlContext>
+ userCongestionContextMap;
+
protected CongestionController(
WorkerSource workerSource,
int sampleTimeWindowSeconds,
- long highWatermark,
- long lowWatermark,
+ long diskBufferHighWatermark,
+ long diskBufferLowWatermark,
+ long userProduceSpeedHighWatermark,
+ long userProduceSpeedLowWatermark,
+ long workerProduceSpeedHighWatermark,
+ long workerProduceSpeedLowWatermark,
long userInactiveTimeMills,
long checkIntervalTimeMills) {
- assert (highWatermark > lowWatermark);
+ assert (diskBufferHighWatermark > diskBufferLowWatermark);
+ assert (userProduceSpeedHighWatermark > userProduceSpeedLowWatermark);
+ assert (workerProduceSpeedHighWatermark > workerProduceSpeedLowWatermark);
this.workerSource = workerSource;
this.sampleTimeWindowSeconds = sampleTimeWindowSeconds;
- this.highWatermark = highWatermark;
- this.lowWatermark = lowWatermark;
+ this.diskBufferHighWatermark = diskBufferHighWatermark;
+ this.diskBufferLowWatermark = diskBufferLowWatermark;
+ this.userProduceSpeedHighWatermark = userProduceSpeedHighWatermark;
+ this.userProduceSpeedLowWatermark = userProduceSpeedLowWatermark;
+ this.workerProduceSpeedHighWatermark = workerProduceSpeedHighWatermark;
+ this.workerProduceSpeedLowWatermark = workerProduceSpeedLowWatermark;
this.userInactiveTimeMills = userInactiveTimeMills;
this.consumedBufferStatusHub = new
BufferStatusHub(sampleTimeWindowSeconds);
+ this.producedBufferStatusHub = new
BufferStatusHub(sampleTimeWindowSeconds);
this.userBufferStatuses = JavaUtils.newConcurrentHashMap();
+ this.userCongestionContextMap = JavaUtils.newConcurrentHashMap();
this.removeUserExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
@@ -94,16 +115,24 @@ public class CongestionController {
public static synchronized CongestionController initialize(
WorkerSource workSource,
int sampleTimeWindowSeconds,
- long highWatermark,
- long lowWatermark,
+ long highWatermarkDiskBuffer,
+ long lowWatermarkDiskBuffer,
+ long highWatermarkUserProduceSpeed,
+ long lowWatermarkUserProduceSpeed,
+ long highWatermarkWorkerProduceSpeed,
+ long lowWatermarkWorkerProduceSpeed,
long userInactiveTimeMills,
long checkIntervalTimeMills) {
_INSTANCE =
new CongestionController(
workSource,
sampleTimeWindowSeconds,
- highWatermark,
- lowWatermark,
+ highWatermarkDiskBuffer,
+ lowWatermarkDiskBuffer,
+ highWatermarkUserProduceSpeed,
+ lowWatermarkUserProduceSpeed,
+ highWatermarkWorkerProduceSpeed,
+ lowWatermarkWorkerProduceSpeed,
userInactiveTimeMills,
checkIntervalTimeMills);
return _INSTANCE;
@@ -122,25 +151,42 @@ public class CongestionController {
* <p>3. If the pending bytes doesn't exceed the high watermark, will allow
all users to try to
* get max throughout capacity.
*/
- public boolean isUserCongested(UserIdentifier userIdentifier) {
- if (userBufferStatuses.size() == 0) {
+ public boolean isUserCongested(UserCongestionControlContext
userCongestionControlContext) {
+ if (userBufferStatuses.isEmpty()) {
return false;
}
+
+ UserIdentifier userIdentifier =
userCongestionControlContext.getUserIdentifier();
+ long userProduceSpeed =
getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
+ // If the user produce speed is higher that the avg consume speed, will
congest it
if (overHighWatermark.get()) {
- // If the user produce speed is higher that the avg consume speed, will
congest it
- long userProduceSpeed =
getUserProduceSpeed(userBufferStatuses.get(userIdentifier));
- long avgConsumeSpeed = getPotentialConsumeSpeed();
+ long avgConsumeSpeed = getPotentialProduceSpeed();
+ if (userProduceSpeed > avgConsumeSpeed) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "The user {}, produceSpeed is {}, while consumeSpeed is {}, need
to congest it.",
+ userIdentifier,
+ userProduceSpeed,
+ avgConsumeSpeed);
+ }
+ return true;
+ }
+ }
+
+ if (userProduceSpeed > userProduceSpeedHighWatermark) {
+ userCongestionControlContext.onCongestionControl();
if (logger.isDebugEnabled()) {
logger.debug(
- "The user {}, produceSpeed is {}, while consumeSpeed is {}, need
to congest it: {}",
+ "The user {}, produceSpeed is {}, while
userProduceSpeedHighWatermark is {}, need to congest it.",
userIdentifier,
userProduceSpeed,
- avgConsumeSpeed,
- userProduceSpeed > avgConsumeSpeed);
+ userProduceSpeedHighWatermark);
}
- return userProduceSpeed > avgConsumeSpeed;
+ } else if (userCongestionControlContext.inCongestionControl()
+ && userProduceSpeed < userProduceSpeedLowWatermark) {
+ userCongestionControlContext.offCongestionControl();
}
- return false;
+ return userCongestionControlContext.inCongestionControl();
}
public UserBufferInfo getUserBuffer(UserIdentifier userIdentifier) {
@@ -184,6 +230,14 @@ public class CongestionController {
return consumedBufferStatusHub.avgBytesPerSec() /
userBufferStatuses.size();
}
+ public long getPotentialProduceSpeed() {
+ if (userBufferStatuses.size() == 0) {
+ return 0;
+ }
+
+ return producedBufferStatusHub.avgBytesPerSec() /
userBufferStatuses.size();
+ }
+
/** Get the avg user produce speed, the unit is bytes/sec. */
private long getUserProduceSpeed(UserBufferInfo userBufferInfo) {
if (userBufferInfo != null) {
@@ -205,6 +259,7 @@ public class CongestionController {
UserBufferInfo userBufferInfo = next.getValue();
if (currentTimeMillis - userBufferInfo.getTimestamp() >=
userInactiveTimeMills) {
userBufferStatuses.remove(userIdentifier);
+ userCongestionContextMap.remove(userIdentifier);
workerSource.removeGauge(WorkerSource.USER_PRODUCE_SPEED(),
userIdentifier.toMap());
logger.info("User {} has been expired, remove from rate limit list",
userIdentifier);
}
@@ -217,13 +272,19 @@ public class CongestionController {
protected void checkCongestion() {
try {
long pendingConsume = getTotalPendingBytes();
- if (pendingConsume < lowWatermark) {
+ long workerProduceSpeed = producedBufferStatusHub.avgBytesPerSec();
+ if (pendingConsume < diskBufferLowWatermark
+ && workerProduceSpeed < workerProduceSpeedLowWatermark) {
if (overHighWatermark.compareAndSet(true, false)) {
- logger.info("Pending consume is lower than low watermark, exit
congestion control");
+ logger.info(
+ "Pending consume and produce speed is lower than low watermark,
exit congestion control");
}
return;
- } else if (pendingConsume > highWatermark &&
overHighWatermark.compareAndSet(false, true)) {
- logger.info("Pending consume is higher than high watermark, need
congestion control");
+ } else if ((pendingConsume > diskBufferHighWatermark
+ || workerProduceSpeed > workerProduceSpeedHighWatermark)
+ && overHighWatermark.compareAndSet(false, true)) {
+ logger.info(
+ "Pending consume or produce speed is higher than high watermark,
need congestion control");
}
if (overHighWatermark.get()) {
trimMemoryUsage();
@@ -239,6 +300,7 @@ public class CongestionController {
this.checkService.shutdownNow();
this.userBufferStatuses.clear();
this.consumedBufferStatusHub.clear();
+ this.producedBufferStatusHub.clear();
}
public static synchronized void destroy() {
@@ -247,4 +309,27 @@ public class CongestionController {
_INSTANCE = null;
}
}
+
+ public BufferStatusHub getProducedBufferStatusHub() {
+ return producedBufferStatusHub;
+ }
+
+ public UserCongestionControlContext getUserCongestionContext(UserIdentifier
userIdentifier) {
+ return userCongestionContextMap.computeIfAbsent(
+ userIdentifier,
+ user -> {
+ UserBufferInfo userBufferInfo = getUserBuffer(userIdentifier);
+ return new UserCongestionControlContext(
+ producedBufferStatusHub, userBufferInfo, workerSource,
userIdentifier);
+ });
+ }
+
+ public ConcurrentHashMap<UserIdentifier, UserCongestionControlContext>
+ getUserCongestionContextMap() {
+ return userCongestionContextMap;
+ }
+
+ public BufferStatusHub getConsumedBufferStatusHub() {
+ return consumedBufferStatusHub;
+ }
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
new file mode 100644
index 000000000..4d51139ca
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.congestcontrol;
+
+import com.codahale.metrics.Gauge;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.service.deploy.worker.WorkerSource;
+
+public class UserCongestionControlContext {
+
+ private volatile boolean congestionControlFlag;
+
+ private final UserBufferInfo userBufferInfo;
+
+ private final BufferStatusHub workerBufferStatusHub;
+
+ private final UserIdentifier userIdentifier;
+
+ public UserCongestionControlContext(
+ BufferStatusHub workerBufferStatusHub,
+ UserBufferInfo userBufferInfo,
+ AbstractSource workerSource,
+ UserIdentifier userIdentifier) {
+ this.congestionControlFlag = false;
+ this.userBufferInfo = userBufferInfo;
+ this.userIdentifier = userIdentifier;
+ this.workerBufferStatusHub = workerBufferStatusHub;
+ workerSource.addGauge(
+ WorkerSource.USER_PRODUCE_SPEED(),
+ userIdentifier.toJMap(),
+ (Gauge<Long>) () ->
userBufferInfo.getBufferStatusHub().avgBytesPerSec());
+ }
+
+ public void onCongestionControl() {
+ congestionControlFlag = true;
+ }
+
+ public void offCongestionControl() {
+ congestionControlFlag = false;
+ }
+
+ public boolean inCongestionControl() {
+ return congestionControlFlag;
+ }
+
+ public void updateProduceBytes(long numBytes) {
+ long timeNow = System.currentTimeMillis();
+ BufferStatusHub.BufferStatusNode node = new
BufferStatusHub.BufferStatusNode(numBytes);
+ userBufferInfo.updateInfo(timeNow, node);
+ workerBufferStatusHub.add(timeNow, node);
+ }
+
+ public UserBufferInfo getUserBufferInfo() {
+ return userBufferInfo;
+ }
+
+ public UserIdentifier getUserIdentifier() {
+ return userIdentifier;
+ }
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 5bb506270..194796c08 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -50,9 +50,9 @@ import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
-import
org.apache.celeborn.service.deploy.worker.congestcontrol.BufferStatusHub;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
+import
org.apache.celeborn.service.deploy.worker.congestcontrol.UserCongestionControlContext;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
/*
@@ -111,6 +111,8 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
protected FileSystem hadoopFs;
+ private UserCongestionControlContext userCongestionControlContext = null;
+
public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@@ -164,9 +166,10 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
this.mapIdBitMap = new RoaringBitmap();
}
takeBuffer();
- CongestionController congestionController =
CongestionController.instance();
- if (!isMemoryShuffleFile.get() && congestionController != null) {
- userBufferInfo =
congestionController.getUserBuffer(getDiskFileInfo().getUserIdentifier());
+ if (CongestionController.instance() != null) {
+ userCongestionControlContext =
+ CongestionController.instance()
+ .getUserCongestionContext(writerContext.getUserIdentifier());
}
}
@@ -317,9 +320,8 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
MemoryManager.instance().increaseMemoryFileStorage(numBytes);
} else {
MemoryManager.instance().incrementDiskBuffer(numBytes);
- if (userBufferInfo != null) {
- userBufferInfo.updateInfo(
- System.currentTimeMillis(), new
BufferStatusHub.BufferStatusNode(numBytes));
+ if (userCongestionControlContext != null) {
+ userCongestionControlContext.updateProduceBytes(numBytes);
}
}
@@ -684,4 +686,8 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
public MemoryFileInfo getMemoryFileInfo() {
return memoryFileInfo;
}
+
+ public UserCongestionControlContext getUserCongestionControlContext() {
+ return userCongestionControlContext;
+ }
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 9fe99c4fd..3b75dbe12 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -317,7 +317,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
Option(CongestionController.instance()) match {
case Some(congestionController) =>
if (congestionController.isUserCongested(
- fileWriter.getDiskFileInfo.getUserIdentifier)) {
+ fileWriter.getUserCongestionControlContext)) {
// Check whether primary congest the data though the
replicas doesn't congest
// it(the response is empty)
callbackWithTimer.onSuccess(
@@ -396,7 +396,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
Option(CongestionController.instance()) match {
case Some(congestionController) =>
if (congestionController.isUserCongested(
- fileWriter.getDiskFileInfo.getUserIdentifier)) {
+ fileWriter.getUserCongestionControlContext)) {
if (isPrimary) {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
@@ -594,7 +594,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
Option(CongestionController.instance()) match {
case Some(congestionController) if fileWriters.nonEmpty
=>
if (congestionController.isUserCongested(
-
fileWriters.head.getDiskFileInfo.getUserIdentifier)) {
+ fileWriters.head.getUserCongestionControlContext))
{
// Check whether primary congest the data though the
replicas doesn't congest
// it(the response is empty)
callbackWithTimer.onSuccess(
@@ -670,7 +670,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
Option(CongestionController.instance()) match {
case Some(congestionController) if fileWriters.nonEmpty =>
if (congestionController.isUserCongested(
- fileWriters.head.getDiskFileInfo.getUserIdentifier)) {
+ fileWriters.head.getUserCongestionControlContext)) {
if (isPrimary) {
callbackWithTimer.onSuccess(
ByteBuffer.wrap(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 3777adc48..972b58dec 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -181,7 +181,12 @@ private[celeborn] class Worker(
val partitionsSorter = new PartitionFilesSorter(memoryManager, conf,
workerSource)
if (conf.workerCongestionControlEnabled) {
- if (conf.workerCongestionControlLowWatermark.isEmpty ||
conf.workerCongestionControlHighWatermark.isEmpty) {
+ if (conf.workerCongestionControlDiskBufferLowWatermark.isEmpty ||
+ conf.workerCongestionControlDiskBufferHighWatermark.isEmpty ||
+ conf.workerCongestionControlUserProduceSpeedLowWatermark.isEmpty ||
+ conf.workerCongestionControlUserProduceSpeedHighWatermark.isEmpty ||
+ conf.workerCongestionControlWorkerProduceSpeedLowWatermark.isEmpty ||
+ conf.workerCongestionControlWorkerProduceSpeedHighWatermark.isEmpty) {
throw new IllegalArgumentException("High watermark and low watermark
must be set" +
" when enabling rate limit")
}
@@ -189,8 +194,12 @@ private[celeborn] class Worker(
CongestionController.initialize(
workerSource,
conf.workerCongestionControlSampleTimeWindowSeconds.toInt,
- conf.workerCongestionControlHighWatermark.get,
- conf.workerCongestionControlLowWatermark.get,
+ conf.workerCongestionControlDiskBufferHighWatermark.get,
+ conf.workerCongestionControlDiskBufferLowWatermark.get,
+ conf.workerCongestionControlUserProduceSpeedHighWatermark.get,
+ conf.workerCongestionControlUserProduceSpeedLowWatermark.get,
+ conf.workerCongestionControlWorkerProduceSpeedHighWatermark.get,
+ conf.workerCongestionControlWorkerProduceSpeedLowWatermark.get,
conf.workerCongestionControlUserInactiveIntervalMs,
conf.workerCongestionControlCheckIntervalMs)
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index a48288325..d824abb46 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -40,7 +40,16 @@ public class TestCongestionController {
// Make sampleTimeWindow a bit larger in case the tests run time exceed
this window.
controller =
new CongestionController(
- source, 10, 1000, 500, userInactiveTimeMills,
checkIntervalTimeMills) {
+ source,
+ 10,
+ 1000,
+ 500,
+ 20000,
+ 10000,
+ 20000,
+ 10000,
+ userInactiveTimeMills,
+ checkIntervalTimeMills) {
@Override
public long getTotalPendingBytes() {
return pendingBytes;
@@ -62,18 +71,20 @@ public class TestCongestionController {
@Test
public void testSingleUser() {
UserIdentifier userIdentifier = new UserIdentifier("test", "celeborn");
+ UserCongestionControlContext userCongestionControlContext =
+ controller.getUserCongestionContext(userIdentifier);
+
Assert.assertFalse(controller.isUserCongested(userCongestionControlContext));
- Assert.assertFalse(controller.isUserCongested(userIdentifier));
-
- produceBytes(userIdentifier, 1001);
+ produceBytes(controller, userIdentifier, 1001);
pendingBytes = 1001;
controller.checkCongestion();
- Assert.assertTrue(controller.isUserCongested(userIdentifier));
+
Assert.assertTrue(controller.isUserCongested(userCongestionControlContext));
- controller.consumeBytes(1001);
+ controller.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(1001));
pendingBytes = 0;
controller.checkCongestion();
- Assert.assertFalse(controller.isUserCongested(userIdentifier));
+
Assert.assertFalse(controller.isUserCongested(userCongestionControlContext));
+ clearBufferStatus(controller);
}
@Test
@@ -81,52 +92,172 @@ public class TestCongestionController {
UserIdentifier user1 = new UserIdentifier("test", "celeborn");
UserIdentifier user2 = new UserIdentifier("test", "spark");
+ UserCongestionControlContext context1 =
controller.getUserCongestionContext(user1);
+ UserCongestionControlContext context2 =
controller.getUserCongestionContext(user2);
+
// Both users should not be congested at first
- Assert.assertFalse(controller.isUserCongested(user1));
- Assert.assertFalse(controller.isUserCongested(user2));
-
- // If pendingBytes exceed the high watermark, user1 produce speed > avg
consume speed
- // While user2 produce speed < avg consume speed
- produceBytes(user1, 800);
- produceBytes(user2, 201);
- controller.consumeBytes(500);
+ Assert.assertFalse(controller.isUserCongested(context1));
+ Assert.assertFalse(controller.isUserCongested(context2));
+
+ // If pendingBytes exceed the high watermark, user1 produce speed > avg
produce speed
+ // While user2 produce speed < avg produce speed
+ produceBytes(controller, user1, 800);
+ produceBytes(controller, user2, 201);
+ controller.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(500));
pendingBytes = 1001;
controller.checkCongestion();
- Assert.assertTrue(controller.isUserCongested(user1));
- Assert.assertFalse(controller.isUserCongested(user2));
+ Assert.assertTrue(controller.isUserCongested(context1));
+ Assert.assertFalse(controller.isUserCongested(context2));
- // If both users higher than the avg consume speed, should congest them
all.
- produceBytes(user1, 800);
- produceBytes(user2, 800);
- controller.consumeBytes(500);
+ // If both users higher than the avg produce speed, should congest them
all.
+ produceBytes(controller, user1, 800);
+ produceBytes(controller, user2, 800);
+ controller.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(500));
pendingBytes = 1600;
controller.checkCongestion();
- Assert.assertTrue(controller.isUserCongested(user1));
- Assert.assertTrue(controller.isUserCongested(user2));
+ Assert.assertTrue(controller.isUserCongested(context1));
+ Assert.assertTrue(controller.isUserCongested(context2));
// If pending bytes lower than the low watermark, should don't congest all
users.
pendingBytes = 0;
controller.checkCongestion();
- Assert.assertFalse(controller.isUserCongested(user1));
- Assert.assertFalse(controller.isUserCongested(user2));
+ Assert.assertFalse(controller.isUserCongested(context1));
+ Assert.assertFalse(controller.isUserCongested(context2));
+ clearBufferStatus(controller);
}
@Test
public void testUserMetrics() throws InterruptedException {
UserIdentifier user = new UserIdentifier("test", "celeborn");
- Assert.assertFalse(controller.isUserCongested(user));
- produceBytes(user, 800);
+ UserCongestionControlContext context =
controller.getUserCongestionContext(user);
+
+ Assert.assertFalse(controller.isUserCongested(context));
+ produceBytes(controller, user, 800);
Assert.assertTrue(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(),
user.toMap()));
Thread.sleep(userInactiveTimeMills * 2);
Assert.assertFalse(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(),
user.toMap()));
+ clearBufferStatus(controller);
}
- private void produceBytes(UserIdentifier userIdentifier, long numBytes) {
+ public void produceBytes(
+ CongestionController controller, UserIdentifier userIdentifier, long
numBytes) {
controller
.getUserBuffer(userIdentifier)
.updateInfo(System.currentTimeMillis(), new
BufferStatusHub.BufferStatusNode(numBytes));
}
+
+ @Test
+ public void testUserLevelTrafficQuota() throws InterruptedException {
+ CongestionController controller1 =
+ new CongestionController(
+ source, 10, 100000, 50000, 500, 400, 1200, 1000, 120L * 1000,
checkIntervalTimeMills) {
+ @Override
+ public long getTotalPendingBytes() {
+ return 0;
+ }
+
+ @Override
+ public void trimMemoryUsage() {
+ // No op
+ }
+ };
+
+ UserIdentifier user1 = new UserIdentifier("test1", "celeborn");
+ UserCongestionControlContext context1 =
controller1.getUserCongestionContext(user1);
+ Assert.assertFalse(controller1.isUserCongested(context1));
+ produceBytes(controller1, user1, 600);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(600));
+ Assert.assertTrue(controller1.isUserCongested(context1));
+ Thread.sleep(1001);
+ produceBytes(controller1, user1, 100);
+ // user1 produce speed is 350mb/s
+ Assert.assertFalse(controller1.isUserCongested(context1));
+
+ UserIdentifier user2 = new UserIdentifier("test2", "celeborn");
+ UserCongestionControlContext context2 =
controller1.getUserCongestionContext(user2);
+ Assert.assertFalse(controller1.isUserCongested(context2));
+ produceBytes(controller1, user2, 400);
+ // user2 produce speed is 400mb/s
+ Assert.assertFalse(controller1.isUserCongested(context2));
+
+ UserIdentifier user3 = new UserIdentifier("test3", "celeborn");
+ UserCongestionControlContext context3 =
controller1.getUserCongestionContext(user3);
+ Assert.assertFalse(controller1.isUserCongested(context3));
+ produceBytes(controller1, user3, 400);
+ // user produce speed is 400mb/s
+ Assert.assertFalse(controller1.isUserCongested(context3));
+
+ produceBytes(controller1, user1, 400);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(1300));
+ controller1.checkCongestion();
+ // user1 -> 550mb/s, user2 -> 400mb/s, user3 -> 400mb/s, avg consume
316mb/s
+ Assert.assertTrue(controller1.isUserCongested(context1));
+ Assert.assertFalse(controller1.isUserCongested(context2));
+ Assert.assertFalse(controller1.isUserCongested(context3));
+
+ Thread.sleep(1001);
+ produceBytes(controller1, user1, 10);
+ produceBytes(controller1, user2, 50);
+ produceBytes(controller1, user3, 50);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(110));
+ controller1.checkCongestion();
+ Assert.assertFalse(controller1.isUserCongested(context1));
+ Assert.assertFalse(controller1.isUserCongested(context2));
+ Assert.assertFalse(controller1.isUserCongested(context3));
+ controller1.close();
+ }
+
+ @Test
+ public void testWorkerLevelTrafficQuota() throws InterruptedException {
+ CongestionController controller1 =
+ new CongestionController(
+ source, 10, 100000, 50000, 500, 400, 800, 700, 120 * 1000,
checkIntervalTimeMills) {
+ @Override
+ public long getTotalPendingBytes() {
+ return 0;
+ }
+
+ @Override
+ public void trimMemoryUsage() {
+ // No op
+ }
+ };
+
+ UserIdentifier user1 = new UserIdentifier("test1", "celeborn");
+ UserCongestionControlContext context1 =
controller1.getUserCongestionContext(user1);
+ Assert.assertFalse(controller1.isUserCongested(context1));
+ produceBytes(controller1, user1, 500);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(500));
+ Assert.assertFalse(controller1.isUserCongested(context1));
+
+ UserIdentifier user2 = new UserIdentifier("test2", "celeborn");
+ UserCongestionControlContext context2 =
controller1.getUserCongestionContext(user2);
+ produceBytes(controller1, user2, 400);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(400));
+ Assert.assertFalse(controller1.isUserCongested(context2));
+
+ controller1.checkCongestion();
+ Assert.assertTrue(controller1.isUserCongested(context1));
+ Assert.assertFalse(controller1.isUserCongested(context2));
+
+ Thread.sleep(1001);
+ produceBytes(controller1, user1, 200);
+ produceBytes(controller1, user1, 200);
+ controller1.getProducedBufferStatusHub().add(new
BufferStatusHub.BufferStatusNode(400));
+ controller1.checkCongestion();
+ Assert.assertFalse(controller1.isUserCongested(context1));
+ Assert.assertFalse(controller1.isUserCongested(context2));
+ controller1.close();
+ }
+
+ private void clearBufferStatus(CongestionController controller) {
+ controller.getProducedBufferStatusHub().clear();
+ controller.getConsumedBufferStatusHub().clear();
+ for (UserCongestionControlContext context :
controller.getUserCongestionContextMap().values()) {
+ context.getUserBufferInfo().getBufferStatusHub().clear();
+ }
+ }
}