This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f527b22b4 [CELEBORN-1410] Combine multiple ShuffleBlockInfo into a
single ShuffleBlockInfo
f527b22b4 is described below
commit f527b22b4d579b5ccd37a5de29df391d7e42e0ac
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu May 23 21:22:45 2024 +0800
[CELEBORN-1410] Combine multiple ShuffleBlockInfo into a single
ShuffleBlockInfo
### What changes were proposed in this pull request?
Merge smaller `ShuffleBlockInfo`s belonging to the same mapId, such that the
size of each merged block does not exceed `compactionFactor` * `celeborn.shuffle.chunk.size`.
### Why are the changes needed?
Because the sorted shuffle blocks of a map are written contiguously, multiple
`ShuffleBlockInfo`s can be compacted into one as long as the size of the compacted
block does not exceed `celeborn.shuffle.sortPartition.block.compactionFactor` (default 0.25)
* `celeborn.shuffle.chunk.size`. This reduces the number of `ShuffleBlockInfo` objects.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs
Closes #2524 from s0nskar/CELEBORN-1410.
Lead-authored-by: Sanskar Modi <[email protected]>
Co-authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 12 +++++++++++
docs/configuration/worker.md | 1 +
.../worker/storage/PartitionFilesSorter.java | 23 ++++++++++++++++++----
3 files changed, 32 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 62cb30b4d..68fa938b5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -811,6 +811,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerPartitionSorterSortPartitionTimeout: Long =
get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
def workerPartitionSorterPrefetchEnabled: Boolean =
get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
+ def workerPartitionSorterShuffleBlockCompactionFactor: Double =
+ get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
def workerPartitionSorterReservedMemoryPerPartition: Long =
get(WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION)
def workerPartitionSorterThreads: Int =
@@ -2961,6 +2963,16 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR: ConfigEntry[Double] =
+ buildConf("celeborn.shuffle.sortPartition.block.compactionFactor")
+ .categories("worker")
+ .version("0.4.2")
+ .doc("Combine sorted shuffle blocks such that size of compacted shuffle
block does not " +
+ s"exceed compactionFactor * ${SHUFFLE_CHUNK_SIZE.key}")
+ .doubleConf
+ .checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
+ .createWithDefault(0.25)
+
val WORKER_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.buffer.size")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 42d4ec6ed..ee6756131 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -39,6 +39,7 @@ license: |
| celeborn.master.internal.endpoints | <localhost>:8097 | false |
Endpoints of master nodes just for celeborn workers to connect, allowed pattern
is: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:8097,clb2:8097,clb3:8097`.
If the port is omitted, 8097 will be used. | 0.5.0 | |
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false |
Regex to decide which Celeborn configuration properties and environment
variables in master and worker environments contain sensitive information. When
this regex matches a property key or value, the value is redacted from the
logging. | 0.5.0 | |
| celeborn.shuffle.chunk.size | 8m | false | Max chunk size of reducer's
merged shuffle data. For example, if a reducer's shuffle data is 128M and the
data will need 16 fetch chunk requests to fetch. | 0.2.0 | |
+| celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | false |
Combine sorted shuffle blocks such that size of compacted shuffle block does
not exceed compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available
options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical.
| 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos
keytab file path for HDFS storage connection. | 0.3.2 | |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 481ea6c96..151e1df4c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -86,6 +86,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private final AtomicLong sortedFilesSize = new AtomicLong();
protected final long sortTimeout;
protected final long shuffleChunkSize;
+ protected final double compactionFactor;
protected final boolean prefetchEnabled;
protected final long reservedMemoryPerPartition;
private final long partitionSorterShutdownAwaitTime;
@@ -100,6 +101,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
this.sortTimeout = conf.workerPartitionSorterSortPartitionTimeout();
this.shuffleChunkSize = conf.shuffleChunkSize();
+ this.compactionFactor =
conf.workerPartitionSorterShuffleBlockCompactionFactor();
this.prefetchEnabled = conf.workerPartitionSorterPrefetchEnabled();
this.reservedMemoryPerPartition =
conf.workerPartitionSorterReservedMemoryPerPartition();
this.partitionSorterShutdownAwaitTime =
@@ -719,10 +721,23 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
for (ShuffleBlockInfo blockInfo : originShuffleBlocks) {
long offset = blockInfo.offset;
long length = blockInfo.length;
- ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
- sortedBlock.offset = fileIndex;
- sortedBlock.length = length;
- sortedShuffleBlocks.add(sortedBlock);
+ // Combine multiple small `ShuffleBlockInfo` for same mapId such
that size of compacted
+ // `ShuffleBlockInfo` does not exceed `compactionFactor` *
`shuffleChunkSize`
+ boolean shuffleBlockCompacted = false;
+ if (!sortedShuffleBlocks.isEmpty()) {
+ ShuffleBlockInfo lastShuffleBlock =
+ sortedShuffleBlocks.get(sortedShuffleBlocks.size() - 1);
+ if (lastShuffleBlock.length + length <= compactionFactor *
shuffleChunkSize) {
+ lastShuffleBlock.length += length;
+ shuffleBlockCompacted = true;
+ }
+ }
+ if (!shuffleBlockCompacted) {
+ ShuffleBlockInfo sortedBlock = new ShuffleBlockInfo();
+ sortedBlock.offset = fileIndex;
+ sortedBlock.length = length;
+ sortedShuffleBlocks.add(sortedBlock);
+ }
fileIndex += transferBlock(offset, length);
}
sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);