waitinfuture commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1606733485


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -47,24 +49,47 @@ public class ChunkStreamManager {
   protected final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds;
 
   /** State of a single stream. */
-  protected static class StreamState {
-    final FileManagedBuffers buffers;
-    final String shuffleKey;
-    final String fileName;
-    final TimeWindow fetchTimeMetric;
-
+  public static class StreamState {
+    public final ChunkBuffers buffers;
+    public final String shuffleKey;
+    public final String fileName;
+    public final TimeWindow fetchTimeMetric;
+    public final int startIndex;
+    public final int endIndex;
     // Used to keep track of the number of chunks being transferred and not 
finished yet.
     volatile long chunksBeingTransferred = 0L;
+    public MemoryFileInfo memoryFileInfo;

Review Comment:
   unused



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -440,10 +463,16 @@ class FetchHandler(
       streamType: StreamType): Unit = {
     streamType match {
       case StreamType.ChunkStream =>
-        val (shuffleKey, fileName) = 
chunkStreamManager.getShuffleKeyAndFileName(streamId)
+        val streamState = chunkStreamManager.getStreamState(streamId)
+        val (shuffleKey, fileName, startIndex, endIndex) = (

Review Comment:
   startIndex, endIndex are not used, why add new variables 
streamState.startIndex, streamState.endIndex?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala:
##########
@@ -240,57 +241,79 @@ class FetchHandler(
         s"$endIndex get file name $fileName from client channel " +
         s"${NettyUtils.getRemoteAddress(client.getChannel)}")
 
-      var fileInfo = getRawDiskFileInfo(shuffleKey, fileName)
       val streamId = chunkStreamManager.nextStreamId()
+      var fileInfo = getRawFileInfo(shuffleKey, fileName, streamId)
       // we must get sorted fileInfo for the following cases.
       // 1. when the current request is a non-range openStream, but the 
original unsorted file
       //    has been deleted by another range's openStream request.
       // 2. when the current request is a range openStream request.
-      if ((endIndex != Int.MaxValue) || (endIndex == Int.MaxValue && 
!fileInfo.addStream(
-          streamId))) {
+      if ((endIndex != Int.MaxValue) || (endIndex == Int.MaxValue
+        // this will add stream only if this fileinfo is disk file info

Review Comment:
   This comment seems incorrect.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java:
##########
@@ -47,24 +49,47 @@ public class ChunkStreamManager {
   protected final ConcurrentHashMap<String, Set<Long>> shuffleStreamIds;
 
   /** State of a single stream. */
-  protected static class StreamState {
-    final FileManagedBuffers buffers;
-    final String shuffleKey;
-    final String fileName;
-    final TimeWindow fetchTimeMetric;
-
+  public static class StreamState {
+    public final ChunkBuffers buffers;
+    public final String shuffleKey;
+    public final String fileName;
+    public final TimeWindow fetchTimeMetric;
+    public final int startIndex;
+    public final int endIndex;
     // Used to keep track of the number of chunks being transferred and not 
finished yet.
     volatile long chunksBeingTransferred = 0L;
+    public MemoryFileInfo memoryFileInfo;
 
     StreamState(
         String shuffleKey,
-        FileManagedBuffers buffers,
+        ChunkBuffers buffers,
         String fileName,
-        TimeWindow fetchTimeMetric) {
+        TimeWindow fetchTimeMetric,
+        int startIndex,
+        int endIndex) {
       this.buffers = buffers;
       this.shuffleKey = shuffleKey;
       this.fileName = fileName;
       this.fetchTimeMetric = fetchTimeMetric;
+      this.startIndex = startIndex;
+      this.endIndex = endIndex;
+    }
+
+    StreamState(

Review Comment:
   Seems this method is unnecessary



##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -40,7 +45,20 @@ public FileMeta getFileMeta() {
     return fileMeta;
   }
 
-  public abstract long getFileLength();
+  public ReduceFileMeta getReduceFileMeta() {
+    return (ReduceFileMeta) fileMeta;
+  }
+
+  public long getFileLength() {
+    return bytesFlushed;
+  }
+
+  public void updateBytesFlushed(long bytes) {
+    bytesFlushed += bytes;
+    if (fileMeta instanceof ReduceFileMeta) {

Review Comment:
   use the local variable



##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -53,4 +71,40 @@ public boolean isPartitionSplitEnabled() {
   public void setPartitionSplitEnabled(boolean partitionSplitEnabled) {
     this.partitionSplitEnabled = partitionSplitEnabled;
   }
+
+  boolean isReduceFileMeta() {

Review Comment:
   Better to make it a local variable to avoid frequent invocation



##########
common/src/main/java/org/apache/celeborn/common/meta/MemoryFileInfo.java:
##########
@@ -17,17 +17,46 @@
 
 package org.apache.celeborn.common.meta;
 
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
 import io.netty.buffer.CompositeByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
 
 public class MemoryFileInfo extends FileInfo {
+  Logger logger = LoggerFactory.getLogger(MemoryFileInfo.class);
   private CompositeByteBuf buffer;
-  private long length;
+  private CompositeByteBuf sortedBuffer;
+  private Map<Integer, List<ShuffleBlockInfo>> sortedIndexes;
+  //  private AtomicInteger readerCount = new AtomicInteger(0);
+  private AtomicBoolean evicted = new AtomicBoolean(false);
+  private Consumer<MemoryFileInfo> evictFunc;
+  private MemoryFileInfo originFileInfo;

Review Comment:
   What't the purpose to add `originFileInfo` here, and also why `addStream`, 
`isStreamsEmpty`, `closeStream` differ from FileInfo's methods?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -354,65 +384,94 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
       throw new IOException("No available working dirs!")
     }
-    val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
-    val (flusher, diskFileInfo, workingDir) = createFile(
+    val partitionDataWriterContext = new PartitionDataWriterContext(
+      splitThreshold,
+      splitMode,
+      rangeReadFilter,
       location,
       appId,
       shuffleId,
-      location.getFileName,
       userIdentifier,
       partitionType,
       partitionSplitEnabled)
+
     val writer =
       try {
         partitionType match {
           case PartitionType.MAP => new MapPartitionDataWriter(
               this,
-              diskFileInfo,
-              flusher,
               workerSource,
               conf,
               deviceMonitor,
-              splitThreshold,
-              splitMode,
-              rangeReadFilter,
-              shuffleKey)
+              partitionDataWriterContext)
           case PartitionType.REDUCE => new ReducePartitionDataWriter(
               this,
-              diskFileInfo,
-              flusher,
               workerSource,
               conf,
               deviceMonitor,
-              splitThreshold,
-              splitMode,
-              rangeReadFilter,
-              shuffleKey)
+              partitionDataWriterContext)
           case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
         }
       } catch {
         case e: Exception =>
           logError("Create partition data writer failed", e)
           throw e
       }
-    if (!(writer.getDiskFileInfo.isHdfs)) {
-      deviceMonitor.registerFileWriter(writer)
-      workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
-        diskFileInfo.getFilePath,
-        writer)
-    } else {
-      hdfsWriters.put(diskFileInfo.getFilePath, writer)
-    }
     writer
   }
 
-  def getDiskFileInfo(shuffleKey: String, fileName: String): DiskFileInfo = {
-    val shuffleMap = diskFileInfos.get(shuffleKey)
-    if (shuffleMap ne null) {
-      shuffleMap.get(fileName)
-    } else {
-      null
+  def registerMemoryPartitionWriter(writer: PartitionDataWriter, fileInfo: 
MemoryFileInfo): Unit = {
+    memoryWriters.put(fileInfo, writer)
+  }
+
+  def unregisterMemoryPartitionWriterAndFileInfo(
+      fileInfo: MemoryFileInfo,
+      shuffleKey: String,
+      fileName: String): Unit = {
+    memoryWriters.remove(fileInfo)
+    val map = memoryFileInfos.get(shuffleKey)
+    if (map != null) {
+      map.remove(fileName)
+    }
+  }
+
+  def registerDiskFilePartitionWriter(
+      writer: PartitionDataWriter,
+      workingDir: File,
+      fileInfo: DiskFileInfo): Unit = {
+    if (writer.getDiskFileInfo.isHdfs) {
+      hdfsWriters.put(fileInfo.getFilePath, writer)
+      return
+    }
+    deviceMonitor.registerFileWriter(writer)
+    workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
+      fileInfo.getFilePath,
+      writer)
+  }
+
+  def getFileInfo(
+      shuffleKey: String,
+      fileName: String,
+      streamId: Long = 0): FileInfo = {
+    val memoryShuffleMap = memoryFileInfos.get(shuffleKey)
+    if (memoryShuffleMap != null) {
+      val memoryFileInfo = memoryShuffleMap.get(fileName)
+      if (memoryFileInfo != null) {
+        if (memoryFileInfo.addStream(streamId)) {

Review Comment:
   why add stream here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to