waitinfuture commented on code in PR #2130:
URL: https://github.com/apache/incubator-celeborn/pull/2130#discussion_r1433998146


##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -17,293 +17,40 @@
 
 package org.apache.celeborn.common.meta;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.celeborn.common.identity.UserIdentifier;
-import org.apache.celeborn.common.protocol.PartitionType;
-import org.apache.celeborn.common.util.Utils;
-
-public class FileInfo {
-  private static Logger logger = LoggerFactory.getLogger(FileInfo.class);
-  private String mountPoint;
-  private final String filePath;
-  private final PartitionType partitionType;
-  private final UserIdentifier userIdentifier;
-
-  /**
-   * A flag used to indicate whether this FileInfo is sorted or not. Currently, it is only set for
-   * unsorted FileInfo instances.
-   */
-  private final AtomicBoolean sorted = new AtomicBoolean(false);
-
-  /** The set of stream IDs that are fetching this FileInfo. */
-  private final Set<Long> streams = ConcurrentHashMap.newKeySet();
 
-  // members for ReducePartition
-  private final List<Long> chunkOffsets;
-
-  // members for MapPartition
-  private int bufferSize;
-  private int numSubpartitions;
-
-  private volatile long bytesFlushed;
+public abstract class FileInfo {
+  private UserIdentifier userIdentifier;

Review Comment:
   This should be `final`, to be consistent with the previous code.



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReduceFileMeta implements FileMeta {
+  private List<Long> chunkOffsets;
+  private AtomicBoolean sorted = new AtomicBoolean(false);
+
+  public ReduceFileMeta() {
+    this.chunkOffsets = new ArrayList<>();
+    chunkOffsets.add(0L);
+  }
+
+  public ReduceFileMeta(List<Long> chunkOffsets) {
+    this.chunkOffsets = chunkOffsets;
+  }
+
+  @Override
+  public synchronized List<Long> getChunkOffsets() {
+    return chunkOffsets;
+  }
+
+  @Override
+  public synchronized void addChunkOffset(long offset) {
+    chunkOffsets.add(offset);
+  }
+
+  public synchronized long getLastChunkOffset() {
+    return chunkOffsets.get(chunkOffsets.size() - 1);
+  }
+
+  @Override
+  public synchronized int getNumChunks() {
+    if (chunkOffsets.isEmpty()) {
+      return 0;
+    } else {
+      return chunkOffsets.size() - 1;
+    }
+  }
+
+  @Override
+  public synchronized void setSorted() {

Review Comment:
   This should keep the previous logic:
   ```
     public void setSorted() {
       synchronized (sorted) {
         sorted.set(true);
       }
     }
   ```
   
   A refactoring should not change the original logic.
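
   For context, a minimal sketch of why the two forms are not equivalent: a
   `synchronized` method uses `this` as its monitor, while `synchronized (sorted)`
   uses the `AtomicBoolean` itself, so the two forms do not exclude each other.
   The class and method names below are made up for illustration and are not
   part of this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Celeborn.
class SortedLockDemo {
  private final AtomicBoolean sorted = new AtomicBoolean(false);

  // Previous logic: the monitor is the `sorted` object.
  void setSortedOnField() {
    synchronized (sorted) {
      sorted.set(true);
    }
  }

  // Refactored form: the monitor is `this`.
  synchronized void setSortedOnThis() {
    sorted.set(true);
  }

  boolean isSorted() {
    return sorted.get();
  }

  // True when the block form holds only the `sorted` monitor.
  boolean blockFormLocksOnlyField() {
    synchronized (sorted) {
      return Thread.holdsLock(sorted) && !Thread.holdsLock(this);
    }
  }

  // True when the method form holds only the `this` monitor.
  synchronized boolean methodFormLocksOnlyThis() {
    return Thread.holdsLock(this) && !Thread.holdsLock(sorted);
  }
}
```

   Any other code that still synchronizes on `sorted` would no longer be
   serialized against a `synchronized` method, which is why the change is more
   than cosmetic.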



##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -495,6 +496,26 @@ message PbFileInfo {
   bool partitionSplitEnabled = 8;
 }
 
+message PbMapFileMeta{
+  int32 bufferSize = 1;
+  int32 numSubPartitions = 2;
+}
+
+message PbReduceFileMeta {
+  repeated int64 chunkOffsets = 1;
+  bool sorted = 2;
+}
+
+message PbNonMemoryFile{
+  PbUserIdentifier userIdentifier = 1;
+  bool partitionSplitEnabled = 2;
+  PbMapFileMeta mapFileMeta = 3;
+  PbReduceFileMeta reduceFileMeta = 4;
+  string filePath = 5;

Review Comment:
   `filePath` is already in `PbStorageInfo`



##########
common/src/main/java/org/apache/celeborn/common/meta/FileMeta.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.List;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+public interface FileMeta {

Review Comment:
   The purpose of this PR is to refactor FileInfo so that MapPartition and 
ReducePartition are separated. However, this `FileMeta` has interfaces for both 
MapPartition and ReducePartition, so I don't think it's a good abstraction. It 
would be better to only contain the common interfaces, and cast to specific 
types when needed.
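
   A rough sketch of the shape I have in mind, under the assumption that the
   two metas share little or nothing. Only the type names mirror the PR; the
   members and the `FileMetaUsage` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes for illustration; only the type names mirror the PR.
interface FileMeta {
  // Only members shared by both partition types would live here.
}

final class ReduceFileMeta implements FileMeta {
  private final List<Long> chunkOffsets = new ArrayList<>();

  ReduceFileMeta() {
    chunkOffsets.add(0L);
  }

  synchronized void addChunkOffset(long offset) {
    chunkOffsets.add(offset);
  }

  synchronized int getNumChunks() {
    return chunkOffsets.isEmpty() ? 0 : chunkOffsets.size() - 1;
  }
}

final class MapFileMeta implements FileMeta {
  private final int bufferSize;
  private final int numSubPartitions;

  MapFileMeta(int bufferSize, int numSubPartitions) {
    this.bufferSize = bufferSize;
    this.numSubPartitions = numSubPartitions;
  }

  int getBufferSize() {
    return bufferSize;
  }

  int getNumSubPartitions() {
    return numSubPartitions;
  }
}

final class FileMetaUsage {
  // Reduce-only callers cast at the point of use instead of widening FileMeta.
  static int numChunks(FileMeta meta) {
    if (!(meta instanceof ReduceFileMeta)) {
      throw new IllegalArgumentException(
          "expected ReduceFileMeta, got " + meta.getClass().getSimpleName());
    }
    return ((ReduceFileMeta) meta).getNumChunks();
  }
}
```

   With this shape, `NotImplementedException` defaults are not needed in the
   interface, because a map-side meta never exposes reduce-side methods.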



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java:
##########
@@ -101,7 +102,7 @@ public class MapDataPartitionReader implements Comparable<MapDataPartitionReader
   private AtomicInteger numInUseBuffers = new AtomicInteger(0);
   private boolean isOpen = false;
 
-  public MapDataPartitionReader(
+  public MapPartitionDataReader(
       int startPartitionIndex,
       int endPartitionIndex,
       FileInfo fileInfo,

Review Comment:
   If we are sure this is always `DiskFileInfo`, change the type to 
`DiskFileInfo`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java:
##########
@@ -37,18 +37,19 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.meta.NonMemoryFileInfo;
 import org.apache.celeborn.common.util.FileChannelUtils;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
 import org.apache.celeborn.service.deploy.worker.memory.BufferRecycler;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
 
 // this means active data partition
-class MapDataPartition implements MemoryManager.ReadBufferTargetChangeListener {
-  public static final Logger logger = LoggerFactory.getLogger(MapDataPartition.class);
+class MapPartitionData implements MemoryManager.ReadBufferTargetChangeListener {
+  public static final Logger logger = LoggerFactory.getLogger(MapPartitionData.class);
   private final FileInfo fileInfo;

Review Comment:
   Change the type to `DiskFileInfo`



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -98,34 +99,41 @@ public FileWriter(
       PartitionType partitionType,
       boolean rangeReadFilter)
       throws IOException {
-    this.fileInfo = fileInfo;
-    this.flusher = flusher;
-    this.flushWorkerIndex = flusher.getWorkerIndex();
+    this.storageManager = storageManager;
+    CreateFileResult createFileResult = storageManager.createFile(createFileContext, this);

Review Comment:
   I don't think it's a good idea to call `createFile` in the constructor here, 
and I think it's unnecessary to define `CreateFileResult`. Instead, we should 
call `createFile` in `StorageManager#createPartitionDataWriter` and pass 
`flusher` and `fileInfo` as parameters to the constructor.
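
   Roughly what I mean, as a sketch only. The classes below are simplified
   stand-ins that reuse names from this discussion; their fields, the
   `createFile` body, and the example path are assumptions and do not reflect
   the real signatures.

```java
// Hypothetical stand-ins for illustration; not the PR's real classes.
final class Flusher {
  private final int workerIndex;

  Flusher(int workerIndex) {
    this.workerIndex = workerIndex;
  }

  int getWorkerIndex() {
    return workerIndex;
  }
}

final class DiskFileInfo {
  private final String filePath;

  DiskFileInfo(String filePath) {
    this.filePath = filePath;
  }

  String getFilePath() {
    return filePath;
  }
}

final class PartitionDataWriter {
  private final DiskFileInfo fileInfo;
  private final Flusher flusher;
  private final int flushWorkerIndex;

  // The constructor only stores what it is given; it creates nothing.
  PartitionDataWriter(DiskFileInfo fileInfo, Flusher flusher) {
    this.fileInfo = fileInfo;
    this.flusher = flusher;
    this.flushWorkerIndex = flusher.getWorkerIndex();
  }

  DiskFileInfo getFileInfo() {
    return fileInfo;
  }

  int getFlushWorkerIndex() {
    return flushWorkerIndex;
  }
}

final class StorageManager {
  // File creation happens here, before the writer is constructed.
  PartitionDataWriter createPartitionDataWriter(String fileName) {
    DiskFileInfo fileInfo = createFile(fileName);
    Flusher flusher = new Flusher(0);
    return new PartitionDataWriter(fileInfo, flusher);
  }

  // Placeholder: builds a path only and touches no real storage.
  private DiskFileInfo createFile(String fileName) {
    return new DiskFileInfo("/mnt/disk0/" + fileName);
  }
}
```

   This keeps the constructor free of side effects, so a failure in file
   creation never leaves a half-constructed writer behind.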



##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -495,6 +496,26 @@ message PbFileInfo {
   bool partitionSplitEnabled = 8;
 }
 
+message PbMapFileMeta{
+  int32 bufferSize = 1;
+  int32 numSubPartitions = 2;
+}
+
+message PbReduceFileMeta {
+  repeated int64 chunkOffsets = 1;
+  bool sorted = 2;
+}
+
+message PbNonMemoryFile{
+  PbUserIdentifier userIdentifier = 1;
+  bool partitionSplitEnabled = 2;
+  PbMapFileMeta mapFileMeta = 3;
+  PbReduceFileMeta reduceFileMeta = 4;
+  string filePath = 5;
+  PbStorageInfo storageInfo = 6;
+  int64 bytesFlusher = 7;

Review Comment:
   Typo: this should be `bytesFlushed`.



##########
common/src/main/java/org/apache/celeborn/common/meta/NonMemoryFileInfo.java:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class NonMemoryFileInfo extends FileInfo {

Review Comment:
   Maybe `DiskFileInfo` is better. Even for HDFS and ObjectStore, we can see 
them as a giant disk. In cloud-native databases, DBMSs that use ObjectStore are 
referred to as `Shared Disk DBMS`.



##########
common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala:
##########
@@ -138,10 +149,11 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     val pbFileInfo = PbSerDeUtils.toPbFileInfo(fileInfo1)
     val restoredFileInfo = PbSerDeUtils.fromPbFileInfo(pbFileInfo)
 
-    assert(restoredFileInfo.getFilePath.equals(fileInfo1.getFilePath))
-    assert(restoredFileInfo.getChunkOffsets.equals(fileInfo1.getChunkOffsets))
+    assert(
+      restoredFileInfo.getFilePath.equals(fileInfo1.getFilePath))
+    assert(restoredFileInfo.getChunkOffsets.equals(
+      fileInfo1.getChunkOffsets))
     assert(restoredFileInfo.getUserIdentifier.equals(fileInfo1.getUserIdentifier))
-    assert(restoredFileInfo.getPartitionType.equals(fileInfo1.getPartitionType))

Review Comment:
   Why delete this line?



##########
common/src/main/java/org/apache/celeborn/common/meta/NonMemoryFileInfo.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class NonMemoryFileInfo extends FileInfo {
+  private static Logger logger = LoggerFactory.getLogger(NonMemoryFileInfo.class);
+  private final Set<Long> streams = ConcurrentHashMap.newKeySet();
+  private String filePath;
+  private StorageInfo.Type storageType;
+  private volatile long bytesFlushed;
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath,
+      StorageInfo.Type storageType) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    this.storageType = storageType;
+  }
+
+  public NonMemoryFileInfo(
+      UserIdentifier userIdentifier,
+      boolean partitionSplitEnabled,
+      FileMeta fileMeta,
+      String filePath) {
+    super(userIdentifier, partitionSplitEnabled, fileMeta);
+    this.filePath = filePath;
+    // assume that a fileinfo reloaded from pb is local file
+    this.storageType = StorageInfo.Type.HDD;

Review Comment:
   If I remember correctly, we should merge `HDD` and `SSD` into `LOCAL`.



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReduceFileMeta implements FileMeta {
+  private List<Long> chunkOffsets;
+  private AtomicBoolean sorted = new AtomicBoolean(false);

Review Comment:
   This should be `final`, to be consistent with the previous code.



##########
common/src/main/java/org/apache/celeborn/common/meta/ReduceFileMeta.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ReduceFileMeta implements FileMeta {
+  private List<Long> chunkOffsets;

Review Comment:
   This should be `final`, to be consistent with the previous code.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -847,12 +774,119 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   def getActiveShuffleSize(): Long = {
-    fileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
+    nonMemoryFileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
   }
 
   def getActiveShuffleFileCount(): Long = {
-    fileInfos.asScala.values.map(_.size()).sum
+    nonMemoryFileInfos.asScala.values.map(_.size()).sum
+  }
+
+  def createFile(
+      createFileContext: CreateFileContext,
+      partitionDataWriter: PartitionDataWriter): CreateFileResult = {
+    val (location, appId, shuffleId, fileName) = (
+      createFileContext.location,
+      createFileContext.appId,
+      createFileContext.shuffleId,
+      createFileContext.fileName)
+    val suggestedMountPoint = location.getStorageInfo.getMountPoint
+    var retryCount = 0

Review Comment:
   `retryCount` should be incremented after each loop iteration, as the previous code does.
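
   To illustrate the concern with a generic bounded retry loop (this is not
   the `createFile` code itself; `RetryDemo`, `createWithRetry`, and the loop
   bound are assumptions made for the example):

```java
import java.util.function.IntPredicate;

// Hypothetical helper, not part of Celeborn.
final class RetryDemo {
  // Returns the number of attempts made, or -1 if every attempt failed.
  static int createWithRetry(int maxRetries, IntPredicate attempt) {
    int retryCount = 0;
    while (retryCount < maxRetries) {
      if (attempt.test(retryCount)) {
        return retryCount + 1;
      }
      // Without this increment a persistent failure would loop forever.
      retryCount++;
    }
    return -1;
  }
}
```

   If the counter is never incremented, the bound is never reached and a
   failing attempt is retried indefinitely.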



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -847,12 +774,119 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
   }
 
   def getActiveShuffleSize(): Long = {
-    fileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
+    nonMemoryFileInfos.values().asScala.map(_.values().asScala.map(_.getBytesFlushed).sum).sum
   }
 
   def getActiveShuffleFileCount(): Long = {
-    fileInfos.asScala.values.map(_.size()).sum
+    nonMemoryFileInfos.asScala.values.map(_.size()).sum
+  }
+
+  def createFile(

Review Comment:
   I don't think we should extract this method. Is it for testing purposes? If 
so, it would be better to change the test code.



##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -50,8 +51,8 @@
  * Note: Once FlushNotifier.exception is set, the whole file is not available.
 *       That's fine some of the internal state(e.g. bytesFlushed) may be inaccurate.
  */
-public abstract class FileWriter implements DeviceObserver {
-  private static final Logger logger = LoggerFactory.getLogger(FileWriter.class);
+public abstract class PartitionDataWriter implements DeviceObserver {
+  private static final Logger logger = LoggerFactory.getLogger(PartitionDataWriter.class);
   private static final long WAIT_INTERVAL_MS = 5;
 
   protected final FileInfo fileInfo;

Review Comment:
   If this is always `DiskFileInfo`, just make the type `DiskFileInfo` to avoid 
type conversions in many places. If you plan to support memory files in the 
future, change it accordingly at that time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
