This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 74102d8e3 [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a
single ShuffleBlockInfo
74102d8e3 is described below
commit 74102d8e3e214418066c146e0c0bef7e0ef46262
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu May 23 21:22:45 2024 +0800
[CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single
ShuffleBlockInfo
Merge smaller `ShuffleBlockInfo`s corresponding to the same mapId, such that
the size of each block does not exceed `celeborn.shuffle.chunk.size`
As sorted ShuffleBlocks are contiguous, we can compact multiple
`ShuffleBlockInfo` into one as long as the size of the compacted one does not
exceed a configurable fraction (default 0.25) of `celeborn.shuffle.chunk.size`.
This way we can decrease the number of ShuffleBlock objects.
No
Existing UTs
Closes #2524 from s0nskar/CELEBORN-1410.
Lead-authored-by: Sanskar Modi <[email protected]>
Co-authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 12 +++++++++++
docs/configuration/worker.md | 1 +
.../worker/storage/PartitionFilesSorter.java | 23 ++++++++++++++++++----
3 files changed, 32 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5c1cc0865..ea0147d5d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -686,6 +686,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(ESTIMATED_PARTITION_SIZE_MAX_SIZE).getOrElse(partitionSplitMaximumSize
* 2)
def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
def partitionSorterSortPartitionTimeout: Long =
get(PARTITION_SORTER_SORT_TIMEOUT)
+ def workerPartitionSorterShuffleBlockCompactionFactor: Double =
+ get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
def partitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_PER_PARTITION_RESERVED_MEMORY)
def partitionSorterThreads: Int =
@@ -2537,6 +2539,16 @@ object CelebornConf extends Logging {
.checkValue(v => v < Int.MaxValue, "Reserved memory per partition must
be less than 2GB.")
.createWithDefaultString("1mb")
+ val WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR: ConfigEntry[Double] =
+ buildConf("celeborn.shuffle.sortPartition.block.compactionFactor")
+ .categories("worker")
+ .version("0.4.2")
+ .doc("Combine sorted shuffle blocks such that size of compacted shuffle
block does not " +
+ s"exceed compactionFactor * ${SHUFFLE_CHUNK_SIZE.key}")
+ .doubleConf
+ .checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
+ .createWithDefault(0.25)
+
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.buffer.size")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 1f7f48a24..69221343b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -24,6 +24,7 @@ license: |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 | |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size
smaller than this configuration of partition size for estimation. | 0.3.0 |
celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 | |
+| celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | Combine
sorted shuffle blocks such that size of compacted shuffle block does not exceed
compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 | |
| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | Kerberos keytab
file path for HDFS storage connection. | 0.3.2 | |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index d2a6388a8..95eddcf0a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -84,6 +84,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private final AtomicLong sortedFilesSize = new AtomicLong();
protected final long sortTimeout;
protected final long shuffleChunkSize;
+ protected final double compactionFactor;
protected final long reservedMemoryPerPartition;
private boolean gracefulShutdown;
private long partitionSorterShutdownAwaitTime;
@@ -100,6 +101,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
this.sortTimeout = conf.partitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
this.reservedMemoryPerPartition =
conf.partitionSorterReservedMemoryPerPartition();
+ this.compactionFactor =
conf.workerPartitionSorterShuffleBlockCompactionFactor();
this.partitionSorterShutdownAwaitTime =
conf.workerGracefulShutdownPartitionSorterCloseAwaitTimeMs();
this.indexCacheMaxWeight = conf.partitionSorterIndexCacheMaxWeight();
@@ -623,10 +625,23 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
for (ShuffleBlockInfo blockInfo : originShuffleBlocks) {
long offset = blockInfo.offset;
long length = blockInfo.length;
- ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
- sortedBlock.offset = fileIndex;
- sortedBlock.length = length;
- sortedShuffleBlocks.add(sortedBlock);
+ // Combine multiple small `ShuffleBlockInfo` for same mapId such
that size of compacted
+ // `ShuffleBlockInfo` does not exceed `compactionFactor` *
`shuffleChunkSize`
+ boolean shuffleBlockCompacted = false;
+ if (!sortedShuffleBlocks.isEmpty()) {
+ ShuffleBlockInfo lastShuffleBlock =
+ sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1);
+ if (lastShuffleBlock.length + length <= compactionFactor *
shuffleChunkSize) {
+ lastShuffleBlock.length += length;
+ shuffleBlockCompacted = true;
+ }
+ }
+ if (!shuffleBlockCompacted) {
+ ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
+ sortedBlock.offset = fileIndex;
+ sortedBlock.length = length;
+ sortedShuffleBlocks.add(sortedBlock);
+ }
fileIndex += transferBlock(offset, length);
}
sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);