FMX commented on code in PR #3366:
URL: https://github.com/apache/celeborn/pull/3366#discussion_r2209121744
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4376,6 +4380,40 @@ object CelebornConf extends Logging {
.longConf
.createOptional
+ val WORKER_OPEN_HDFS_OUTPUT_STREAM_MAX: ConfigEntry[Int] =
+ buildConf("celeborn.worker.openHdfsOutputStream.max")
+ .categories("worker")
+ .doc("If the number of opened hdfs output streams on a worker exceeds
this configuration value, " +
+ "the worker will be marked as high-load in the heartbeat report, " +
+ "and the master will not include that node in the response of
RequestSlots.")
+ .version("0.7.0")
+ .intConf
+ .createWithDefault(10000)
Review Comment:
The default value of 10000 is too large; the default size of a Java thread
stack is 1MB, which means that this will consume approximately 10GB of memory.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4376,6 +4380,40 @@ object CelebornConf extends Logging {
.longConf
.createOptional
+ val WORKER_OPEN_HDFS_OUTPUT_STREAM_MAX: ConfigEntry[Int] =
+ buildConf("celeborn.worker.openHdfsOutputStream.max")
+ .categories("worker")
+ .doc("If the number of opened hdfs output streams on a worker exceeds
this configuration value, " +
+ "the worker will be marked as high-load in the heartbeat report, " +
+ "and the master will not include that node in the response of
RequestSlots.")
+ .version("0.7.0")
+ .intConf
+ .createWithDefault(10000)
+
+ val WORKER_HDFS_OUTPUT_STREAM_IDLE_MS_MAX: ConfigEntry[Long] =
+ buildConf("celeborn.worker.hdfsOutputStream.idleMs.max")
Review Comment:
``` suggestion
buildConf("celeborn.worker.hdfsOutputStream.maxIdleTime")
```
Celeborn's config values support multiple time formats.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.service.deploy.worker.WorkerSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+// A concurrent HDFS stream map implementation with the ability to safely
cleanup particular entry from the map
+// in parallel.
+public final class StreamsManager {
Review Comment:
IMO, we can use Guava cache to implement this manager, and it supports the
LRU eviction strategy.
There is no need to worry about introducing new dependencies, as Celeborn
already has the Guava dependencies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]