FMX commented on code in PR #3366:
URL: https://github.com/apache/celeborn/pull/3366#discussion_r2275840957


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;
+    private final long maxCount = 1000000000000L;
+
+    private final int concurrentLevel;
+    public StreamsManager(CelebornConf conf, FileSystem fileSystem) {
+        this.conf = conf;
+        this.fileSystem = fileSystem;
+        this.maxCapacity = conf.workerOpenHDFSOutputStreamMax();
+        this.maxStreamIdleMs = conf.workerHDFSOutputStreamIdleMsMax();
+        this.concurrentLevel = conf.workerHDFSOutputStreamConcurrentLevel();
+        RemovalListener<Path, FSDataOutputStream> listener = new 
RemovalListener<Path, FSDataOutputStream>() {
+            @Override
+            public void onRemoval(RemovalNotification<Path, 
FSDataOutputStream> notification) {
+                if (notification.getValue() != null) {
+                    try {
+                        notification.getValue().close();
+                        Thread.sleep(20);

Review Comment:
   Why should we sleep after we remove a DFS output stream?



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;
+    private final long maxCount = 1000000000000L;
+
+    private final int concurrentLevel;
+    public StreamsManager(CelebornConf conf, FileSystem fileSystem) {
+        this.conf = conf;
+        this.fileSystem = fileSystem;
+        this.maxCapacity = conf.workerOpenHDFSOutputStreamMax();
+        this.maxStreamIdleMs = conf.workerHDFSOutputStreamIdleMsMax();
+        this.concurrentLevel = conf.workerHDFSOutputStreamConcurrentLevel();
+        RemovalListener<Path, FSDataOutputStream> listener = new 
RemovalListener<Path, FSDataOutputStream>() {
+            @Override
+            public void onRemoval(RemovalNotification<Path, 
FSDataOutputStream> notification) {
+                if (notification.getValue() != null) {
+                    try {
+                        notification.getValue().close();

Review Comment:
   Since closing the DFS output stream will take some time, add a thread pool 
to make sure the close method won't block the thread that invoked invalidate.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;
+    private final long maxCount = 1000000000000L;
+
+    private final int concurrentLevel;
+    public StreamsManager(CelebornConf conf, FileSystem fileSystem) {
+        this.conf = conf;
+        this.fileSystem = fileSystem;
+        this.maxCapacity = conf.workerOpenHDFSOutputStreamMax();
+        this.maxStreamIdleMs = conf.workerHDFSOutputStreamIdleMsMax();
+        this.concurrentLevel = conf.workerHDFSOutputStreamConcurrentLevel();
+        RemovalListener<Path, FSDataOutputStream> listener = new 
RemovalListener<Path, FSDataOutputStream>() {
+            @Override
+            public void onRemoval(RemovalNotification<Path, 
FSDataOutputStream> notification) {
+                if (notification.getValue() != null) {
+                    try {
+                        notification.getValue().close();
+                        Thread.sleep(20);
+                    } catch (IOException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+        CacheLoader<Path, FSDataOutputStream> cacheLoader = new 
CacheLoader<Path, FSDataOutputStream>() {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public FSDataOutputStream load(Path path) throws Exception {
+                totalLoaderCount++;
+                try {
+                    return fileSystem.append(path);
+                } catch (IOException e){
+                    throw new IOException("File must be exist: " + path + "\n" 
+ e);
+                }
+            }
+        };
+        cache = CacheBuilder.newBuilder()
+                .maximumSize(this.maxCapacity)
+                .concurrencyLevel(this.concurrentLevel)
+                .expireAfterAccess(this.maxStreamIdleMs, TimeUnit.MILLISECONDS)
+                .removalListener(listener)
+                .build(cacheLoader);
+    }
+
+    public long getSize() {
+        return cache.size();
+    }
+
+    public synchronized void dropEntry(Path path) {

Review Comment:
   There is no need to synchronize on this method, especially since the close 
might take a while. Just invalidating the key should be enough.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;
+    private final long maxCount = 1000000000000L;
+
+    private final int concurrentLevel;
+    public StreamsManager(CelebornConf conf, FileSystem fileSystem) {
+        this.conf = conf;
+        this.fileSystem = fileSystem;
+        this.maxCapacity = conf.workerOpenHDFSOutputStreamMax();
+        this.maxStreamIdleMs = conf.workerHDFSOutputStreamIdleMsMax();
+        this.concurrentLevel = conf.workerHDFSOutputStreamConcurrentLevel();
+        RemovalListener<Path, FSDataOutputStream> listener = new 
RemovalListener<Path, FSDataOutputStream>() {
+            @Override
+            public void onRemoval(RemovalNotification<Path, 
FSDataOutputStream> notification) {
+                if (notification.getValue() != null) {
+                    try {
+                        notification.getValue().close();
+                        Thread.sleep(20);
+                    } catch (IOException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+        CacheLoader<Path, FSDataOutputStream> cacheLoader = new 
CacheLoader<Path, FSDataOutputStream>() {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public FSDataOutputStream load(Path path) throws Exception {
+                totalLoaderCount++;
+                try {
+                    return fileSystem.append(path);
+                } catch (IOException e){
+                    throw new IOException("File must be exist: " + path + "\n" 
+ e);
+                }
+            }
+        };
+        cache = CacheBuilder.newBuilder()
+                .maximumSize(this.maxCapacity)
+                .concurrencyLevel(this.concurrentLevel)
+                .expireAfterAccess(this.maxStreamIdleMs, TimeUnit.MILLISECONDS)
+                .removalListener(listener)
+                .build(cacheLoader);
+    }
+
+    public long getSize() {
+        return cache.size();
+    }
+
+    public synchronized void dropEntry(Path path) {
+        FSDataOutputStream outputStream = cache.getIfPresent(path);
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            cache.invalidate(path);
+        }
+    }
+
+    public long getReuseOutputStreamCount() {
+        long res = totalGetkeyCount - totalLoaderCount;
+        if (totalGetkeyCount > maxCount || totalLoaderCount > maxCount) {
+            totalGetkeyCount = 0;
+            totalLoaderCount = 0;
+        }
+        return res;
+    }
+
+    public synchronized FSDataOutputStream getOrCreateStream(Path path) {
+        totalGetkeyCount++;
+        try {
+            return cache.get(path);

Review Comment:
   `cache.get(path)` is already thread-safe, so no extra synchronization is 
needed around it.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala:
##########
@@ -467,6 +467,24 @@ private[celeborn] class Worker(
     cleanTaskQueue.size()
   }
 
+  if (hasHDFSStorage) {
+    workerSource.addGauge(WorkerSource.OPEN_HDFS_OUTPUT_STREAM_COUNT) { () =>

Review Comment:
   Try to expose the Guava cache hit rate, miss rate, hit count and miss count.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;

Review Comment:
   Do not record the two counters; you can get cache stats by invoking 
`cache.stats()` (this requires enabling `recordStats()` on the `CacheBuilder`).



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala:
##########
@@ -101,15 +102,32 @@ private[worker] class HdfsFlushTask(
     val path: Path,
     notifier: FlushNotifier,
     keepBuffer: Boolean,
-    source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, 
source) {
+    source: AbstractSource,
+    finalFlush: Boolean) extends DfsFlushTask(buffer, notifier, keepBuffer, 
source) {
   override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
     val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
-    val hdfsStream = hadoopFs.append(path, 256 * 1024)
-    flush(hdfsStream) {
-      hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
-      source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
-      source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+    if (StorageManager.streamsManager != null) {
+      val hdfsStream = StorageManager.streamsManager.getOrCreateStream(path)
+      if (hdfsStream != null) {
+        hdfsStream.synchronized {

Review Comment:
   All flusher tasks of a partition data writer will be processed by a fixed 
thread, which means that there is no need for a `synchronized` block.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala:
##########
@@ -101,15 +102,32 @@ private[worker] class HdfsFlushTask(
     val path: Path,
     notifier: FlushNotifier,
     keepBuffer: Boolean,
-    source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, 
source) {
+    source: AbstractSource,
+    finalFlush: Boolean) extends DfsFlushTask(buffer, notifier, keepBuffer, 
source) {
   override def flush(copyBytes: Array[Byte]): Unit = {
     val readableBytes = buffer.readableBytes()
     val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
-    val hdfsStream = hadoopFs.append(path, 256 * 1024)
-    flush(hdfsStream) {
-      hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
-      source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
-      source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+    if (StorageManager.streamsManager != null) {

Review Comment:
   Both branches of this `if` block contain duplicate code; try to simplify it.
   You can obtain the HDFS stream before the `if` and reuse it in both branches.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/hdfs/StreamsManager.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage.hdfs;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public final class StreamsManager {
+    public static final Logger log = 
LoggerFactory.getLogger(StreamsManager.class);
+
+    LoadingCache<Path, FSDataOutputStream> cache;
+    private final CelebornConf conf;
+    private final FileSystem fileSystem;
+    private final int maxCapacity;
+    private final long maxStreamIdleMs;
+    private long totalGetkeyCount = 0;
+    private long totalLoaderCount = 0;
+    private final long maxCount = 1000000000000L;
+
+    private final int concurrentLevel;
+    public StreamsManager(CelebornConf conf, FileSystem fileSystem) {
+        this.conf = conf;
+        this.fileSystem = fileSystem;
+        this.maxCapacity = conf.workerOpenHDFSOutputStreamMax();
+        this.maxStreamIdleMs = conf.workerHDFSOutputStreamIdleMsMax();
+        this.concurrentLevel = conf.workerHDFSOutputStreamConcurrentLevel();
+        RemovalListener<Path, FSDataOutputStream> listener = new 
RemovalListener<Path, FSDataOutputStream>() {
+            @Override
+            public void onRemoval(RemovalNotification<Path, 
FSDataOutputStream> notification) {
+                if (notification.getValue() != null) {
+                    try {
+                        notification.getValue().close();
+                        Thread.sleep(20);
+                    } catch (IOException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+        CacheLoader<Path, FSDataOutputStream> cacheLoader = new 
CacheLoader<Path, FSDataOutputStream>() {
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public FSDataOutputStream load(Path path) throws Exception {
+                totalLoaderCount++;
+                try {
+                    return fileSystem.append(path);
+                } catch (IOException e){
+                    throw new IOException("File must be exist: " + path + "\n" 
+ e);
+                }
+            }
+        };
+        cache = CacheBuilder.newBuilder()
+                .maximumSize(this.maxCapacity)
+                .concurrencyLevel(this.concurrentLevel)
+                .expireAfterAccess(this.maxStreamIdleMs, TimeUnit.MILLISECONDS)
+                .removalListener(listener)
+                .build(cacheLoader);
+    }
+
+    public long getSize() {
+        return cache.size();
+    }
+
+    public synchronized void dropEntry(Path path) {
+        FSDataOutputStream outputStream = cache.getIfPresent(path);
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            cache.invalidate(path);
+        }
+    }
+
+    public long getReuseOutputStreamCount() {
+        long res = totalGetkeyCount - totalLoaderCount;
+        if (totalGetkeyCount > maxCount || totalLoaderCount > maxCount) {
+            totalGetkeyCount = 0;
+            totalLoaderCount = 0;
+        }
+        return res;
+    }
+
+    public synchronized FSDataOutputStream getOrCreateStream(Path path) {

Review Comment:
   Remove this unnecessary `synchronized`.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -167,6 +168,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       logInfo(s"Initialize HDFS support with path $hdfsDir")
       try {
         StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+        if (conf.workerReuseHDFSOutputStream) {
+          StorageManager.streamsManager = new StreamsManager(conf,
+            StorageManager.hadoopFs.get(StorageInfo.Type.HDFS))

Review Comment:
   Other DFS systems might also benefit from this stream manager; maybe you can 
test them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to