This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 82022a942 [CELEBORN-1362] Remove unnecessary configuration 
celeborn.client.flink.inputGate.minMemory and 
celeborn.client.flink.resultPartition.minMemory
82022a942 is described below

commit 82022a94277c3fa9cb06f2c77c804ca9efec9d82
Author: SteNicholas <[email protected]>
AuthorDate: Mon Apr 1 11:15:14 2024 +0800

    [CELEBORN-1362] Remove unnecessary configuration 
celeborn.client.flink.inputGate.minMemory and 
celeborn.client.flink.resultPartition.minMemory
    
    ### What changes were proposed in this pull request?
    
    Remove unnecessary configuration 
`celeborn.client.flink.inputGate.minMemory` and 
`celeborn.client.flink.resultPartition.minMemory`.
    
    ### Why are the changes needed?
    
    `celeborn.client.flink.inputGate.minMemory` and 
`celeborn.client.flink.resultPartition.minMemory` are configured as min memory 
reserved at present. Meanwhile, `celeborn.client.flink.inputGate.memory` should 
be at least `networkBufferSize * MIN_BUFFERS_PER_GATE` bytes, and 
`celeborn.client.flink.resultPartition.memory` should be at least 
`networkBufferSize * MIN_BUFFERS_PER_PARTITION` bytes. Therefore, 
`celeborn.client.flink.inputGate.minMemory` and 
`celeborn.client.flink.resultPartiti [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `PluginSideConfSuiteJ#testCoalesce`
    
    Closes #2433 from SteNicholas/CELEBORN-1362.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../AbstractRemoteShuffleInputGateFactory.java      | 16 +++-------------
 ...AbstractRemoteShuffleResultPartitionFactory.java | 18 ++++--------------
 .../celeborn/plugin/flink/PluginSideConfSuiteJ.java |  6 ------
 .../org/apache/celeborn/common/CelebornConf.scala   | 21 ---------------------
 docs/configuration/client.md                        |  2 --
 docs/migration.md                                   |  2 +-
 6 files changed, 8 insertions(+), 57 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index c3bc6c298..fcf8907a4 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -66,18 +66,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
   public AbstractRemoteShuffleInputGateFactory(
       CelebornConf conf, NetworkBufferPool networkBufferPool, int 
networkBufferSize) {
     this.celebornConf = conf;
-    long configuredMemorySize = celebornConf.clientFlinkMemoryPerInputGate();
-    long minConfiguredMemorySize = 
celebornConf.clientFlinkMemoryPerInputGateMin();
-    if (configuredMemorySize < minConfiguredMemorySize) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Insufficient network memory per input gate, please increase %s 
to at " + "least %s.",
-              CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
-              celebornConf.clientFlinkMemoryPerInputGateMin()));
-    }
-
-    this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / 
networkBufferSize);
-    this.supportFloatingBuffers = 
celebornConf.clientFlinkInputGateSupportFloatingBuffer();
+    this.numBuffersPerGate =
+        Utils.checkedDownCast(celebornConf.clientFlinkMemoryPerInputGate() / 
networkBufferSize);
     if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) {
       throw new IllegalArgumentException(
           String.format(
@@ -86,7 +76,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
               CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
               networkBufferSize * MIN_BUFFERS_PER_GATE));
     }
-
+    this.supportFloatingBuffers = 
celebornConf.clientFlinkInputGateSupportFloatingBuffer();
     this.networkBufferSize = networkBufferSize;
     this.numConcurrentReading = celebornConf.clientFlinkNumConcurrentReading();
     this.networkBufferPool = networkBufferPool;
diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index 35fbf324b..e22d6ae47 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -71,18 +71,9 @@ public abstract class 
AbstractRemoteShuffleResultPartitionFactory {
       ResultPartitionManager partitionManager,
       BufferPoolFactory bufferPoolFactory,
       int networkBufferSize) {
-    long configuredMemorySize = 
celebornConf.clientFlinkMemoryPerResultPartition();
-    long minConfiguredMemorySize = 
celebornConf.clientFlinkMemoryPerResultPartitionMin();
-    if (configuredMemorySize < minConfiguredMemorySize) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Insufficient network memory per result partition, please 
increase %s "
-                  + "to at least %s.",
-              CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(), 
minConfiguredMemorySize));
-    }
-
-    this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / 
networkBufferSize);
-    this.supportFloatingBuffers = 
celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
+    this.numBuffersPerPartition =
+        Utils.checkedDownCast(
+            celebornConf.clientFlinkMemoryPerResultPartition() / 
networkBufferSize);
     if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) {
       throw new IllegalArgumentException(
           String.format(
@@ -91,11 +82,10 @@ public abstract class 
AbstractRemoteShuffleResultPartitionFactory {
               CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(),
               networkBufferSize * MIN_BUFFERS_PER_PARTITION));
     }
-
+    this.supportFloatingBuffers = 
celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
     this.partitionManager = partitionManager;
     this.bufferPoolFactory = bufferPoolFactory;
     this.networkBufferSize = networkBufferSize;
-
     this.compressionCodec = celebornConf.shuffleCompressionCodec().name();
   }
 
diff --git 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
index 99a6a83cd..4518d4a92 100644
--- 
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
+++ 
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
@@ -29,8 +29,6 @@ public class PluginSideConfSuiteJ {
   public void testCoalesce() {
     Configuration flinkConf = new Configuration();
     CelebornConf celebornConf = FlinkUtils.toCelebornConf(flinkConf);
-    Assert.assertEquals(8 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerResultPartitionMin());
-    Assert.assertEquals(8 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerInputGateMin());
     Assert.assertEquals(Integer.MAX_VALUE, 
celebornConf.clientFlinkNumConcurrentReading());
     Assert.assertEquals(64 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerResultPartition());
     Assert.assertEquals(32 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerInputGate());
@@ -38,8 +36,6 @@ public class PluginSideConfSuiteJ {
     Assert.assertTrue(celebornConf.clientFlinkDataCompressionEnabled());
     Assert.assertEquals("LZ4", celebornConf.shuffleCompressionCodec().name());
 
-    flinkConf.setString("remote-shuffle.job.min.memory-per-partition", "16m");
-    flinkConf.setString("remote-shuffle.job.min.memory-per-gate", "17m");
     flinkConf.setString("remote-shuffle.job.concurrent-readings-per-gate", 
"12323");
     flinkConf.setString("remote-shuffle.job.memory-per-partition", "1888m");
     flinkConf.setString("remote-shuffle.job.memory-per-gate", "176m");
@@ -48,8 +44,6 @@ public class PluginSideConfSuiteJ {
     flinkConf.setString("remote-shuffle.job.compression.codec", "ZSTD");
 
     celebornConf = FlinkUtils.toCelebornConf(flinkConf);
-    Assert.assertEquals(16 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerResultPartitionMin());
-    Assert.assertEquals(17 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerInputGateMin());
     Assert.assertEquals(12323, celebornConf.clientFlinkNumConcurrentReading());
     Assert.assertEquals(1888 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerResultPartition());
     Assert.assertEquals(176 * 1024 * 1024, 
celebornConf.clientFlinkMemoryPerInputGate());
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8e849a4b0..1f838bbfc 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1127,9 +1127,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def testPushReplicaDataTimeout: Boolean = 
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
   def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
-  def clientFlinkMemoryPerResultPartitionMin: Long = 
get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
   def clientFlinkMemoryPerResultPartition: Long = 
get(CLIENT_MEMORY_PER_RESULT_PARTITION)
-  def clientFlinkMemoryPerInputGateMin: Long = 
get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
   def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
   def clientFlinkNumConcurrentReading: Int = 
get(CLIENT_NUM_CONCURRENT_READINGS)
   def clientFlinkInputGateSupportFloatingBuffer: Boolean =
@@ -4490,25 +4488,6 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  // Flink specific client configurations.
-  val CLIENT_MEMORY_PER_RESULT_PARTITION_MIN: ConfigEntry[Long] =
-    buildConf("celeborn.client.flink.resultPartition.minMemory")
-      .withAlternative("remote-shuffle.job.min.memory-per-partition")
-      .categories("client")
-      .version("0.3.0")
-      .doc("Min memory reserved for a result partition.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("8m")
-
-  val CLIENT_MEMORY_PER_INPUT_GATE_MIN: ConfigEntry[Long] =
-    buildConf("celeborn.client.flink.inputGate.minMemory")
-      .withAlternative("remote-shuffle.job.min.memory-per-gate")
-      .categories("client")
-      .doc("Min memory reserved for a input gate.")
-      .version("0.3.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("8m")
-
   val CLIENT_NUM_CONCURRENT_READINGS: ConfigEntry[Int] =
     buildConf("celeborn.client.flink.inputGate.concurrentReadings")
       .withAlternative("remote-shuffle.job.concurrent-readings-per-gate")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 20aa71580..761b48138 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -36,10 +36,8 @@ license: |
 | celeborn.client.flink.compression.enabled | true | false | Whether to 
compress data in Flink plugin. | 0.3.0 | 
remote-shuffle.job.enable-data-compression | 
 | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | 
Max concurrent reading channels for a input gate. | 0.3.0 | 
remote-shuffle.job.concurrent-readings-per-gate | 
 | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a 
input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate | 
-| celeborn.client.flink.inputGate.minMemory | 8m | false | Min memory reserved 
for a input gate. | 0.3.0 | remote-shuffle.job.min.memory-per-gate | 
 | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | 
Whether to support floating buffer in Flink input gates. | 0.3.0 | 
remote-shuffle.job.support-floating-buffer-per-input-gate | 
 | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved 
for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition | 
-| celeborn.client.flink.resultPartition.minMemory | 8m | false | Min memory 
reserved for a result partition. | 0.3.0 | 
remote-shuffle.job.min.memory-per-partition | 
 | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | 
Whether to support floating buffer for result partitions. | 0.3.0 | 
remote-shuffle.job.support-floating-buffer-per-output-gate | 
 | celeborn.client.mr.pushData.max | 32m | false | Max size for a push data 
sent from mr client. | 0.4.0 |  | 
 | celeborn.client.push.buffer.initial.size | 8k | false |  | 0.3.0 | 
celeborn.push.buffer.initial.size | 
diff --git a/docs/migration.md b/docs/migration.md
index 0c6214da2..93802dd52 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -36,7 +36,7 @@ license: |
 
 - Since 0.5.0, Celeborn deprecate `celeborn.quota.configuration.path`. Please 
use `celeborn.dynamicConfig.store.fs.path` instead.
 
-- Since 0.5.0, Celeborn client removes configuration 
`celeborn.client.push.splitPartition.threads`.
+- Since 0.5.0, Celeborn client removes configuration 
`celeborn.client.push.splitPartition.threads`, 
`celeborn.client.flink.inputGate.minMemory` and 
`celeborn.client.flink.resultPartition.minMemory`.
 
 ## Upgrading from 0.4.0 to 0.4.1
 

Reply via email to