lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r889905269


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
+    public void addRecord(T record) throws IOException {
+        assert currentWriter != null;
+        if (!currentWriter.addRecord(record)) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            Preconditions.checkState(currentWriter.addRecord(record));
+        }
     }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
+    /** Finishes the current segment if any records have been added to this segment. */
+    public void finishCurrentSegmentIfAny() throws IOException {
+        if (currentWriter == null || currentWriter.getCount() == 0) {
+            return;
+        }
 
-    public List<Segment> getFinishSegments() {
-        return finishSegments;
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = createSegmentWriter();
     }
 
-    private void finishCurrentSegment(boolean newSegment) throws IOException {
-        if (currentSegment != null) {
-            currentSegment.finish().ifPresent(finishSegments::add);
-            currentSegment = null;
+    /** Finishes adding records and closes resources occupied for adding records. */
+    public List<Segment> finish() throws IOException {
+        if (currentWriter == null) {
+            return finishedSegments;
         }
 
-        if (newSegment) {
-            currentSegment = new SegmentWriter(pathGenerator.get());
-        }
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = null;
+        return finishedSegments;
     }
 
-    private class SegmentWriter {
-
-        private final Path path;
-
-        private final FSDataOutputStream outputStream;
-
-        private final DataOutputView outputView;
-
-        private int currentSegmentCount;
+    public List<Segment> getFinishedSegments() {
+        return finishedSegments;
+    }
 
-        public SegmentWriter(Path path) throws IOException {
-            this.path = path;
-            this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
-            this.outputView = new DataOutputViewStreamWrapper(outputStream);
+    /** Cleans up all previously added records. */
+    public void cleanup() throws IOException {
+        finishCurrentSegmentIfAny();
+        for (Segment segment : finishedSegments) {
+            if (segment.isOnDisk()) {
+                fileSystem.delete(segment.getPath(), false);
+            }
+            if (segment.isCached()) {
+                assert segmentPool != null;
+                segmentPool.returnAll(segment.getCache());
+            }
         }
+        finishedSegments.clear();
+    }
 
-        public void addRecord(T record) throws IOException {
-            serializer.serialize(record, outputView);
-            currentSegmentCount += 1;
-        }
+    public void persistSegmentsToDisk() throws IOException {

Review Comment:
   It appears that we almost always call `finishCurrentSegmentIfAny()` before 
calling `persistSegmentsToDisk()`. Would it be simpler to just invoke 
`finishCurrentSegmentIfAny()` inside this method so that algorithm developers 
don't need to explicitly call `finishCurrentSegmentIfAny()`?
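   
   For example, something along these lines (just a sketch; the comment
   stands in for whatever persisting logic the method already has):
   
   ```java
   public void persistSegmentsToDisk() throws IOException {
       // Seal the in-progress segment first, so callers no longer need to
       // invoke finishCurrentSegmentIfAny() themselves.
       finishCurrentSegmentIfAny();
       // ... existing logic that persists the finished segments to disk ...
   }
   ```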



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
+    public void addRecord(T record) throws IOException {
+        assert currentWriter != null;
+        if (!currentWriter.addRecord(record)) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            Preconditions.checkState(currentWriter.addRecord(record));
+        }
     }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
+    /** Finishes the current segment if any records have been added to this segment. */
+    public void finishCurrentSegmentIfAny() throws IOException {
+        if (currentWriter == null || currentWriter.getCount() == 0) {
+            return;
+        }
 
-    public List<Segment> getFinishSegments() {
-        return finishSegments;
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = createSegmentWriter();
     }
 
-    private void finishCurrentSegment(boolean newSegment) throws IOException {
-        if (currentSegment != null) {
-            currentSegment.finish().ifPresent(finishSegments::add);
-            currentSegment = null;
+    /** Finishes adding records and closes resources occupied for adding records. */
+    public List<Segment> finish() throws IOException {
+        if (currentWriter == null) {
+            return finishedSegments;
         }
 
-        if (newSegment) {
-            currentSegment = new SegmentWriter(pathGenerator.get());
-        }
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = null;
+        return finishedSegments;
     }
 
-    private class SegmentWriter {
-
-        private final Path path;
-
-        private final FSDataOutputStream outputStream;
-
-        private final DataOutputView outputView;
-
-        private int currentSegmentCount;
+    public List<Segment> getFinishedSegments() {
+        return finishedSegments;

Review Comment:
   It seems that we almost always call `finishCurrentSegmentIfAny()` before 
calling `getFinishedSegments()`. Would it be simpler to just invoke 
`finishCurrentSegmentIfAny()` inside this method so that algorithm developers 
don't need to explicitly call `finishCurrentSegmentIfAny()`?
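   
   For example (a rough sketch; note that this would add `throws IOException`
   to the getter's signature, since finishing a segment may touch the file
   system):
   
   ```java
   public List<Segment> getFinishedSegments() throws IOException {
       finishCurrentSegmentIfAny();
       return finishedSegments;
   }
   ```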



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/SegmentWriter.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Writer for the data to be cached to a segment. */
+@Internal
+interface SegmentWriter<T> {
+    /** Adds a record to the writer. */

Review Comment:
   Can you add Javadoc explaining the return value?
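   
   Based on how `DataCacheWriter#addRecord` uses the return value, maybe
   something like this (the wording is only a suggestion):
   
   ```java
   /**
    * Attempts to add a record to the segment backing this writer.
    *
    * @return true if the record has been added, or false if this writer has
    *     run out of capacity, in which case the caller should finish this
    *     segment and retry with a new writer.
    */
   boolean addRecord(T record) throws IOException;
   ```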



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
+    public void addRecord(T record) throws IOException {
+        assert currentWriter != null;

Review Comment:
   After thinking about this more, I find it more consistent with other code
   to just remove the assert here. Note that the code will throw a
   NullPointerException if `currentWriter == null`, which is good enough for
   us to investigate this bug.
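   
   i.e. simply:
   
   ```java
   public void addRecord(T record) throws IOException {
       // If currentWriter is null this throws a NullPointerException,
       // which is good enough to investigate the bug.
       if (!currentWriter.addRecord(record)) {
           currentWriter.finish().ifPresent(finishedSegments::add);
           currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
           Preconditions.checkState(currentWriter.addRecord(record));
       }
   }
   ```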



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
+    public void addRecord(T record) throws IOException {
+        assert currentWriter != null;
+        if (!currentWriter.addRecord(record)) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            Preconditions.checkState(currentWriter.addRecord(record));
+        }
     }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
+    /** Finishes the current segment if any records have been added to this segment. */
+    public void finishCurrentSegmentIfAny() throws IOException {

Review Comment:
   It appears that this method is called when we increment the epoch
   watermark, end input, snapshot state, or inside
   `processPendingElementsAndWatermarks()`.
   
   The first three cases happen at a limited frequency. The segment will most
   likely contain data, and in the rare cases when it does not, the
   performance impact of re-creating an empty segment should be negligible
   given how infrequent these cases are.
   
   Due to the use of `hasPendingElements` inside
   `AbstractBroadcastWrapperOperator`, it appears that
   `processPendingElementsAndWatermarks()` is called only once for each
   `AbstractBroadcastWrapperOperator` instance, which suggests that the
   overhead of re-creating an empty segment due to this method will also be
   pretty small.
   
   How about we remove the optimization related to `getCount()` to simplify
   the code, and only add it back if we have a concrete reason to believe it
   will improve performance?
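   
   That is, roughly:
   
   ```java
   public void finishCurrentSegmentIfAny() throws IOException {
       if (currentWriter == null) {
           return;
       }
       // No getCount() == 0 short-circuit: re-creating an empty segment
       // writer here should be cheap enough at this call frequency.
       currentWriter.finish().ifPresent(finishedSegments::add);
       currentWriter = createSegmentWriter();
   }
   ```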



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
 package org.apache.flink.iteration.datacache.nonkeyed;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.SupplierWithException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 
 /** Records the data received and replays them when required. */
 public class DataCacheWriter<T> {
 
+    /** A soft limit on the max allowed size of a single segment. */
+    static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+    /** The tool to serialize received records into bytes. */
     private final TypeSerializer<T> serializer;
 
+    /** The file system that contains the cache files. */
     private final FileSystem fileSystem;
 
+    /** A generator to generate paths of cache files. */
     private final SupplierWithException<Path, IOException> pathGenerator;
 
-    private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+    @Nullable private final MemorySegmentPool segmentPool;
+
+    /** The segments that contain previously added records. */
+    private final List<Segment> finishedSegments;
 
-    private SegmentWriter currentSegment;
+    /** The current writer for new records. */
+    @Nullable private SegmentWriter<T> currentWriter;
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator)
             throws IOException {
-        this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
     }
 
     public DataCacheWriter(
             TypeSerializer<T> serializer,
             FileSystem fileSystem,
             SupplierWithException<Path, IOException> pathGenerator,
-            List<Segment> priorFinishedSegments)
+            MemorySegmentPool segmentPool)
             throws IOException {
-        this.serializer = serializer;
-        this.fileSystem = fileSystem;
-        this.pathGenerator = pathGenerator;
-
-        this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
-        this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
     }
 
-    public void addRecord(T record) throws IOException {
-        currentSegment.addRecord(record);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, finishedSegments);
     }
 
-    public void finishCurrentSegment() throws IOException {
-        finishCurrentSegment(true);
+    public DataCacheWriter(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            @Nullable MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        this.currentWriter = createSegmentWriter();
     }
 
-    public List<Segment> finish() throws IOException {
-        finishCurrentSegment(false);
-        return finishSegments;
+    public void addRecord(T record) throws IOException {
+        assert currentWriter != null;
+        if (!currentWriter.addRecord(record)) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            Preconditions.checkState(currentWriter.addRecord(record));
+        }
     }
 
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
+    /** Finishes the current segment if any records have been added to this segment. */
+    public void finishCurrentSegmentIfAny() throws IOException {
+        if (currentWriter == null || currentWriter.getCount() == 0) {

Review Comment:
   It looks like we can make this method private and let 
`persistSegmentsToDisk/getFinishedSegments` call this method.
   
   And we can rename `getFinishedSegments()` to `getSegments()`.
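   
   For example (sketch; `finishCurrentSegmentIfAny()` keeps its current body
   and just becomes private):
   
   ```java
   public List<Segment> getSegments() throws IOException {
       // Seal the in-progress segment before handing out the segment list.
       finishCurrentSegmentIfAny();
       return finishedSegments;
   }
   ```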



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
