This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 82022a942 [CELEBORN-1362] Remove unnecessary configuration
celeborn.client.flink.inputGate.minMemory and
celeborn.client.flink.resultPartition.minMemory
82022a942 is described below
commit 82022a94277c3fa9cb06f2c77c804ca9efec9d82
Author: SteNicholas <[email protected]>
AuthorDate: Mon Apr 1 11:15:14 2024 +0800
[CELEBORN-1362] Remove unnecessary configuration
celeborn.client.flink.inputGate.minMemory and
celeborn.client.flink.resultPartition.minMemory
### What changes were proposed in this pull request?
Remove the unnecessary configurations
`celeborn.client.flink.inputGate.minMemory` and
`celeborn.client.flink.resultPartition.minMemory`.
### Why are the changes needed?
`celeborn.client.flink.inputGate.minMemory` and
`celeborn.client.flink.resultPartition.minMemory` currently set the minimum memory
reserved for an input gate and a result partition. However,
`celeborn.client.flink.inputGate.memory` must already be at least
`networkBufferSize * MIN_BUFFERS_PER_GATE` bytes, and
`celeborn.client.flink.resultPartition.memory` must be at least
`networkBufferSize * MIN_BUFFERS_PER_PARTITION` bytes. Therefore,
`celeborn.client.flink.inputGate.minMemory` and
`celeborn.client.flink.resultPartiti [...]
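
The check that remains after this change can be sketched roughly as follows. This is a minimal, standalone illustration and not the actual factory code: the class name `MinMemoryCheckSketch`, the helper `numBuffers`, the value `16` for `MIN_BUFFERS_PER_GATE`, and the 32 KiB buffer size are all assumptions made for the example, and `Math.toIntExact` stands in for Celeborn's `Utils.checkedDownCast`.

```java
public class MinMemoryCheckSketch {
    // Assumed value for illustration only; the real constant is defined in the factory class.
    static final int MIN_BUFFERS_PER_GATE = 16;

    // Hypothetical helper: derives the buffer count from the configured memory and
    // rejects configurations that cannot hold the minimum number of buffers.
    static int numBuffers(long configuredMemoryBytes, int networkBufferSize, int minBuffers, String key) {
        // Math.toIntExact is used here in place of Utils.checkedDownCast.
        int buffers = Math.toIntExact(configuredMemoryBytes / networkBufferSize);
        if (buffers < minBuffers) {
            throw new IllegalArgumentException(
                String.format(
                    "Insufficient network memory, please increase %s to at least %d bytes.",
                    key, (long) networkBufferSize * minBuffers));
        }
        return buffers;
    }

    public static void main(String[] args) {
        int networkBufferSize = 32 * 1024; // assumed buffer size for the example
        long inputGateMemory = 32L * 1024 * 1024; // documented default of 32m
        int buffers = numBuffers(
            inputGateMemory, networkBufferSize, MIN_BUFFERS_PER_GATE,
            "celeborn.client.flink.inputGate.memory");
        System.out.println("buffers per gate: " + buffers);
    }
}
```

Because the lower bound is derived from the buffer size and the minimum buffer count, a separately configured minimum adds no further protection in this sketch.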
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`PluginSideConfSuiteJ#testCoalesce`
Closes #2433 from SteNicholas/CELEBORN-1362.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../AbstractRemoteShuffleInputGateFactory.java | 16 +++-------------
...AbstractRemoteShuffleResultPartitionFactory.java | 18 ++++--------------
.../celeborn/plugin/flink/PluginSideConfSuiteJ.java | 6 ------
.../org/apache/celeborn/common/CelebornConf.scala | 21 ---------------------
docs/configuration/client.md | 2 --
docs/migration.md | 2 +-
6 files changed, 8 insertions(+), 57 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index c3bc6c298..fcf8907a4 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -66,18 +66,8 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
public AbstractRemoteShuffleInputGateFactory(
CelebornConf conf, NetworkBufferPool networkBufferPool, int networkBufferSize) {
this.celebornConf = conf;
- long configuredMemorySize = celebornConf.clientFlinkMemoryPerInputGate();
- long minConfiguredMemorySize = celebornConf.clientFlinkMemoryPerInputGateMin();
- if (configuredMemorySize < minConfiguredMemorySize) {
- throw new IllegalArgumentException(
- String.format(
- "Insufficient network memory per input gate, please increase %s to at " + "least %s.",
- CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
- celebornConf.clientFlinkMemoryPerInputGateMin()));
- }
-
- this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
- this.supportFloatingBuffers = celebornConf.clientFlinkInputGateSupportFloatingBuffer();
+ this.numBuffersPerGate =
+ Utils.checkedDownCast(celebornConf.clientFlinkMemoryPerInputGate() / networkBufferSize);
if (numBuffersPerGate < MIN_BUFFERS_PER_GATE) {
throw new IllegalArgumentException(
String.format(
@@ -86,7 +76,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
CelebornConf.CLIENT_MEMORY_PER_INPUT_GATE().key(),
networkBufferSize * MIN_BUFFERS_PER_GATE));
}
-
+ this.supportFloatingBuffers = celebornConf.clientFlinkInputGateSupportFloatingBuffer();
this.networkBufferSize = networkBufferSize;
this.numConcurrentReading = celebornConf.clientFlinkNumConcurrentReading();
this.networkBufferPool = networkBufferPool;
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
index 35fbf324b..e22d6ae47 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java
@@ -71,18 +71,9 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
- long configuredMemorySize = celebornConf.clientFlinkMemoryPerResultPartition();
- long minConfiguredMemorySize = celebornConf.clientFlinkMemoryPerResultPartitionMin();
- if (configuredMemorySize < minConfiguredMemorySize) {
- throw new IllegalArgumentException(
- String.format(
- "Insufficient network memory per result partition, please increase %s "
- + "to at least %s.",
- CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(), minConfiguredMemorySize));
- }
-
- this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / networkBufferSize);
- this.supportFloatingBuffers = celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
+ this.numBuffersPerPartition =
+ Utils.checkedDownCast(
+ celebornConf.clientFlinkMemoryPerResultPartition() / networkBufferSize);
if (numBuffersPerPartition < MIN_BUFFERS_PER_PARTITION) {
throw new IllegalArgumentException(
String.format(
@@ -91,11 +82,10 @@ public abstract class AbstractRemoteShuffleResultPartitionFactory {
CelebornConf.CLIENT_MEMORY_PER_RESULT_PARTITION().key(),
networkBufferSize * MIN_BUFFERS_PER_PARTITION));
}
-
+ this.supportFloatingBuffers = celebornConf.clientFlinkResultPartitionSupportFloatingBuffer();
this.partitionManager = partitionManager;
this.bufferPoolFactory = bufferPoolFactory;
this.networkBufferSize = networkBufferSize;
-
this.compressionCodec = celebornConf.shuffleCompressionCodec().name();
}
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
index 99a6a83cd..4518d4a92 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginSideConfSuiteJ.java
@@ -29,8 +29,6 @@ public class PluginSideConfSuiteJ {
public void testCoalesce() {
Configuration flinkConf = new Configuration();
CelebornConf celebornConf = FlinkUtils.toCelebornConf(flinkConf);
- Assert.assertEquals(8 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartitionMin());
- Assert.assertEquals(8 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGateMin());
Assert.assertEquals(Integer.MAX_VALUE, celebornConf.clientFlinkNumConcurrentReading());
Assert.assertEquals(64 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartition());
Assert.assertEquals(32 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGate());
@@ -38,8 +36,6 @@ public class PluginSideConfSuiteJ {
Assert.assertTrue(celebornConf.clientFlinkDataCompressionEnabled());
Assert.assertEquals("LZ4", celebornConf.shuffleCompressionCodec().name());
- flinkConf.setString("remote-shuffle.job.min.memory-per-partition", "16m");
- flinkConf.setString("remote-shuffle.job.min.memory-per-gate", "17m");
flinkConf.setString("remote-shuffle.job.concurrent-readings-per-gate", "12323");
flinkConf.setString("remote-shuffle.job.memory-per-partition", "1888m");
flinkConf.setString("remote-shuffle.job.memory-per-gate", "176m");
@@ -48,8 +44,6 @@ public class PluginSideConfSuiteJ {
flinkConf.setString("remote-shuffle.job.compression.codec", "ZSTD");
celebornConf = FlinkUtils.toCelebornConf(flinkConf);
- Assert.assertEquals(16 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartitionMin());
- Assert.assertEquals(17 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGateMin());
Assert.assertEquals(12323, celebornConf.clientFlinkNumConcurrentReading());
Assert.assertEquals(1888 * 1024 * 1024, celebornConf.clientFlinkMemoryPerResultPartition());
Assert.assertEquals(176 * 1024 * 1024, celebornConf.clientFlinkMemoryPerInputGate());
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 8e849a4b0..1f838bbfc 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1127,9 +1127,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def testPushReplicaDataTimeout: Boolean = get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
def testAlternative: String = get(TEST_ALTERNATIVE.key, "celeborn")
- def clientFlinkMemoryPerResultPartitionMin: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION_MIN)
def clientFlinkMemoryPerResultPartition: Long = get(CLIENT_MEMORY_PER_RESULT_PARTITION)
- def clientFlinkMemoryPerInputGateMin: Long = get(CLIENT_MEMORY_PER_INPUT_GATE_MIN)
def clientFlinkMemoryPerInputGate: Long = get(CLIENT_MEMORY_PER_INPUT_GATE)
def clientFlinkNumConcurrentReading: Int = get(CLIENT_NUM_CONCURRENT_READINGS)
def clientFlinkInputGateSupportFloatingBuffer: Boolean =
@@ -4490,25 +4488,6 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
- // Flink specific client configurations.
- val CLIENT_MEMORY_PER_RESULT_PARTITION_MIN: ConfigEntry[Long] =
- buildConf("celeborn.client.flink.resultPartition.minMemory")
- .withAlternative("remote-shuffle.job.min.memory-per-partition")
- .categories("client")
- .version("0.3.0")
- .doc("Min memory reserved for a result partition.")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("8m")
-
- val CLIENT_MEMORY_PER_INPUT_GATE_MIN: ConfigEntry[Long] =
- buildConf("celeborn.client.flink.inputGate.minMemory")
- .withAlternative("remote-shuffle.job.min.memory-per-gate")
- .categories("client")
- .doc("Min memory reserved for a input gate.")
- .version("0.3.0")
- .bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("8m")
-
val CLIENT_NUM_CONCURRENT_READINGS: ConfigEntry[Int] =
buildConf("celeborn.client.flink.inputGate.concurrentReadings")
.withAlternative("remote-shuffle.job.concurrent-readings-per-gate")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 20aa71580..761b48138 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -36,10 +36,8 @@ license: |
| celeborn.client.flink.compression.enabled | true | false | Whether to compress data in Flink plugin. | 0.3.0 | remote-shuffle.job.enable-data-compression |
| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false | Max concurrent reading channels for a input gate. | 0.3.0 | remote-shuffle.job.concurrent-readings-per-gate |
| celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate |
-| celeborn.client.flink.inputGate.minMemory | 8m | false | Min memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.min.memory-per-gate |
| celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
| celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
-| celeborn.client.flink.resultPartition.minMemory | 8m | false | Min memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.min.memory-per-partition |
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |
| celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | |
| celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 | celeborn.push.buffer.initial.size |
diff --git a/docs/migration.md b/docs/migration.md
index 0c6214da2..93802dd52 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -36,7 +36,7 @@ license: |
 - Since 0.5.0, Celeborn deprecate `celeborn.quota.configuration.path`. Please use `celeborn.dynamicConfig.store.fs.path` instead.
-- Since 0.5.0, Celeborn client removes configuration `celeborn.client.push.splitPartition.threads`.
+- Since 0.5.0, Celeborn client removes configuration `celeborn.client.push.splitPartition.threads`, `celeborn.client.flink.inputGate.minMemory` and `celeborn.client.flink.resultPartition.minMemory`.
## Upgrading from 0.4.0 to 0.4.1