This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 553b3abc3 [CELEBORN-1969] Remove
celeborn.client.shuffle.mapPartition.split.enabled to enable shuffle partition
split at default for MapPartition
553b3abc3 is described below
commit 553b3abc3bfb621da3dd331901d07f24d53d724d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 22 11:37:53 2025 +0800
[CELEBORN-1969] Remove celeborn.client.shuffle.mapPartition.split.enabled
to enable shuffle partition split at default for MapPartition
### What changes were proposed in this pull request?
Remove `celeborn.client.shuffle.mapPartition.split.enabled` so that
shuffle partition split is enabled by default for MapPartition.
### Why are the changes needed?
The default value of `celeborn.client.shuffle.mapPartition.split.enabled`
is false, which can cause the partition file writer to fill up the disk during PushData, as the following worker log shows:
```
2025-04-15 20:20:56,759 [push-server-6-4] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-6] ERROR
storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed,
report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-1704-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-0] ERROR
storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed,
report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler -
[handlePUSH_DATA] fileWriter
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has
Exception java.io.IOException: Disk quota exceeded
```
It is therefore recommended to remove
`celeborn.client.shuffle.mapPartition.split.enabled` so that shuffle partition
split is enabled by default.
### Does this PR introduce _any_ user-facing change?
Yes. `celeborn.client.shuffle.mapPartition.split.enabled` is removed, so
shuffle partition split is now enabled by default for MapPartition.
### How was this patch tested?
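No new tests were added; the existing Flink `SplitTest` was updated so it no longer sets the removed configuration.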
Closes #3217 from SteNicholas/CELEBORN-1969.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 6 ++----
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 10 ----------
docs/configuration/client.md | 1 -
docs/migration.md | 2 ++
.../test/scala/org/apache/celeborn/tests/flink/SplitTest.scala | 1 -
5 files changed, 4 insertions(+), 16 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index f9fe5deb7..9b2ad93a6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1197,10 +1197,8 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
rangeReadFilter,
userIdentifier,
conf.pushDataTimeoutMs,
- if (getPartitionType(shuffleId) == PartitionType.MAP)
- conf.clientShuffleMapPartitionSplitEnabled
- else true,
- isSegmentGranularityVisible))
+ partitionSplitEnabled = true,
+ isSegmentGranularityVisible = isSegmentGranularityVisible))
futures.add((future, workerInfo))
}(ec)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 44d76a1de..1be7a15e6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1404,7 +1404,6 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
def clientFlinkDataCompressionEnabled: Boolean =
get(CLIENT_DATA_COMPRESSION_ENABLED)
- def clientShuffleMapPartitionSplitEnabled =
get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED)
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
@@ -5860,15 +5859,6 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(32)
- val CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
- buildConf("celeborn.client.shuffle.mapPartition.split.enabled")
- .categories("client")
- .doc(
- "whether to enable shuffle partition split. Currently, this only
applies to MapPartition.")
- .version("0.3.1")
- .booleanConf
- .createWithDefault(false)
-
val CLIENT_CHUNK_PREFETCH_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.chunk.prefetch.enabled")
.categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index ccb794319..acf5a59a4 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -104,7 +104,6 @@ license: |
| celeborn.client.shuffle.dynamicResourceFactor | 0.5 | false | The
ChangePartitionManager will check whether (unavailable workers / shuffle
allocated workers) is more than the factor before obtaining candidate workers
from the requestSlots RPC response when
`celeborn.client.shuffle.dynamicResourceEnabled` set true | 0.6.0 | |
| celeborn.client.shuffle.expired.checkInterval | 60s | false | Interval for
client to check expired shuffles. | 0.3.0 |
celeborn.shuffle.expired.checkInterval |
| celeborn.client.shuffle.manager.port | 0 | false | Port used by the
LifecycleManager on the Driver. | 0.3.0 | celeborn.shuffle.manager.port |
-| celeborn.client.shuffle.mapPartition.split.enabled | false | false | whether
to enable shuffle partition split. Currently, this only applies to
MapPartition. | 0.3.1 | |
| celeborn.client.shuffle.partition.type | REDUCE | false | Type of shuffle's
partition. | 0.3.0 | celeborn.shuffle.partition.type |
| celeborn.client.shuffle.partitionSplit.mode | SOFT | false | soft: the
shuffle file size might be larger than split threshold. hard: the shuffle file
size will be limited to split threshold. | 0.3.0 |
celeborn.shuffle.partitionSplit.mode |
| celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file
size threshold, if file size exceeds this, trigger split. | 0.3.0 |
celeborn.shuffle.partitionSplit.threshold |
diff --git a/docs/migration.md b/docs/migration.md
index d53495280..616974d69 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -41,6 +41,8 @@ license: |
- Since 0.6.0, Celeborn changed `celeborn.<module>.io.mode` optional, of which
the default value changed from `NIO` to `EPOLL` if epoll mode is available,
falling back to `NIO` otherwise.
+- Since 0.6.0, Celeborn removed
`celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle
partition split at default for MapPartition.
+
- Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1,
which uses the application/json media type for requests and responses.
The `celeborn-openapi-client` SDK is also available to help users interact
with the new RESTful APIs.
The legacy RESTful APIs have been deprecated and will be removed in future
releases.
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
index 83c9a888c..62ee5c423 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
@@ -65,7 +65,6 @@ class SplitTest extends AnyFunSuite with Logging with
MiniClusterFeature
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
configuration.setString(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key,
"10k")
-
configuration.setString(CelebornConf.CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED.key,
"true")
val env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
env.getConfig.setParallelism(parallelism)
SplitHelper.runSplitRead(env)