This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cb76c4de4 [CELEBORN-350][FLINK] Add PluginConf to be compatible with old configurations
cb76c4de4 is described below

commit cb76c4de4c8ac27d90034ed07d9733aa080ab1fe
Author: zhongqiangchen <[email protected]>
AuthorDate: Tue Feb 28 20:36:11 2023 +0800

    [CELEBORN-350][FLINK] Add PluginConf to be compatible with old configurations
---
 .../celeborn/plugin/flink/config/PluginConf.java   | 51 ++++++++++++++++++
 .../celeborn/plugin/flink/PluginConfSuiteJ.java    | 63 ++++++++++++++++++++++
 .../flink/RemoteShuffleInputGateFactory.java       | 23 ++++----
 .../flink/RemoteShuffleResultPartitionFactory.java | 26 +++++----
 .../plugin/flink/RemoteShuffleServiceFactory.java  |  5 +-
 .../org/apache/celeborn/common/CelebornConf.scala  |  9 ----
 docs/configuration/client.md                       |  1 -
 7 files changed, 148 insertions(+), 30 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
new file mode 100644
index 000000000..f293de6c0
--- /dev/null
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/config/PluginConf.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.config;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.celeborn.common.CelebornConf;
+
+public enum PluginConf {
+  MIN_MEMORY_PER_PARTITION("remote-shuffle.job.min.memory-per-partition", "", 
"8m"),
+  MIN_MEMORY_PER_GATE("remote-shuffle.job.min.memory-per-gate", "", "8m"),
+  NUM_CONCURRENT_READINGS(
+      "remote-shuffle.job.concurrent-readings-per-gate", "", 
String.valueOf(Integer.MAX_VALUE)),
+  MEMORY_PER_RESULT_PARTITION("remote-shuffle.job.memory-per-partition", "", 
"64m"),
+  MEMORY_PER_INPUT_GATE("remote-shuffle.job.memory-per-gate", "", "32m"),
+  ENABLE_DATA_COMPRESSION("remote-shuffle.job.enable-data-compression", "", 
"true"),
+  REMOTE_SHUFFLE_COMPRESSION_CODEC(
+      "remote-shuffle.job.compression.codec",
+      CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(),
+      "LZ4"),
+  ;
+
+  public String name;
+  public String alterName;
+  public String defaultValue;
+
+  PluginConf(String name, String alterName, String defaultValue) {
+    this.name = name;
+    this.alterName = alterName;
+    this.defaultValue = defaultValue;
+  }
+
+  public static String getValue(Configuration flinkConf, PluginConf conf) {
+    return flinkConf.getString(conf.name, conf.defaultValue);
+  }
+}
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
new file mode 100644
index 000000000..c6eac4920
--- /dev/null
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/PluginConfSuiteJ.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.celeborn.plugin.flink.config.PluginConf;
+
+public class PluginConfSuiteJ {
+  @Test
+  public void testColesce() {
+    Configuration flinkConf = new Configuration();
+    Assert.assertEquals("8m", PluginConf.getValue(flinkConf, 
PluginConf.MIN_MEMORY_PER_PARTITION));
+    Assert.assertEquals("8m", PluginConf.getValue(flinkConf, 
PluginConf.MIN_MEMORY_PER_GATE));
+    Assert.assertTrue(
+        Integer.MAX_VALUE
+            == Integer.valueOf(PluginConf.getValue(flinkConf, 
PluginConf.NUM_CONCURRENT_READINGS)));
+    Assert.assertEquals(
+        "64m", PluginConf.getValue(flinkConf, 
PluginConf.MEMORY_PER_RESULT_PARTITION));
+    Assert.assertEquals("32m", PluginConf.getValue(flinkConf, 
PluginConf.MEMORY_PER_INPUT_GATE));
+
+    Assert.assertEquals("true", PluginConf.getValue(flinkConf, 
PluginConf.ENABLE_DATA_COMPRESSION));
+    Assert.assertEquals(
+        "LZ4", PluginConf.getValue(flinkConf, 
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
+
+    flinkConf.setString(PluginConf.MIN_MEMORY_PER_PARTITION.name, "16m");
+    flinkConf.setString(PluginConf.MIN_MEMORY_PER_GATE.name, "17m");
+    flinkConf.setString(PluginConf.NUM_CONCURRENT_READINGS.name, "12323");
+    flinkConf.setString(PluginConf.MEMORY_PER_RESULT_PARTITION.name, "1888m");
+    flinkConf.setString(PluginConf.MEMORY_PER_INPUT_GATE.name, "176m");
+    flinkConf.setString(PluginConf.ENABLE_DATA_COMPRESSION.name, "false");
+    flinkConf.setString(PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC.name, 
"lz423");
+    Assert.assertEquals("16m", PluginConf.getValue(flinkConf, 
PluginConf.MIN_MEMORY_PER_PARTITION));
+    Assert.assertEquals("17m", PluginConf.getValue(flinkConf, 
PluginConf.MIN_MEMORY_PER_GATE));
+    Assert.assertTrue(
+        12323
+            == Integer.valueOf(PluginConf.getValue(flinkConf, 
PluginConf.NUM_CONCURRENT_READINGS)));
+    Assert.assertEquals(
+        "1888m", PluginConf.getValue(flinkConf, 
PluginConf.MEMORY_PER_RESULT_PARTITION));
+    Assert.assertEquals("176m", PluginConf.getValue(flinkConf, 
PluginConf.MEMORY_PER_INPUT_GATE));
+    Assert.assertEquals(
+        "false", PluginConf.getValue(flinkConf, 
PluginConf.ENABLE_DATA_COMPRESSION));
+    Assert.assertEquals(
+        "lz423", PluginConf.getValue(flinkConf, 
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC));
+  }
+}
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
index 76f535496..a3485721f 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
 
 import java.io.IOException;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -30,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.config.PluginConf;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /** Factory class to create {@link RemoteShuffleInputGate}. */
@@ -57,20 +59,23 @@ public class RemoteShuffleInputGateFactory {
   /** Sum of buffers. */
   private final int numBuffersPerGate;
 
-  public static final String MEMORY_PER_INPUT_GATE = "32m";
-  public static final String MIN_MEMORY_PER_GATE = "8m";
-  public static final int NUM_CONCURRENT_READINGS = Integer.MAX_VALUE;
   private CelebornConf celebornConf;
 
   public RemoteShuffleInputGateFactory(
-      CelebornConf conf, NetworkBufferPool networkBufferPool, int 
networkBufferSize) {
+      Configuration flinkConf,
+      CelebornConf conf,
+      NetworkBufferPool networkBufferPool,
+      int networkBufferSize) {
+    this.celebornConf = conf;
     long configuredMemorySize =
-        
org.apache.celeborn.common.util.Utils.byteStringAsBytes(MEMORY_PER_INPUT_GATE);
+        org.apache.celeborn.common.util.Utils.byteStringAsBytes(
+            PluginConf.getValue(flinkConf, PluginConf.MEMORY_PER_INPUT_GATE));
     if (configuredMemorySize < MIN_BUFFERS_PER_GATE) {
       throw new IllegalArgumentException(
           String.format(
               "Insufficient network memory per input gate, please increase %s 
to at " + "least %s.",
-              MEMORY_PER_INPUT_GATE, MIN_MEMORY_PER_GATE));
+              PluginConf.MEMORY_PER_INPUT_GATE.name,
+              PluginConf.getValue(flinkConf, PluginConf.MIN_MEMORY_PER_GATE)));
     }
 
     this.numBuffersPerGate = Utils.checkedDownCast(configuredMemorySize / 
networkBufferSize);
@@ -79,13 +84,13 @@ public class RemoteShuffleInputGateFactory {
           String.format(
               "Insufficient network memory per input gate, please increase %s 
to at "
                   + "least %d bytes.",
-              MEMORY_PER_INPUT_GATE, networkBufferSize * 
MIN_BUFFERS_PER_GATE));
+              PluginConf.MEMORY_PER_INPUT_GATE.name, networkBufferSize * 
MIN_BUFFERS_PER_GATE));
     }
 
     this.networkBufferSize = networkBufferSize;
-    this.numConcurrentReading = NUM_CONCURRENT_READINGS;
+    this.numConcurrentReading =
+        Integer.valueOf(PluginConf.getValue(flinkConf, 
PluginConf.NUM_CONCURRENT_READINGS));
     this.networkBufferPool = networkBufferPool;
-    this.celebornConf = conf;
   }
 
   /** Create {@link RemoteShuffleInputGate} from {@link 
InputGateDeploymentDescriptor}. */
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
index 0fbb92d2d..c2eb26140 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.protocol.CompressionCodec;
+import org.apache.celeborn.plugin.flink.config.PluginConf;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /** Factory class to create {@link RemoteShuffleResultPartition}. */
@@ -62,24 +64,26 @@ public class RemoteShuffleResultPartitionFactory {
    */
   private final int numBuffersPerPartition;
 
-  private final String minMemorySize = "8m";
+  private String compressionCodec;
 
   public RemoteShuffleResultPartitionFactory(
+      Configuration flinkConf,
       CelebornConf celebornConf,
       ResultPartitionManager partitionManager,
       BufferPoolFactory bufferPoolFactory,
       int networkBufferSize) {
     long configuredMemorySize =
         org.apache.celeborn.common.util.Utils.byteStringAsBytes(
-            celebornConf.memoryPerResultPartition());
+            PluginConf.getValue(flinkConf, 
PluginConf.MEMORY_PER_RESULT_PARTITION));
     long minConfiguredMemorySize =
-        org.apache.celeborn.common.util.Utils.byteStringAsBytes(minMemorySize);
+        org.apache.celeborn.common.util.Utils.byteStringAsBytes(
+            PluginConf.getValue(flinkConf, 
PluginConf.MIN_MEMORY_PER_PARTITION));
     if (configuredMemorySize < minConfiguredMemorySize) {
       throw new IllegalArgumentException(
           String.format(
               "Insufficient network memory per result partition, please 
increase %s "
                   + "to at least %s.",
-              CelebornConf.MEMORY_PER_RESULT_PARTITION().key(), 
minConfiguredMemorySize));
+              PluginConf.MEMORY_PER_RESULT_PARTITION.name, 
minConfiguredMemorySize));
     }
 
     this.numBuffersPerPartition = Utils.checkedDownCast(configuredMemorySize / 
networkBufferSize);
@@ -88,13 +92,18 @@ public class RemoteShuffleResultPartitionFactory {
           String.format(
               "Insufficient network memory per partition, please increase %s 
to at "
                   + "least %d bytes.",
-              CelebornConf.MEMORY_PER_RESULT_PARTITION().key(),
+              PluginConf.MEMORY_PER_RESULT_PARTITION.name,
               networkBufferSize * MIN_BUFFERS_PER_PARTITION));
     }
 
     this.partitionManager = partitionManager;
     this.bufferPoolFactory = bufferPoolFactory;
     this.networkBufferSize = networkBufferSize;
+    if (PluginConf.getValue(flinkConf, 
PluginConf.ENABLE_DATA_COMPRESSION).equals("false")) {
+      throw new RuntimeException("remote-shuffle.job.enable-data-compression 
must be true");
+    }
+    this.compressionCodec =
+        PluginConf.getValue(flinkConf, 
PluginConf.REMOTE_SHUFFLE_COMPRESSION_CODEC);
   }
 
   public ResultPartition create(
@@ -134,12 +143,11 @@ public class RemoteShuffleResultPartitionFactory {
       int numMappers) {
 
     // in flink1.14/1.15, just support LZ4
-    if (celebornConf.shuffleCompressionCodec() != CompressionCodec.LZ4) {
-      throw new IllegalStateException(
-          "Unknown CompressionMethod " + 
celebornConf.shuffleCompressionCodec());
+    if (!compressionCodec.equals(CompressionCodec.LZ4.name())) {
+      throw new IllegalStateException("Unknown CompressionMethod " + 
compressionCodec);
     }
     final BufferCompressor bufferCompressor =
-        new BufferCompressor(networkBufferSize, 
celebornConf.shuffleCompressionCodec().name());
+        new BufferCompressor(networkBufferSize, compressionCodec);
     RemoteShuffleDescriptor rsd = (RemoteShuffleDescriptor) shuffleDescriptor;
     ResultPartition partition =
         new RemoteShuffleResultPartition(
diff --git a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
index d8ba441c4..4cc02c0fc 100644
--- a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
+++ b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -73,9 +73,10 @@ public class RemoteShuffleServiceFactory
     CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
     RemoteShuffleResultPartitionFactory resultPartitionFactory =
         new RemoteShuffleResultPartitionFactory(
-            celebornConf, resultPartitionManager, networkBufferPool, 
bufferSize);
+            configuration, celebornConf, resultPartitionManager, 
networkBufferPool, bufferSize);
     RemoteShuffleInputGateFactory inputGateFactory =
-        new RemoteShuffleInputGateFactory(celebornConf, networkBufferPool, 
bufferSize);
+        new RemoteShuffleInputGateFactory(
+            configuration, celebornConf, networkBufferPool, bufferSize);
 
     return new RemoteShuffleEnvironment(
         networkBufferPool,
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 473ccf13f..55cb37d48 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -754,7 +754,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def workerDirectMemoryRatioForReadBuffer: Double = 
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
   def workerDirectMemoryRatioForShuffleStorage: Double =
     get(WORKER_DIRECT_MEMORY_RATIO_FOR_SHUFFLE_STORAGE)
-  def memoryPerResultPartition: String = get(MEMORY_PER_RESULT_PARTITION)
 
   def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
 
@@ -2967,14 +2966,6 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("15s")
 
-  val MEMORY_PER_RESULT_PARTITION: ConfigEntry[String] =
-    buildConf("celeborn.client.network.memory.perResultPartition")
-      .categories("client")
-      .version("0.3.0")
-      .doc("The size of network buffers required per result partition. The 
minimum valid value is 8M. Usually, several hundreds of megabytes memory is 
enough for large scale batch jobs.")
-      .stringConf
-      .createWithDefault("64m")
-
   val WORKER_PARTITION_READ_BUFFERS_MIN: ConfigEntry[Int] =
     buildConf("celeborn.worker.partition.initial.readBuffersMin")
       .categories("worker")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 34f95b1bb..c63132e3c 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -23,7 +23,6 @@ license: |
 | celeborn.client.blacklistSlave.enabled | true | When true, Celeborn will add partition's peer worker into blacklist when push data to slave failed. | 0.3.0 | 
 | celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 | 
 | celeborn.client.maxRetries | 15 | Max retry times for client to connect master endpoint | 0.2.0 | 
-| celeborn.client.network.memory.perResultPartition | 64m | The size of network buffers required per result partition. The minimum valid value is 8M. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. | 0.3.0 | 
 | celeborn.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.2.0 | 
 | celeborn.fetch.maxRetries | 3 | Max retries of fetch chunk | 0.2.0 | 
 | celeborn.fetch.timeout | 120s | Timeout for a task to fetch chunk. | 0.2.0 | 

Reply via email to