This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 522b79759 [CELEBORN-1472] Reduce CongestionController#userBufferStatuses call times
522b79759 is described below
commit 522b79759ade5a187ab9be158aae92aa45c4118a
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Jun 25 11:05:24 2024 +0800
[CELEBORN-1472] Reduce CongestionController#userBufferStatuses call times
### What changes were proposed in this pull request?
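Reduce the number of CongestionController#userBufferStatuses lookups.
Previously, every non-memory write in PartitionDataWriter called
CongestionController#produceBytes, which performed a computeIfAbsent on
userBufferStatuses each time. Now PartitionDataWriter resolves its user's
UserBufferInfo once during initialization, via the new
CongestionController#getUserBuffer. It then updates that UserBufferInfo
directly on each write. To allow this, UserBufferInfo is moved out of
CongestionController into its own public class.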
### Why are the changes needed?
When the sort-based shuffle writer is used, the number of PushMergedData
requests increases, which makes CongestionController#produceBytes take up
a lot of CPU time.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
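Existing UTs (TestCongestionController was adapted to call getUserBuffer(...).updateInfo(...) in place of the removed produceBytes).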
Closes #2583 from leixm/issue_1472.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit d362d9f75b0b8f7f0239846a5032922d2941984a)
Signed-off-by: Shuang <[email protected]>
---
.../congestcontrol/CongestionController.java | 54 ++++++----------------
.../worker/congestcontrol/UserBufferInfo.java | 41 ++++++++++++++++
.../deploy/worker/storage/PartitionDataWriter.java | 17 +++++--
.../congestcontrol/TestCongestionController.java | 18 +++++---
4 files changed, 78 insertions(+), 52 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index 0a5cc3eae..d05ddd582 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -113,29 +113,6 @@ public class CongestionController {
return _INSTANCE;
}
- private static class UserBufferInfo {
- long timestamp;
- final BufferStatusHub bufferStatusHub;
-
- public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
- this.timestamp = timestamp;
- this.bufferStatusHub = bufferStatusHub;
- }
-
- synchronized void updateInfo(long timestamp,
BufferStatusHub.BufferStatusNode node) {
- this.timestamp = timestamp;
- this.bufferStatusHub.add(timestamp, node);
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public BufferStatusHub getBufferStatusHub() {
- return bufferStatusHub;
- }
- }
-
/**
* 1. If the total pending bytes is over high watermark, will congest users
who produce speed is
* higher than the potential average consume speed.
@@ -166,24 +143,19 @@ public class CongestionController {
return false;
}
- public void produceBytes(UserIdentifier userIdentifier, int numBytes) {
- long currentTimeMillis = System.currentTimeMillis();
- UserBufferInfo userBufferInfo =
- userBufferStatuses.computeIfAbsent(
- userIdentifier,
- user -> {
- logger.info("New user {} comes, initializing its rate status",
user);
- BufferStatusHub bufferStatusHub = new
BufferStatusHub(sampleTimeWindowSeconds);
- UserBufferInfo userInfo = new UserBufferInfo(currentTimeMillis,
bufferStatusHub);
- workerSource.addGauge(
- WorkerSource.USER_PRODUCE_SPEED(),
- userIdentifier.toJMap(),
- () -> getUserProduceSpeed(userInfo));
- return userInfo;
- });
-
- BufferStatusHub.BufferStatusNode node = new
BufferStatusHub.BufferStatusNode(numBytes);
- userBufferInfo.updateInfo(currentTimeMillis, node);
+ public UserBufferInfo getUserBuffer(UserIdentifier userIdentifier) {
+ return userBufferStatuses.computeIfAbsent(
+ userIdentifier,
+ user -> {
+ logger.info("New user {} comes, initializing its rate status", user);
+ BufferStatusHub bufferStatusHub = new
BufferStatusHub(sampleTimeWindowSeconds);
+ UserBufferInfo userInfo = new
UserBufferInfo(System.currentTimeMillis(), bufferStatusHub);
+ workerSource.addGauge(
+ WorkerSource.USER_PRODUCE_SPEED(),
+ userIdentifier.toJMap(),
+ () -> getUserProduceSpeed(userInfo));
+ return userInfo;
+ });
}
public void consumeBytes(int numBytes) {
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
new file mode 100644
index 000000000..4d12f5908
--- /dev/null
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.congestcontrol;
+
+public class UserBufferInfo {
+ long timestamp;
+ final BufferStatusHub bufferStatusHub;
+
+ public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
+ this.timestamp = timestamp;
+ this.bufferStatusHub = bufferStatusHub;
+ }
+
+ public synchronized void updateInfo(long timestamp,
BufferStatusHub.BufferStatusNode node) {
+ this.timestamp = timestamp;
+ this.bufferStatusHub.add(timestamp, node);
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public BufferStatusHub getBufferStatusHub() {
+ return bufferStatusHub;
+ }
+}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index a39a4f9d7..589832dbe 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +49,9 @@ import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
+import
org.apache.celeborn.service.deploy.worker.congestcontrol.BufferStatusHub;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
+import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
/*
@@ -104,6 +105,8 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
private boolean metricsCollectCriticalEnabled;
private long chunkSize;
+ private UserBufferInfo userBufferInfo = null;
+
public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@@ -155,6 +158,10 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
this.mapIdBitMap = new RoaringBitmap();
}
takeBuffer();
+ CongestionController congestionController =
CongestionController.instance();
+ if (!isMemoryShuffleFile.get() && congestionController != null) {
+ userBufferInfo =
congestionController.getUserBuffer(getDiskFileInfo().getUserIdentifier());
+ }
}
public void initFileChannelsForDiskFile() throws IOException {
@@ -294,10 +301,10 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
MemoryManager.instance().increaseMemoryFileStorage(numBytes);
} else {
MemoryManager.instance().incrementDiskBuffer(numBytes);
- Optional.ofNullable(CongestionController.instance())
- .ifPresent(
- congestionController ->
-
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+ if (userBufferInfo != null) {
+ userBufferInfo.updateInfo(
+ System.currentTimeMillis(), new
BufferStatusHub.BufferStatusNode(numBytes));
+ }
}
synchronized (flushLock) {
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index b8fb5cadc..deb533fa4 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -69,7 +69,7 @@ public class TestCongestionController {
Assert.assertFalse(controller.isUserCongested(userIdentifier));
- controller.produceBytes(userIdentifier, 1001);
+ produceBytes(userIdentifier, 1001);
pendingBytes = 1001;
controller.checkCongestion();
Assert.assertTrue(controller.isUserCongested(userIdentifier));
@@ -91,8 +91,8 @@ public class TestCongestionController {
// If pendingBytes exceed the high watermark, user1 produce speed > avg
consume speed
// While user2 produce speed < avg consume speed
- controller.produceBytes(user1, 800);
- controller.produceBytes(user2, 201);
+ produceBytes(user1, 800);
+ produceBytes(user2, 201);
controller.consumeBytes(500);
pendingBytes = 1001;
controller.checkCongestion();
@@ -100,8 +100,8 @@ public class TestCongestionController {
Assert.assertFalse(controller.isUserCongested(user2));
// If both users higher than the avg consume speed, should congest them
all.
- controller.produceBytes(user1, 800);
- controller.produceBytes(user2, 800);
+ produceBytes(user1, 800);
+ produceBytes(user2, 800);
controller.consumeBytes(500);
pendingBytes = 1600;
controller.checkCongestion();
@@ -119,7 +119,7 @@ public class TestCongestionController {
public void testUserMetrics() throws InterruptedException {
UserIdentifier user = new UserIdentifier("test", "celeborn");
Assert.assertFalse(controller.isUserCongested(user));
- controller.produceBytes(user, 800);
+ produceBytes(user, 800);
Assert.assertTrue(
isGaugeExist(
@@ -159,4 +159,10 @@ public class TestCongestionController {
.count()
== 1;
}
+
+ private void produceBytes(UserIdentifier userIdentifier, long numBytes) {
+ controller
+ .getUserBuffer(userIdentifier)
+ .updateInfo(System.currentTimeMillis(), new
BufferStatusHub.BufferStatusNode(numBytes));
+ }
}