FMX commented on code in PR #2300:
URL: https://github.com/apache/celeborn/pull/2300#discussion_r1591731425


##########
common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java:
##########
@@ -53,4 +73,56 @@ public boolean isPartitionSplitEnabled() {
   public void setPartitionSplitEnabled(boolean partitionSplitEnabled) {
     this.partitionSplitEnabled = partitionSplitEnabled;
   }
+
+  private boolean isReduceFileMeta() {
+    return fileMeta instanceof ReduceFileMeta;
+  }
+
+  public boolean addStream(long streamId) {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In addStream, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      if (reduceFileMeta.getSorted().get()) {
+        return false;
+      } else {
+        streams.add(streamId);
+        return true;
+      }
+    }
+  }
+
+  public void closeStream(long streamId, int startIndex, int endIndex) {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In closeStream, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      streams.remove(streamId);
+    }
+  }
+
+  public boolean isStreamsEmpty() {
+    if (!isReduceFileMeta()) {
+      throw new IllegalStateException("In isStreamsEmpty, filemeta cannot be 
MapFileMeta");
+    }
+    ReduceFileMeta reduceFileMeta = (ReduceFileMeta) fileMeta;
+    synchronized (reduceFileMeta.getSorted()) {
+      return streams.isEmpty();
+    }
+  }
+
+  public boolean isFullyRead() {

Review Comment:
   It will read from disks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to