This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cb76c4de4 [CELEBORN-350][FLINK] Add PluginConf to be compatible with
old configurations
cb76c4de4 is described below
commit cb76c4de4c8ac27d90034ed07d9733aa080ab1fe
Author: zhongqiangchen <[email protected]>
AuthorDate: Tue Feb 28 20:36:11 2023 +0800
[CELEBORN-350][FLINK] Add PluginConf to be compatible with old
configurations
---
.../celeborn/plugin/flink/config/PluginConf.java | 51 ++++++++++++++++++
.../celeborn/plugin/flink/PluginConfSuiteJ.java | 63 ++++++++++++++++++++++
.../flink/RemoteShuffleInputGateFactory.java | 23 ++++----
.../flink/RemoteShuffleResultPartitionFactory.java | 26 +++++----
.../plugin/flink/RemoteShuffleServiceFactory.java | 5 +-
.../org/apache/celeborn/common/CelebornConf.scala | 9 ----
docs/configuration/client.md | 1 -
7 files changed, 148 insertions(+), 30 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
new file mode 100644
index 000000000..f293de6c0
--- /dev/null
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * Configuration options that the Celeborn Flink plugin reads from a Flink {@link Configuration}.
+ *
+ * <p>Every constant carries a configuration key, an alternative key (the empty string for most
+ * options) and a default value kept as a string. Callers parse the returned string themselves.
+ */
+public enum PluginConf {
+  MIN_MEMORY_PER_PARTITION("remote-shuffle.job.min.memory-per-partition", "", "8m"),
+  MIN_MEMORY_PER_GATE("remote-shuffle.job.min.memory-per-gate", "", "8m"),
+  NUM_CONCURRENT_READINGS(
+      "remote-shuffle.job.concurrent-readings-per-gate", "", String.valueOf(Integer.MAX_VALUE)),
+  MEMORY_PER_RESULT_PARTITION("remote-shuffle.job.memory-per-partition", "", "64m"),
+  MEMORY_PER_INPUT_GATE("remote-shuffle.job.memory-per-gate", "", "32m"),
+  ENABLE_DATA_COMPRESSION("remote-shuffle.job.enable-data-compression", "", "true"),
+  REMOTE_SHUFFLE_COMPRESSION_CODEC(
+      "remote-shuffle.job.compression.codec",
+      CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(),
+      "LZ4"),
+  ;
+
+  // The fields are final: enum constants are shared singletons, so their keys and defaults must
+  // not be reassignable by callers.
+
+  /** Key looked up in the Flink configuration. This is a field, not {@link Enum#name()}. */
+  public final String name;
+
+  /** Alternative key for the option, or the empty string. Not consulted by {@code getValue}. */
+  public final String alterName;
+
+  /** Value returned by {@code getValue} when the key is not set in the configuration. */
+  public final String defaultValue;
+
+  PluginConf(String name, String alterName, String defaultValue) {
+    this.name = name;
+    this.alterName = alterName;
+    this.defaultValue = defaultValue;
+  }
+
+  /**
+   * Reads an option from the given Flink configuration.
+   *
+   * @param flinkConf configuration to read from
+   * @param conf option to look up
+   * @return the string stored under {@code conf.name}, or {@code conf.defaultValue} when the key
+   *     is not set
+   */
+  public static String getValue(Configuration flinkConf, PluginConf conf) {
+    return flinkConf.getString(conf.name, conf.defaultValue);
+  }
+}
diff --git
a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
new file mode 100644
index 000000000..c6eac4920
--- /dev/null
+++
b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.celeborn.plugin.flink.config.PluginConf;
+
+/** Checks that {@link PluginConf#getValue} yields defaults first and explicit settings after. */
+public class PluginConfSuiteJ {
+  @Test
+  public void testColesce() {
+    Configuration flinkConf = new Configuration();
+
+    // Nothing has been set yet, so every option must resolve to its declared default.
+    Assert.assertEquals(
+        "8m", PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_PARTITION));
+    Assert.assertEquals("8m", PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_GATE));
+    // assertEquals (rather than assertTrue on ==) reports expected/actual values on failure.
+    Assert.assertEquals(
+        Integer.MAX_VALUE,
+        Integer.parseInt(PluginConf.getValue(flinkConf, PluginConf.NUM_CONCURRENT_READINGS)));
+    Assert.assertEquals(
+        "64m", PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_RESULT_PARTITION));
+    Assert.assertEquals("32m", PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_INPUT_GATE));
+
+    Assert.assertEquals(
+        "true", PluginConf.getValue(flinkConf, PluginConf.ENABLE_DATA_COMPRESSION));
+    Assert.assertEquals(
+        "LZ4", PluginConf.getValue(flinkConf, PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
+
+    // Override every option under its key; the explicit values must now win over the defaults.
+    flinkConf.setString(PluginConf.MIN_MEMORY_PER_PARTITION.name, "16m");
+    flinkConf.setString(PluginConf.MIN_MEMORY_PER_GATE.name, "17m");
+    flinkConf.setString(PluginConf.NUM_CONCURRENT_READINGS.name, "12323");
+    flinkConf.setString(PluginConf.MEMORY_PER_RESULT_PARTITION.name, "1888m");
+    flinkConf.setString(PluginConf.MEMORY_PER_INPUT_GATE.name, "176m");
+    flinkConf.setString(PluginConf.ENABLE_DATA_COMPRESSION.name, "false");
+    flinkConf.setString(PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC.name, "lz423");
+
+    Assert.assertEquals(
+        "16m", PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_PARTITION));
+    Assert.assertEquals("17m", PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_GATE));
+    Assert.assertEquals(
+        12323,
+        Integer.parseInt(PluginConf.getValue(flinkConf, PluginConf.NUM_CONCURRENT_READINGS)));
+    Assert.assertEquals(
+        "1888m", PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_RESULT_PARTITION));
+    Assert.assertEquals("176m", PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_INPUT_GATE));
+    Assert.assertEquals(
+        "false", PluginConf.getValue(flinkConf, PluginConf.ENABLE_DATA_COMPRESSION));
+    // getValue returns the raw string, so a value that is not a known codec still round-trips.
+    Assert.assertEquals(
+        "lz423", PluginConf.getValue(flinkConf, PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
+  }
+}
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 76f535496..a3485721f 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -30,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.config.PluginConf;
import org.apache.celeborn.plugin.flink.utils.Utils;
/** Factory class to create {@link RemoteShuffleInputGate}. */
@@ -57,20 +59,23 @@ public class RemoteShuffleInputGateFactory {
/** Sum of buffers. */
private final int numBuffersPerGate;
- public static final String MEMORY_PER_INPUT_GATE = "32m";
- public static final String MIN_MEMORY_PER_GATE = "8m";
- public static final int NUM_CONCURRENT_READINGS = Integer.MAX_VALUE;
private CelebornConf celebornConf;
public RemoteShuffleInputGateFactory(
- CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
+ Configuration flinkConf,
+ CelebornConf conf,
+ NetworkBufferPool networkBufferPool,
+ int networkBufferSize) {
+ this.celebornConf = conf;
long configuredMemorySize =
-
org.apache.celeborn.common.util.Utils.byteStringAsBytes(MEMORY_PER_INPUT_GATE);
+ org.apache.celeborn.common.util.Utils.byteStringAsBytes(
+ PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_INPUT_GATE));
if (configuredMemorySize < MIN_BUFFERS_PER_GATE) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per input gate, please increase %s
to at " + "least %s.",
- MEMORY_PER_INPUT_GATE, MIN_MEMORY_PER_GATE));
+ PluginConf.MEMORY_PER_INPUT_GATE.name,
+ PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_GATE)));
}
this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize /
networkBufferSize);
@@ -79,13 +84,13 @@ public class RemoteShuffleInputGateFactory {
String.format(
"Insufficient network memory per input gate, please increase %s
to at "
+ "least %d bytes.",
- MEMORY_PER_INPUT_GATE, networkBufferSize *
MIN_BUFFERS_PER_GATE));
+ PluginConf.MEMORY_PER_INPUT_GATE.name, networkBufferSize *
MIN_BUFFERS_PER_GATE));
}
this.networkBufferSize = networkBufferSize;
- this.numConcurrentReading = NUM_CONCURRENT_READINGS;
+ this.numConcurrentReading =
+ Integer.valueOf(PluginConf.getValue(flinkConf,
PluginConf.NUM_CONCURRENT_READINGS));
this.networkBufferPool = networkBufferPool;
- this.celebornConf = conf;
}
/** Create {@link RemoteShuffleInputGate} from {@link
InputGateDeploymentDescriptor}. */
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 0fbb92d2d..c2eb26140 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.protocol.CompressionCodec;
+import org.apache.celeborn.plugin.flink.config.PluginConf;
import org.apache.celeborn.plugin.flink.utils.Utils;
/** Factory class to create {@link RemoteShuffleResultPartition}. */
@@ -62,24 +64,26 @@ public class RemoteShuffleResultPartitionFactory {
*/
private final int numBuffersPerPartition;
- private final String minMemorySize = "8m";
+ private String compressionCodec;
public RemoteShuffleResultPartitionFactory(
+ Configuration flinkConf,
CelebornConf celebornConf,
ResultPartitionManager partitionManager,
BufferPoolFactory bufferPoolFactory,
int networkBufferSize) {
long configuredMemorySize =
org.apache.celeborn.common.util.Utils.byteStringAsBytes(
- celebornConf.memoryPerResultPartition());
+ PluginConf.getValue(flinkConf,
PluginConf.MEMORY_PER_RESULT_PARTITION));
long minConfiguredMemorySize =
- org.apache.celeborn.common.util.Utils.byteStringAsBytes(minMemorySize);
+ org.apache.celeborn.common.util.Utils.byteStringAsBytes(
+ PluginConf.getValue(flinkConf,
PluginConf.MIN_MEMORY_PER_PARTITION));
if (configuredMemorySize < minConfiguredMemorySize) {
throw new IllegalArgumentException(
String.format(
"Insufficient network memory per result partition, please
increase %s "
+ "to at least %s.",
- CelebornConf.MEMORY_PER_RESULT_PARTITION().key(),
minConfiguredMemorySize));
+ PluginConf.MEMORY_PER_RESULT_PARTITION.name,
minConfiguredMemorySize));
}
this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize /
networkBufferSize);
@@ -88,13 +92,18 @@ public class RemoteShuffleResultPartitionFactory {
String.format(
"Insufficient network memory per partition, please increase %s
to at "
+ "least %d bytes.",
- CelebornConf.MEMORY_PER_RESULT_PARTITION().key(),
+ PluginConf.MEMORY_PER_RESULT_PARTITION.name,
networkBufferSize * MIN_BUFFERS_PER_PARTITION));
}
this.partitionManager = partitionManager;
this.bufferPoolFactory = bufferPoolFactory;
this.networkBufferSize = networkBufferSize;
+ if (PluginConf.getValue(flinkConf,
PluginConf.ENABLE_DATA_COMPRESSION).equals("false")) {
+ throw new RuntimeException("remote-shuffle.job.enable-data-compression
must be true");
+ }
+ this.compressionCodec =
+ PluginConf.getValue(flinkConf,
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC);
}
public ResultPartition create(
@@ -134,12 +143,11 @@ public class RemoteShuffleResultPartitionFactory {
int numMappers) {
// in flink1.14/1.15, just support LZ4
- if (celebornConf.shuffleCompressionCodec() != CompressionCodec.LZ4) {
- throw new IllegalStateException(
- "Unknown CompressionMethod " +
celebornConf.shuffleCompressionCodec());
+ if (!compressionCodec.equals(CompressionCodec.LZ4.name())) {
+ throw new IllegalStateException("Unknown CompressionMethod " +
compressionCodec);
}
final BufferCompressor bufferCompressor =
- new BufferCompressor(networkBufferSize,
celebornConf.shuffleCompressionCodec().name());
+ new BufferCompressor(networkBufferSize, compressionCodec);
RemoteShuffleDescriptor rsd = (RemoteShuffleDescriptor) shuffleDescriptor;
ResultPartition partition =
new RemoteShuffleResultPartition(
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
index d8ba441c4..4cc02c0fc 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -73,9 +73,10 @@ public class RemoteShuffleServiceFactory
CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
RemoteShuffleResultPartitionFactory resultPartitionFactory =
new RemoteShuffleResultPartitionFactory(
- celebornConf, resultPartitionManager, networkBufferPool,
bufferSize);
+ configuration, celebornConf, resultPartitionManager,
networkBufferPool, bufferSize);
RemoteShuffleInputGateFactory inputGateFactory =
- new RemoteShuffleInputGateFactory(celebornConf, networkBufferPool,
bufferSize);
+ new RemoteShuffleInputGateFactory(
+ configuration, celebornConf, networkBufferPool, bufferSize);
return new RemoteShuffleEnvironment(
networkBufferPool,
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 473ccf13f..55cb37d48 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -754,7 +754,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerDirectMemoryRatioForReadBuffer: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
def workerDirectMemoryRatioForShuffleStorage: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE)
- def memoryPerResultPartition: String = get(MEMORY_PER_RESULT_PARTITION)
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
@@ -2967,14 +2966,6 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("15s")
- val MEMORY_PER_RESULT_PARTITION: ConfigEntry[String] =
- buildConf("celeborn.client.network.memory.perResultPartition")
- .categories("client")
- .version("0.3.0")
- .doc("The size of network buffers required per result partition. The
minimum valid value is 8M. Usually, several hundreds of megabytes memory is
enough for large scale batch jobs.")
- .stringConf
- .createWithDefault("64m")
-
val WORKER_PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
buildConf("celeborn.worker.partition.initial.readBuffersMin")
.categories("worker")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 34f95b1bb..c63132e3c 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -23,7 +23,6 @@ license: |
| celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add
partition's peer worker into blacklist when push data to slave failed. | 0.3.0
|
| celeborn.client.closeIdleConnections | true | Whether client will close idle
connections. | 0.3.0 |
| celeborn.client.maxRetries | 15 | Max retry times for client to connect
master endpoint | 0.2.0 |
-| celeborn.client.network.memory.perResultPartition | 64m | The size of
network buffers required per result partition. The minimum valid value is 8M.
Usually, several hundreds of megabytes memory is enough for large scale batch
jobs. | 0.3.0 |
| celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch
request. | 0.2.0 |
| celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 |
| celeborn.fetch.timeout | 120s | Timeout for a task to fetch chunk. | 0.2.0 |