lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r889994816


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java:
##########
@@ -20,120 +20,91 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-/** Reads the cached data from a list of paths. */
+/** Reads the cached data from a list of segments. */
 public class DataCacheReader<T> implements Iterator<T> {
 
+    /** The tool to deserialize bytes into records. */
     private final TypeSerializer<T> serializer;
 
-    private final FileSystem fileSystem;
-
+    /** The segments where to read the records from. */
     private final List<Segment> segments;
 
-    @Nullable private SegmentReader currentSegmentReader;
+    /** The current reader for next records. */
+    @Nullable private SegmentReader<T> currentReader;

Review Comment:
   nits: since the class name is `DataCacheReader`, could we rename this 
field as `currentSegmentReader` to minimize confusion?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,162 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replayed them on required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provide memory segments to hold cached records 
in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;

Review Comment:
   nits: could we rename this variable as `currentSegmentWriter`?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java:
##########
@@ -20,120 +20,91 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-/** Reads the cached data from a list of paths. */
+/** Reads the cached data from a list of segments. */
 public class DataCacheReader<T> implements Iterator<T> {
 
+    /** The tool to deserialize bytes into records. */
     private final TypeSerializer<T> serializer;
 
-    private final FileSystem fileSystem;
-
+    /** The segments where to read the records from. */
     private final List<Segment> segments;
 
-    @Nullable private SegmentReader currentSegmentReader;
+    /** The current reader for next records. */
+    @Nullable private SegmentReader<T> currentReader;
 
-    public DataCacheReader(
-            TypeSerializer<T> serializer, FileSystem fileSystem, List<Segment> 
segments)
-            throws IOException {
-        this(serializer, fileSystem, segments, new Tuple2<>(0, 0));
+    /** The index of the segment that current reader reads from. */
+    private int segmentIndex;

Review Comment:
   nits: how about renaming it as `currentSegmentIndex` to make the name more 
consistent with other variables?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java:
##########
@@ -20,120 +20,91 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
-/** Reads the cached data from a list of paths. */
+/** Reads the cached data from a list of segments. */
 public class DataCacheReader<T> implements Iterator<T> {
 
+    /** The tool to deserialize bytes into records. */
     private final TypeSerializer<T> serializer;
 
-    private final FileSystem fileSystem;
-
+    /** The segments where to read the records from. */
     private final List<Segment> segments;
 
-    @Nullable private SegmentReader currentSegmentReader;
+    /** The current reader for next records. */
+    @Nullable private SegmentReader<T> currentReader;
 
-    public DataCacheReader(
-            TypeSerializer<T> serializer, FileSystem fileSystem, List<Segment> 
segments)
-            throws IOException {
-        this(serializer, fileSystem, segments, new Tuple2<>(0, 0));
+    /** The index of the segment that current reader reads from. */
+    private int segmentIndex;
+
+    /** The number of records that have been read through current reader so 
far. */
+    private int segmentCount;

Review Comment:
   nits: to avoid confusing this count with the number of records that have 
been read by this data cache, how about renaming it as `currentSegmentCount`?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,162 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replayed them on required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provide memory segments to hold cached records 
in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, 
Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
+        this(serializer, fileSystem, pathGenerator, segmentPool, 
Collections.emptyList());
+    }
 
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public void addRecord(T record) throws IOException {
+        if (!currentWriter.addRecord(record)) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, 
pathGenerator.get());
+            Preconditions.checkState(currentWriter.addRecord(record));
+        }
     }
 
+    /** Finishes adding records and closes resources occupied for adding 
records. */
     public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
-    }
+        if (currentWriter == null) {
+            return finishedSegments;
+        }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = null;
+        return finishedSegments;
     }
 
-    public List<Segment> getFinishSegments() {
-        return finishSegments;
+    /**
+     * Flushes all added records to segments and returns a list of segments 
containing all cached
+     * records.
+     */
+    public List<Segment> getSegments() throws IOException {
+        finishCurrentSegmentIfAny();
+        return finishedSegments;
     }
 
-    private void finishCurrentSegment(boolean newSegment) throws IOException {
-        if (currentSegment != null) {
-            currentSegment.finish().ifPresent(finishSegments::add);
-            currentSegment = null;
+    private void finishCurrentSegmentIfAny() throws IOException {
+        if (currentWriter == null) {
+            return;
         }
 
-        if (newSegment) {
-            currentSegment = new SegmentWriter(pathGenerator.get());
-        }
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = createSegmentWriter();
     }
 
-    private class SegmentWriter {
-
-        private final Path path;
-
-        private final FSDataOutputStream outputStream;
-
-        private final DataOutputView outputView;
-
-        private int currentSegmentCount;
-
-        public SegmentWriter(Path path) throws IOException {
-            this.path = path;
-            this.outputStream = fileSystem.create(path, 
FileSystem.WriteMode.NO_OVERWRITE);
-            this.outputView = new DataOutputViewStreamWrapper(outputStream);
+    /** Cleans up all previously added records. */
+    public void cleanup() throws IOException {
+        finishCurrentSegmentIfAny();
+        for (Segment segment : finishedSegments) {
+            if (segment.getFsSize() > 0) {
+                fileSystem.delete(segment.getPath(), false);
+            }
+            if (segment.getCache() != null) {
+                segmentPool.returnAll(segment.getCache());
+            }
         }
+        finishedSegments.clear();
+    }
 
-        public void addRecord(T record) throws IOException {
-            serializer.serialize(record, outputView);
-            currentSegmentCount += 1;
-        }
+    /** Persists the segments in this writer to disk. */
+    public void persistSegmentsToDisk() throws IOException {

Review Comment:
   nits: given that we use `FileSegmentWriter`, would it be more consistent to 
rename this method as `writeSegmentsToFiles()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to