This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 522b79759 [CELEBORN-1472] Reduce 
CongestionController#userBufferStatuses call times
522b79759 is described below

commit 522b79759ade5a187ab9be158aae92aa45c4118a
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Jun 25 11:05:24 2024 +0800

    [CELEBORN-1472] Reduce CongestionController#userBufferStatuses call times
    
    ### What changes were proposed in this pull request?
    Reduce the number of CongestionController#userBufferStatuses lookups by 
caching each user's UserBufferInfo in PartitionDataWriter.
    
    ### Why are the changes needed?
    When the sort-based shuffle writer is used, the number of PushMergedData 
requests increases, which makes CongestionController#produceBytes take up a 
lot of CPU time.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #2583 from leixm/issue_1472.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit d362d9f75b0b8f7f0239846a5032922d2941984a)
    Signed-off-by: Shuang <[email protected]>
---
 .../congestcontrol/CongestionController.java       | 54 ++++++----------------
 .../worker/congestcontrol/UserBufferInfo.java      | 41 ++++++++++++++++
 .../deploy/worker/storage/PartitionDataWriter.java | 17 +++++--
 .../congestcontrol/TestCongestionController.java   | 18 +++++---
 4 files changed, 78 insertions(+), 52 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index 0a5cc3eae..d05ddd582 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -113,29 +113,6 @@ public class CongestionController {
     return _INSTANCE;
   }
 
-  private static class UserBufferInfo {
-    long timestamp;
-    final BufferStatusHub bufferStatusHub;
-
-    public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
-      this.timestamp = timestamp;
-      this.bufferStatusHub = bufferStatusHub;
-    }
-
-    synchronized void updateInfo(long timestamp, 
BufferStatusHub.BufferStatusNode node) {
-      this.timestamp = timestamp;
-      this.bufferStatusHub.add(timestamp, node);
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public BufferStatusHub getBufferStatusHub() {
-      return bufferStatusHub;
-    }
-  }
-
   /**
    * 1. If the total pending bytes is over high watermark, will congest users 
who produce speed is
    * higher than the potential average consume speed.
@@ -166,24 +143,19 @@ public class CongestionController {
     return false;
   }
 
-  public void produceBytes(UserIdentifier userIdentifier, int numBytes) {
-    long currentTimeMillis = System.currentTimeMillis();
-    UserBufferInfo userBufferInfo =
-        userBufferStatuses.computeIfAbsent(
-            userIdentifier,
-            user -> {
-              logger.info("New user {} comes, initializing its rate status", 
user);
-              BufferStatusHub bufferStatusHub = new 
BufferStatusHub(sampleTimeWindowSeconds);
-              UserBufferInfo userInfo = new UserBufferInfo(currentTimeMillis, 
bufferStatusHub);
-              workerSource.addGauge(
-                  WorkerSource.USER_PRODUCE_SPEED(),
-                  userIdentifier.toJMap(),
-                  () -> getUserProduceSpeed(userInfo));
-              return userInfo;
-            });
-
-    BufferStatusHub.BufferStatusNode node = new 
BufferStatusHub.BufferStatusNode(numBytes);
-    userBufferInfo.updateInfo(currentTimeMillis, node);
+  public UserBufferInfo getUserBuffer(UserIdentifier userIdentifier) {
+    return userBufferStatuses.computeIfAbsent(
+        userIdentifier,
+        user -> {
+          logger.info("New user {} comes, initializing its rate status", user);
+          BufferStatusHub bufferStatusHub = new 
BufferStatusHub(sampleTimeWindowSeconds);
+          UserBufferInfo userInfo = new 
UserBufferInfo(System.currentTimeMillis(), bufferStatusHub);
+          workerSource.addGauge(
+              WorkerSource.USER_PRODUCE_SPEED(),
+              userIdentifier.toJMap(),
+              () -> getUserProduceSpeed(userInfo));
+          return userInfo;
+        });
   }
 
   public void consumeBytes(int numBytes) {
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
new file mode 100644
index 000000000..4d12f5908
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserBufferInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.congestcontrol;
+
+public class UserBufferInfo {
+  long timestamp;
+  final BufferStatusHub bufferStatusHub;
+
+  public UserBufferInfo(long timestamp, BufferStatusHub bufferStatusHub) {
+    this.timestamp = timestamp;
+    this.bufferStatusHub = bufferStatusHub;
+  }
+
+  public synchronized void updateInfo(long timestamp, 
BufferStatusHub.BufferStatusNode node) {
+    this.timestamp = timestamp;
+    this.bufferStatusHub.add(timestamp, node);
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public BufferStatusHub getBufferStatusHub() {
+    return bufferStatusHub;
+  }
+}
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index a39a4f9d7..589832dbe 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +49,9 @@ import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.FileChannelUtils;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
+import 
org.apache.celeborn.service.deploy.worker.congestcontrol.BufferStatusHub;
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
+import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
 
 /*
@@ -104,6 +105,8 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
   private boolean metricsCollectCriticalEnabled;
   private long chunkSize;
 
+  private UserBufferInfo userBufferInfo = null;
+
   public PartitionDataWriter(
       StorageManager storageManager,
       AbstractSource workerSource,
@@ -155,6 +158,10 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       this.mapIdBitMap = new RoaringBitmap();
     }
     takeBuffer();
+    CongestionController congestionController = 
CongestionController.instance();
+    if (!isMemoryShuffleFile.get() && congestionController != null) {
+      userBufferInfo = 
congestionController.getUserBuffer(getDiskFileInfo().getUserIdentifier());
+    }
   }
 
   public void initFileChannelsForDiskFile() throws IOException {
@@ -294,10 +301,10 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       MemoryManager.instance().increaseMemoryFileStorage(numBytes);
     } else {
       MemoryManager.instance().incrementDiskBuffer(numBytes);
-      Optional.ofNullable(CongestionController.instance())
-          .ifPresent(
-              congestionController ->
-                  
congestionController.produceBytes(diskFileInfo.getUserIdentifier(), numBytes));
+      if (userBufferInfo != null) {
+        userBufferInfo.updateInfo(
+            System.currentTimeMillis(), new 
BufferStatusHub.BufferStatusNode(numBytes));
+      }
     }
 
     synchronized (flushLock) {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index b8fb5cadc..deb533fa4 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -69,7 +69,7 @@ public class TestCongestionController {
 
     Assert.assertFalse(controller.isUserCongested(userIdentifier));
 
-    controller.produceBytes(userIdentifier, 1001);
+    produceBytes(userIdentifier, 1001);
     pendingBytes = 1001;
     controller.checkCongestion();
     Assert.assertTrue(controller.isUserCongested(userIdentifier));
@@ -91,8 +91,8 @@ public class TestCongestionController {
 
     // If pendingBytes exceed the high watermark, user1 produce speed > avg 
consume speed
     // While user2 produce speed < avg consume speed
-    controller.produceBytes(user1, 800);
-    controller.produceBytes(user2, 201);
+    produceBytes(user1, 800);
+    produceBytes(user2, 201);
     controller.consumeBytes(500);
     pendingBytes = 1001;
     controller.checkCongestion();
@@ -100,8 +100,8 @@ public class TestCongestionController {
     Assert.assertFalse(controller.isUserCongested(user2));
 
     // If both users higher than the avg consume speed, should congest them 
all.
-    controller.produceBytes(user1, 800);
-    controller.produceBytes(user2, 800);
+    produceBytes(user1, 800);
+    produceBytes(user2, 800);
     controller.consumeBytes(500);
     pendingBytes = 1600;
     controller.checkCongestion();
@@ -119,7 +119,7 @@ public class TestCongestionController {
   public void testUserMetrics() throws InterruptedException {
     UserIdentifier user = new UserIdentifier("test", "celeborn");
     Assert.assertFalse(controller.isUserCongested(user));
-    controller.produceBytes(user, 800);
+    produceBytes(user, 800);
 
     Assert.assertTrue(
         isGaugeExist(
@@ -159,4 +159,10 @@ public class TestCongestionController {
             .count()
         == 1;
   }
+
+  private void produceBytes(UserIdentifier userIdentifier, long numBytes) {
+    controller
+        .getUserBuffer(userIdentifier)
+        .updateInfo(System.currentTimeMillis(), new 
BufferStatusHub.BufferStatusNode(numBytes));
+  }
 }

Reply via email to