This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 553b3abc3 [CELEBORN-1969] Remove 
celeborn.client.shuffle.mapPartition.split.enabled to enable shuffle partition 
split at default for MapPartition
553b3abc3 is described below

commit 553b3abc3bfb621da3dd331901d07f24d53d724d
Author: SteNicholas <[email protected]>
AuthorDate: Tue Apr 22 11:37:53 2025 +0800

    [CELEBORN-1969] Remove celeborn.client.shuffle.mapPartition.split.enabled 
to enable shuffle partition split at default for MapPartition
    
    ### What changes were proposed in this pull request?
    
    Remove `celeborn.client.shuffle.mapPartition.split.enabled` to enable 
shuffle partition split by default for MapPartition.
    
    ### Why are the changes needed?
    
    The default value of `celeborn.client.shuffle.mapPartition.split.enabled` 
is false, which causes the file writer to fill the disk for PushData, as shown 
in the following log:
    
    ```
    2025-04-15 20:20:56,759 [push-server-6-4] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-6] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-4] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-3] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-6] ERROR 
storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, 
report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-4] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-1704-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-3] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-0] ERROR 
storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, 
report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
    2025-04-15 20:20:56,760 [push-server-6-6] WARN  worker.PushDataHandler - 
[handlePUSH_DATA] fileWriter 
1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has 
Exception java.io.IOException: Disk quota exceeded
    ```
    
    It's recommended to remove 
`celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle 
partition split by default.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `celeborn.client.shuffle.mapPartition.split.enabled` is removed to 
enable shuffle partition split by default for MapPartition.
    
    ### How was this patch tested?
    
    No.
    
    Closes #3217 from SteNicholas/CELEBORN-1969.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../scala/org/apache/celeborn/client/LifecycleManager.scala    |  6 ++----
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala   | 10 ----------
 docs/configuration/client.md                                   |  1 -
 docs/migration.md                                              |  2 ++
 .../test/scala/org/apache/celeborn/tests/flink/SplitTest.scala |  1 -
 5 files changed, 4 insertions(+), 16 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index f9fe5deb7..9b2ad93a6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1197,10 +1197,8 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
             rangeReadFilter,
             userIdentifier,
             conf.pushDataTimeoutMs,
-            if (getPartitionType(shuffleId) == PartitionType.MAP)
-              conf.clientShuffleMapPartitionSplitEnabled
-            else true,
-            isSegmentGranularityVisible))
+            partitionSplitEnabled = true,
+            isSegmentGranularityVisible = isSegmentGranularityVisible))
         futures.add((future, workerInfo))
       }(ec)
     }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 44d76a1de..1be7a15e6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1404,7 +1404,6 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def clientFlinkResultPartitionSupportFloatingBuffer: Boolean =
     get(CLIENT_RESULT_PARTITION_SUPPORT_FLOATING_BUFFER)
   def clientFlinkDataCompressionEnabled: Boolean = 
get(CLIENT_DATA_COMPRESSION_ENABLED)
-  def clientShuffleMapPartitionSplitEnabled = 
get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED)
   def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
   def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
 
@@ -5860,15 +5859,6 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(32)
 
-  val CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED: ConfigEntry[Boolean] =
-    buildConf("celeborn.client.shuffle.mapPartition.split.enabled")
-      .categories("client")
-      .doc(
-        "whether to enable shuffle partition split. Currently, this only 
applies to MapPartition.")
-      .version("0.3.1")
-      .booleanConf
-      .createWithDefault(false)
-
   val CLIENT_CHUNK_PREFETCH_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.client.chunk.prefetch.enabled")
       .categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index ccb794319..acf5a59a4 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -104,7 +104,6 @@ license: |
 | celeborn.client.shuffle.dynamicResourceFactor | 0.5 | false | The 
ChangePartitionManager will check whether (unavailable workers / shuffle 
allocated workers) is more than the factor before obtaining candidate workers 
from the requestSlots RPC response when 
`celeborn.client.shuffle.dynamicResourceEnabled` set true | 0.6.0 |  | 
 | celeborn.client.shuffle.expired.checkInterval | 60s | false | Interval for 
client to check expired shuffles. | 0.3.0 | 
celeborn.shuffle.expired.checkInterval | 
 | celeborn.client.shuffle.manager.port | 0 | false | Port used by the 
LifecycleManager on the Driver. | 0.3.0 | celeborn.shuffle.manager.port | 
-| celeborn.client.shuffle.mapPartition.split.enabled | false | false | whether 
to enable shuffle partition split. Currently, this only applies to 
MapPartition. | 0.3.1 |  | 
 | celeborn.client.shuffle.partition.type | REDUCE | false | Type of shuffle's 
partition. | 0.3.0 | celeborn.shuffle.partition.type | 
 | celeborn.client.shuffle.partitionSplit.mode | SOFT | false | soft: the 
shuffle file size might be larger than split threshold. hard: the shuffle file 
size will be limited to split threshold. | 0.3.0 | 
celeborn.shuffle.partitionSplit.mode | 
 | celeborn.client.shuffle.partitionSplit.threshold | 1G | false | Shuffle file 
size threshold, if file size exceeds this, trigger split. | 0.3.0 | 
celeborn.shuffle.partitionSplit.threshold | 
diff --git a/docs/migration.md b/docs/migration.md
index d53495280..616974d69 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -41,6 +41,8 @@ license: |
 
 - Since 0.6.0, Celeborn changed `celeborn.<module>.io.mode` optional, of which 
the default value changed from `NIO` to `EPOLL` if epoll mode is available, 
falling back to `NIO` otherwise.
 
+- Since 0.6.0, Celeborn removed 
`celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle 
partition split at default for MapPartition.
+
 - Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1, 
which uses the application/json media type for requests and responses.
    The `celeborn-openapi-client` SDK is also available to help users interact 
with the new RESTful APIs.
    The legacy RESTful APIs have been deprecated and will be removed in future 
releases.
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
index 83c9a888c..62ee5c423 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/SplitTest.scala
@@ -65,7 +65,6 @@ class SplitTest extends AnyFunSuite with Logging with 
MiniClusterFeature
     configuration.setString("restart-strategy.fixed-delay.attempts", "50")
     configuration.setString("restart-strategy.fixed-delay.delay", "5s")
     
configuration.setString(CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key, 
"10k")
-    
configuration.setString(CelebornConf.CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED.key,
 "true")
     val env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
     env.getConfig.setParallelism(parallelism)
     SplitHelper.runSplitRead(env)

Reply via email to