yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r888749310


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,104 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
-/** Records the data received and replayed them on required. */
+/** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
-    private final TypeSerializer<T> serializer;
-
     private final FileSystem fileSystem;
 
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    private final LimitedSizeMemoryManager memoryManager;
+
+    private final TypeSerializer<T> serializer;
+
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
-            SupplierWithException<Path, IOException> pathGenerator)
+            SupplierWithException<Path, IOException> pathGenerator,
+            LimitedSizeMemoryManager memoryManager)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, memoryManager, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
+            LimitedSizeMemoryManager memoryManager,
             List<Segment> priorFinishedSegments)
             throws IOException {
         this.serializer = serializer;
         this.fileSystem = fileSystem;
         this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this.memoryManager = memoryManager;
+        this.finishedSegments = new ArrayList<>(priorFinishedSegments);
+        this.currentWriter =
+                SegmentWriter.create(
+                        pathGenerator.get(), this.memoryManager, serializer, 0L, true, true);
     }
 
     public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+        boolean success = currentWriter.addRecord(record);

Review Comment:
   I believe such a case is possible. For example, suppose we have written the size of a vector into the last bytes of a memory segment, and then fail to write the vector's values because the memory manager has no more segments available for this operator. In this case the code would re-create an `FsSegmentWriter` and re-write the whole record, including the size of the vector, to the file.
   
   Such a case exists, but it does no harm to the program: the orphaned `size` value left in the memory segment is not tracked, so it is never read. It does not waste memory either, because space is allocated at segment granularity, and those bytes are released later together with the valid values in the same segment.
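To make the argument concrete, here is a minimal, self-contained sketch of the situation described above. Everything in it is hypothetical: `BoundedMemoryWriter` and `FallbackCache` are illustrative names, not the flink-ml `SegmentWriter`/`FsSegmentWriter` API. Records are modeled as `int` vectors stored as `[length, v0, v1, ...]` in fixed-size pages that stand in for memory segments, and a plain list stands in for the file-backed writer. The memory writer counts a record only after every slot is written, so a length that lands in the last slot before the budget runs out stays in the page but is never replayed, while the caller re-writes the whole record to the fallback writer.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical memory-backed writer with a fixed page budget (not the flink-ml API).
 * Each record is an int vector stored as [length, v0, v1, ...] across fixed-size pages.
 */
class BoundedMemoryWriter {
    private final int pageSize; // slots per page; stands in for a memory segment
    private final int maxPages; // stands in for the operator's memory budget
    private final List<int[]> pages = new ArrayList<>();
    private int position = 0; // next free slot, including untracked slots
    private int committedRecords = 0; // only fully written records are counted

    BoundedMemoryWriter(int pageSize, int maxPages) {
        this.pageSize = pageSize;
        this.maxPages = maxPages;
    }

    /** Writes one slot, allocating a page if needed; returns false once the budget is used up. */
    private boolean writeSlot(int value) {
        int pageIndex = position / pageSize;
        if (pageIndex == pages.size()) {
            if (pages.size() == maxPages) {
                return false;
            }
            pages.add(new int[pageSize]);
        }
        pages.get(pageIndex)[position % pageSize] = value;
        position++;
        return true;
    }

    /**
     * Returns false if the record does not fit. Slots already written for it (e.g. the
     * length) stay in the page, but the record is never counted, so replay skips them.
     */
    boolean addRecord(int[] vector) {
        if (!writeSlot(vector.length)) {
            return false;
        }
        for (int v : vector) {
            if (!writeSlot(v)) {
                return false;
            }
        }
        committedRecords++;
        return true;
    }

    /** Raw slot access, used only to show that an orphaned length is physically present. */
    int peekSlot(int index) {
        return pages.get(index / pageSize)[index % pageSize];
    }

    /** Replays the committed records only; trailing untracked slots are never read. */
    List<int[]> readAll() {
        List<int[]> result = new ArrayList<>();
        int cursor = 0;
        for (int r = 0; r < committedRecords; r++) {
            int[] vector = new int[peekSlot(cursor++)];
            for (int i = 0; i < vector.length; i++) {
                vector[i] = peekSlot(cursor++);
            }
            result.add(vector);
        }
        return result;
    }
}

/** Hypothetical cache that switches to a file-like writer once memory is exhausted. */
class FallbackCache {
    private final BoundedMemoryWriter memoryWriter;
    private final List<int[]> fileRecords = new ArrayList<>(); // stands in for FsSegmentWriter
    private boolean usingFile = false;

    FallbackCache(int pageSize, int maxPages) {
        this.memoryWriter = new BoundedMemoryWriter(pageSize, maxPages);
    }

    void addRecord(int[] vector) {
        if (!usingFile && memoryWriter.addRecord(vector)) {
            return;
        }
        // Memory is full: switch writers for good and re-write the whole record, length included.
        usingFile = true;
        fileRecords.add(vector.clone());
    }

    BoundedMemoryWriter memory() {
        return memoryWriter;
    }

    /** Memory records first, then file records, matching the order they were added. */
    List<int[]> readAll() {
        List<int[]> all = new ArrayList<>(memoryWriter.readAll());
        all.addAll(fileRecords);
        return all;
    }
}
```

With a page size of 4 slots and a budget of one page, adding `{1, 2}` and then `{10, 20, 30}` leaves the orphaned length `3` in slot 3 of the only page, while replay returns `{1, 2}` from memory followed by `{10, 20, 30}` from the fallback writer. The orphaned slot costs nothing extra because the page was allocated as a whole anyway.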
