This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 334f0e5ba [CELEBORN-782] Make max components configurable for 
FileWriter#flushBuffer
334f0e5ba is described below

commit 334f0e5ba4ad3aadd1869ad0590cbd3a60a77bee
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 10 18:00:07 2023 +0800

    [CELEBORN-782] Make max components configurable for FileWriter#flushBuffer
    
    ### What changes were proposed in this pull request?
    Make max components configurable for FileWriter#flushBuffer.
    
    ### Why are the changes needed?
    When the max components of ```CompositeByteBuf``` is too big (hard-coded to 256 
before this PR), Netty's off-heap memory
    usage will be several times bigger than the true usage:
    ```
     Direct memory usage: 1044.0 MiB/4.0 GiB, disk buffer size: 255.9 MiB
    ```
    When set to 1, Netty's memory usage will be very close to the disk buffer size:
    ```
    Direct memory usage: 376.0 MiB/4.0 GiB, disk buffer size: 353.0 MiB
    ```
    But when it is set too low, for example 1, performance might decrease, 
especially for the sort pusher:
    ```
    max components:   1 vs. 16
    shuffle write time:   34s vs. 30s
    ```
    For the hash pusher, the difference is not as big:
    ```
    max components:   1 vs. 8
    shuffle write time:   25s vs. 23s
    ```
    This PR makes the parameter configurable.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GA, and manual test.
    
    Closes #1697 from waitinfuture/782.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../scala/org/apache/celeborn/common/CelebornConf.scala  | 14 ++++++++++++++
 .../celeborn/service/deploy/worker/storage/Flusher.scala |  9 +++++++--
 .../service/deploy/worker/storage/StorageManager.scala   |  4 +++-
 .../service/deploy/worker/storage/FileWriterSuiteJ.java  | 16 ++++++++++++++--
 .../worker/storage/MapPartitionFileWriterSuiteJ.java     |  8 +++++++-
 5 files changed, 45 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 84262b5cb..24992be77 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,6 +647,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def partitionSorterThreads: Int =
     
get(PARTITION_SORTER_THREADS).getOrElse(Runtime.getRuntime.availableProcessors)
   def workerPushHeartbeatEnabled: Boolean = get(WORKER_PUSH_HEARTBEAT_ENABLED)
+  def workerPushMaxComponents: Int = 
get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
   def workerFetchHeartbeatEnabled: Boolean = 
get(WORKER_FETCH_HEARTBEAT_ENABLED)
   def workerPartitionSplitEnabled: Boolean = 
get(WORKER_PARTITION_SPLIT_ENABLED)
 
@@ -2579,6 +2580,19 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.push.compositeBuffer.maxComponents")
+      .internal
+      .categories("worker")
+      .version("0.3.0")
+      .doc("Max components of Netty `CompositeByteBuf` in `FileWriter`'s 
`flushBuffer`. " +
+        "When this value is too big, i.e. 256, there will be many memory 
fragments in Netty's memory pool, " +
+        "and total direct memory can be significantly larger than the disk 
buffer. " +
+        "When set to 1, Netty's direct memory is close to disk buffer, but 
performance " +
+        "might decrease due to frequent memory copy during compaction.")
+      .intConf
+      .createWithDefault(16)
+
   val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.fetch.heartbeat.enabled")
       .categories("worker")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e02dc25b4..13edab359 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -38,6 +38,7 @@ import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 abstract private[worker] class Flusher(
     val workerSource: AbstractSource,
     val threadCount: Int,
+    val maxComponents: Int,
     flushTimeMetric: TimeWindow) extends Logging {
   protected lazy val flusherId: Int = System.identityHashCode(this)
   protected val workingQueues = new 
Array[LinkedBlockingQueue[FlushTask]](threadCount)
@@ -103,7 +104,7 @@ abstract private[worker] class Flusher(
   def takeBuffer(): CompositeByteBuf = {
     var buffer = bufferQueue.poll()
     if (buffer == null) {
-      buffer = Unpooled.compositeBuffer(256)
+      buffer = Unpooled.compositeBuffer(maxComponents)
     }
     buffer
   }
@@ -147,11 +148,13 @@ private[worker] class LocalFlusher(
     workerSource: AbstractSource,
     val deviceMonitor: DeviceMonitor,
     threadCount: Int,
+    maxComponents: Int,
     val mountPoint: String,
     val diskType: StorageInfo.Type,
     timeWindow: TimeWindow) extends Flusher(
     workerSource,
     threadCount,
+    maxComponents,
     timeWindow)
   with DeviceObserver with Logging {
 
@@ -182,9 +185,11 @@ private[worker] class LocalFlusher(
 
 final private[worker] class HdfsFlusher(
     workerSource: AbstractSource,
-    hdfsFlusherThreads: Int) extends Flusher(
+    hdfsFlusherThreads: Int,
+    maxComponents: Int) extends Flusher(
     workerSource,
     hdfsFlusherThreads,
+    maxComponents,
     null) with Logging {
   override def toString: String = s"HdfsFlusher@$flusherId"
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 641a7b739..2e7f1c3fd 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -108,6 +108,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           workerSource,
           deviceMonitor,
           diskInfo.threadCount,
+          conf.workerPushMaxComponents,
           diskInfo.mountPoint,
           diskInfo.storageType,
           diskInfo.flushTimeMetrics)
@@ -130,7 +131,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       (
         Some(new HdfsFlusher(
           workerSource,
-          conf.workerHdfsFlusherThreads)),
+          conf.workerHdfsFlusherThreads,
+          conf.workerPushMaxComponents)),
         conf.workerHdfsFlusherThreads)
     } else {
       (None, 0)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 64091a1ee..e3db12f26 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -116,7 +116,13 @@ public class FileWriterSuiteJ {
     dirs.$plus$eq(tempDir);
     localFlusher =
         new LocalFlusher(
-            source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk1", 
StorageInfo.Type.HDD, null);
+            source,
+            DeviceMonitor$.MODULE$.EmptyMonitor(),
+            1,
+            256,
+            "disk1",
+            StorageInfo.Type.HDD,
+            null);
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), 
"0.8");
@@ -380,7 +386,13 @@ public class FileWriterSuiteJ {
     dirs.$plus$eq(file);
     localFlusher =
         new LocalFlusher(
-            source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk2", 
StorageInfo.Type.HDD, null);
+            source,
+            DeviceMonitor$.MODULE$.EmptyMonitor(),
+            1,
+            256,
+            "disk2",
+            StorageInfo.Type.HDD,
+            null);
   }
 
   @Test
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
index ca2b67b31..40093a562 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
@@ -81,7 +81,13 @@ public class MapPartitionFileWriterSuiteJ {
     dirs.$plus$eq(tempDir);
     localFlusher =
         new LocalFlusher(
-            source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk1", 
StorageInfo.Type.HDD, null);
+            source,
+            DeviceMonitor$.MODULE$.EmptyMonitor(),
+            1,
+            256,
+            "disk1",
+            StorageInfo.Type.HDD,
+            null);
 
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), 
"0.8");

Reply via email to