This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 641400533 [CELEBORN-782] Make max components configurable for
FileWriter#flushBuffer
641400533 is described below
commit 641400533bc7e0fb41a077c13d08a93201fe5ad8
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jul 10 18:00:07 2023 +0800
[CELEBORN-782] Make max components configurable for FileWriter#flushBuffer
### What changes were proposed in this pull request?
Make max components configurable for FileWriter#flushBuffer.
### Why are the changes needed?
When the maximum number of components of ```CompositeByteBuf``` is too large (hard-coded to 256
before this PR), Netty's off-heap memory
usage can be several times larger than the actual usage:
```
Direct memory usage: 1044.0 MiB/4.0 GiB, disk buffer size: 255.9 MiB
```
When set to 1, Netty's memory usage is very close to the disk buffer size:
```
Direct memory usage: 376.0 MiB/4.0 GiB, disk buffer size: 353.0 MiB
```
However, when it is set too low (for example, to 1), performance might decrease,
especially for the sort pusher:
```
max components: 1 vs. 16
shuffle write time: 34s vs. 30s
```
For the hash pusher, the difference is smaller:
```
max components: 1 vs. 8
shuffle write time: 25s vs. 23s
```
This PR makes the parameter configurable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA and manual testing.
Closes #1697 from waitinfuture/782.
Lead-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 334f0e5ba4ad3aadd1869ad0590cbd3a60a77bee)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../scala/org/apache/celeborn/common/CelebornConf.scala | 14 ++++++++++++++
.../celeborn/service/deploy/worker/storage/Flusher.scala | 9 +++++++--
.../service/deploy/worker/storage/StorageManager.scala | 4 +++-
.../service/deploy/worker/storage/FileWriterSuiteJ.java | 16 ++++++++++++++--
.../worker/storage/MapPartitionFileWriterSuiteJ.java | 8 +++++++-
5 files changed, 45 insertions(+), 6 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 84262b5cb..24992be77 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -647,6 +647,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def partitionSorterThreads: Int =
get(PARTITION_SORTER_THREADS).getOrElse(Runtime.getRuntime.availableProcessors)
def workerPushHeartbeatEnabled: Boolean = get(WORKER_PUSH_HEARTBEAT_ENABLED)
+ def workerPushMaxComponents: Int =
get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
def workerFetchHeartbeatEnabled: Boolean =
get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean =
get(WORKER_PARTITION_SPLIT_ENABLED)
@@ -2579,6 +2580,19 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
+ val WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.push.compositeBuffer.maxComponents")
+ .internal
+ .categories("worker")
+ .version("0.3.0")
+ .doc("Max components of Netty `CompositeByteBuf` in `FileWriter`'s
`flushBuffer`. " +
+ "When this value is too big, i.e. 256, there will be many memory
fragments in Netty's memory pool, " +
+ "and total direct memory can be significantly larger than the disk
buffer. " +
+ "When set to 1, Netty's direct memory is close to disk buffer, but
performance " +
+ "might decrease due to frequent memory copy during compaction.")
+ .intConf
+ .createWithDefault(16)
+
val WORKER_FETCH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.fetch.heartbeat.enabled")
.categories("worker")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index e02dc25b4..13edab359 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -38,6 +38,7 @@ import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager
abstract private[worker] class Flusher(
val workerSource: AbstractSource,
val threadCount: Int,
+ val maxComponents: Int,
flushTimeMetric: TimeWindow) extends Logging {
protected lazy val flusherId: Int = System.identityHashCode(this)
protected val workingQueues = new
Array[LinkedBlockingQueue[FlushTask]](threadCount)
@@ -103,7 +104,7 @@ abstract private[worker] class Flusher(
def takeBuffer(): CompositeByteBuf = {
var buffer = bufferQueue.poll()
if (buffer == null) {
- buffer = Unpooled.compositeBuffer(256)
+ buffer = Unpooled.compositeBuffer(maxComponents)
}
buffer
}
@@ -147,11 +148,13 @@ private[worker] class LocalFlusher(
workerSource: AbstractSource,
val deviceMonitor: DeviceMonitor,
threadCount: Int,
+ maxComponents: Int,
val mountPoint: String,
val diskType: StorageInfo.Type,
timeWindow: TimeWindow) extends Flusher(
workerSource,
threadCount,
+ maxComponents,
timeWindow)
with DeviceObserver with Logging {
@@ -182,9 +185,11 @@ private[worker] class LocalFlusher(
final private[worker] class HdfsFlusher(
workerSource: AbstractSource,
- hdfsFlusherThreads: Int) extends Flusher(
+ hdfsFlusherThreads: Int,
+ maxComponents: Int) extends Flusher(
workerSource,
hdfsFlusherThreads,
+ maxComponents,
null) with Logging {
override def toString: String = s"HdfsFlusher@$flusherId"
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 641a7b739..2e7f1c3fd 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -108,6 +108,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
workerSource,
deviceMonitor,
diskInfo.threadCount,
+ conf.workerPushMaxComponents,
diskInfo.mountPoint,
diskInfo.storageType,
diskInfo.flushTimeMetrics)
@@ -130,7 +131,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
(
Some(new HdfsFlusher(
workerSource,
- conf.workerHdfsFlusherThreads)),
+ conf.workerHdfsFlusherThreads,
+ conf.workerPushMaxComponents)),
conf.workerHdfsFlusherThreads)
} else {
(None, 0)
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index 64091a1ee..e3db12f26 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -116,7 +116,13 @@ public class FileWriterSuiteJ {
dirs.$plus$eq(tempDir);
localFlusher =
new LocalFlusher(
- source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk1",
StorageInfo.Type.HDD, null);
+ source,
+ DeviceMonitor$.MODULE$.EmptyMonitor(),
+ 1,
+ 256,
+ "disk1",
+ StorageInfo.Type.HDD,
+ null);
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
"0.8");
@@ -380,7 +386,13 @@ public class FileWriterSuiteJ {
dirs.$plus$eq(file);
localFlusher =
new LocalFlusher(
- source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk2",
StorageInfo.Type.HDD, null);
+ source,
+ DeviceMonitor$.MODULE$.EmptyMonitor(),
+ 1,
+ 256,
+ "disk2",
+ StorageInfo.Type.HDD,
+ null);
}
@Test
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
index ca2b67b31..40093a562 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriterSuiteJ.java
@@ -81,7 +81,13 @@ public class MapPartitionFileWriterSuiteJ {
dirs.$plus$eq(tempDir);
localFlusher =
new LocalFlusher(
- source, DeviceMonitor$.MODULE$.EmptyMonitor(), 1, "disk1",
StorageInfo.Type.HDD, null);
+ source,
+ DeviceMonitor$.MODULE$.EmptyMonitor(),
+ 1,
+ 256,
+ "disk1",
+ StorageInfo.Type.HDD,
+ null);
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
"0.8");