RexXiong commented on code in PR #2130:
URL: 
https://github.com/apache/incubator-celeborn/pull/2130#discussion_r1431005454


##########
common/src/main/java/org/apache/celeborn/common/meta/NonMemoryFileInfo.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class NonMemoryFileInfo extends FileInfo {

Review Comment:
   Prefer using PersistentFileInfo instead of NonMemoryFileInfo.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartitionReader.java:
##########
@@ -129,7 +130,7 @@ public void open(FileChannel dataFileChannel, FileChannel 
indexFileChannel, long
       this.dataFileChannel = dataFileChannel;
       this.indexFileChannel = indexFileChannel;
       // index is (offset,length)
-      long indexRegionSize = fileInfo.getNumSubpartitions() * (long) 
INDEX_ENTRY_SIZE;
+      long indexRegionSize = fileInfo.getFileMeta().getNumSubpartitions() * 
(long) INDEX_ENTRY_SIZE;

Review Comment:
   1. Directly use NonMemoryFileInfo as the parameter of the 
MapDataPartitionReader constructor.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapDataPartition.java:
##########
@@ -81,27 +82,29 @@ public MapDataPartition(
     updateBuffersTarget((this.minReadBuffers + this.maxReadBuffers) / 2 + 1);
     logger.debug(
         "read map partition {} with {} {}",
-        fileInfo.getFilePath(),
+        ((NonMemoryFileInfo) fileInfo).getFilePath(),

Review Comment:
   1. Directly use NonMemoryFileInfo as the parameter of the MapDataPartition 
constructor.
   2. Consider renaming MapDataPartition -> MapPartitionData.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -184,8 +187,8 @@ public FileInfo getSortedFileInfo(
     Set<String> sorting =
         sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> 
ConcurrentHashMap.newKeySet());
 
-    String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
-    String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
+    String sortedFilePath = Utils.getSortedFilePath(((NonMemoryFileInfo) 
fileInfo).getFilePath());

Review Comment:
   Take NonMemoryFileInfo as the method parameter



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -525,9 +541,9 @@ class FileSorter {
 
     FileSorter(FileInfo fileInfo, String fileId, String shuffleKey) throws 
IOException {
       this.originFileInfo = fileInfo;
-      this.originFilePath = fileInfo.getFilePath();
+      this.originFilePath = ((NonMemoryFileInfo) fileInfo).getFilePath();

Review Comment:
   Take NonMemoryFileInfo as the FileSorter constructor parameter



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionPartitionDataWriter.java:
##########
@@ -24,23 +24,24 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.meta.NonMemoryFileInfo;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.protocol.PartitionSplitMode;
 import org.apache.celeborn.common.protocol.PartitionType;
 
 /*
  * reduce partition file writer, it will create chunk index
  */
-public final class ReducePartitionFileWriter extends FileWriter {
-  private static final Logger logger = 
LoggerFactory.getLogger(ReducePartitionFileWriter.class);
+public final class ReducePartitionPartitionDataWriter extends 
PartitionDataWriter {

Review Comment:
   ReducePartitionPartitionDataWriter -> ReducePartitionDataWriter



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -98,34 +99,41 @@ public FileWriter(
       PartitionType partitionType,
       boolean rangeReadFilter)
       throws IOException {
-    this.fileInfo = fileInfo;
-    this.flusher = flusher;
-    this.flushWorkerIndex = flusher.getWorkerIndex();
+    this.storageManager = storageManager;
+    CreateFileResult createFileResult = 
storageManager.createFile(createFileContext, this);
+    fileInfo = createFileResult.fileInfo();

Review Comment:
   Convert fileInfo to NonMemoryFileInfo at this point



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionPartitionDataWriter.java:
##########
@@ -40,8 +40,9 @@
 /*
  * map partition file writer, it will create index for each partition
  */
-public final class MapPartitionFileWriter extends FileWriter {
-  private static final Logger logger = 
LoggerFactory.getLogger(MapPartitionFileWriter.class);
+public final class MapPartitionPartitionDataWriter extends PartitionDataWriter 
{

Review Comment:
   MapPartitionPartitionDataWriter -> MapPartitionDataWriter



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -398,94 +405,20 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               rangeReadFilter)
           case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
         }
-        if (workerGracefulShutdown) {
-          hdfsWriter.setStorageManager(this)
-          hdfsWriter.setShuffleKey(shuffleKey)
-        }
-        hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
-        return hdfsWriter
-      } else if (dirs.nonEmpty && 
location.getStorageInfo.localDiskAvailable()) {
-        val dir = dirs(getNextIndex() % dirs.size)
-        val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, 
mountPoints)
-        val shuffleDir = new File(dir, s"$appId/$shuffleId")
-        val file = new File(shuffleDir, fileName)
-        try {
-          val fileInfo =
-            new FileInfo(
-              file.getAbsolutePath,
-              userIdentifier,
-              partitionType,
-              partitionSplitEnabled)
-          fileInfo.setMountPoint(mountPoint)
-          fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
-          shuffleDir.mkdirs()
-          if (file.exists()) {
-            throw new FileAlreadyExistsException(
-              s"Shuffle data file ${file.getAbsolutePath} already exists.")
-          } else {
-            val createFileSuccess = file.createNewFile()
-            if (!createFileSuccess) {
-              throw new CelebornException(
-                s"Create shuffle data file ${file.getAbsolutePath} failed!")
-            }
-          }
-          val fileWriter = partitionType match {
-            case PartitionType.MAP => new MapPartitionFileWriter(
-                fileInfo,
-                localFlushers.get(mountPoint),
-                workerSource,
-                conf,
-                deviceMonitor,
-                splitThreshold,
-                splitMode,
-                rangeReadFilter)
-            case PartitionType.REDUCE => new ReducePartitionFileWriter(
-                fileInfo,
-                localFlushers.get(mountPoint),
-                workerSource,
-                conf,
-                deviceMonitor,
-                splitThreshold,
-                splitMode,
-                rangeReadFilter)
-            case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
-          }
-          if (workerGracefulShutdown) {
-            fileWriter.setStorageManager(this)
-            fileWriter.setShuffleKey(shuffleKey)
-          }
-          deviceMonitor.registerFileWriter(fileWriter)
-          val map = workingDirWriters.computeIfAbsent(dir, 
workingDirWriterListFunc)
-          map.put(fileInfo.getFilePath, fileWriter)
-          location.getStorageInfo.setMountPoint(mountPoint)
-          logDebug(s"location $location set disk hint to 
${location.getStorageInfo} ")
-          return fileWriter
-        } catch {
-          case fe: FileAlreadyExistsException =>
-            logError("Failed to create fileWriter because of existed file", fe)
-            throw fe
-          case t: Throwable =>
-            logError(
-              s"Create FileWriter for ${file.getAbsolutePath} of mount 
$mountPoint " +
-                s"failed, report to DeviceMonitor",
-              t)
-            exception = new IOException(t)
-            deviceMonitor.reportNonCriticalError(
-              mountPoint,
-              exception,
-              DiskStatus.READ_OR_WRITE_FAILURE)
-        }
-      } else {
-        exception = new IOException("No storage available for location:" + 
location.toString)
+      } catch {
+        case e: Exception =>
+          logError("Create partition data writer failed", e)
+          throw e
       }
-      retryCount += 1
+    if (writer.getFileInfo.isInstanceOf[NonMemoryFileInfo] && 
!(writer.getFileInfo.asInstanceOf[
+        NonMemoryFileInfo].isHdfs)) {
+      deviceMonitor.registerFileWriter(writer)
     }
-
-    throw exception
+    writer
   }
 
   def getFileInfo(shuffleKey: String, fileName: String): FileInfo = {
-    val shuffleMap = fileInfos.get(shuffleKey)
+    val shuffleMap = nonMemoryFileInfos.get(shuffleKey)

Review Comment:
   Directly return NonMemoryFileInfo from `getFileInfo`.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala:
##########
@@ -327,12 +327,14 @@ private[deploy] class Controller(
     future
   }
 
-  private def waitMapPartitionRegionFinished(fileWriter: FileWriter, 
waitTimeout: Long): Unit = {
-    if (fileWriter.isInstanceOf[MapPartitionFileWriter]) {
+  private def waitMapPartitionRegionFinished(
+      fileWriter: PartitionDataWriter,
+      waitTimeout: Long): Unit = {
+    if (fileWriter.isInstanceOf[MapPartitionPartitionDataWriter]) {

Review Comment:
   Can we add a method `checkPartitionRegionFinished(timeout)` to 
MapPartitionPartitionDataWriter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to