lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r889646062


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCache.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Records the data received and replays them when required. */
+@Internal
+public class DataCache<T> implements Iterable<T> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final TypeSerializer<T> serializer;
+
+    private final FileSystem fileSystem;
+
+    private final SupplierWithException<Path, IOException> pathGenerator;
+
+    private final MemorySegmentPool segmentPool;
+
+    private final List<Segment> finishedSegments;
+
+    private SegmentWriter<T> currentWriter;
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        for (Segment segment : finishedSegments) {
+            tryCacheSegmentToMemory(segment);
+        }
+        this.currentWriter = createSegmentWriter();
+    }
+
+    public void addRecord(T record) throws IOException {
+        try {
+            currentWriter.addRecord(record);
+        } catch (SegmentNoVacancyException e) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            currentWriter.addRecord(record);
+        }
+    }
+
+    /** Finishes adding records and closes resources occupied for adding records. */
+    public void finish() throws IOException {
+        if (currentWriter == null) {
+            return;
+        }
+
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = null;
+    }
+
+    /** Cleans up all previously added records. */
+    public void cleanup() throws IOException {
+        finishCurrentSegmentIfAny();
+        for (Segment segment : finishedSegments) {
+            if (segment.getFileSegment() != null) {
+                fileSystem.delete(segment.getFileSegment().getPath(), false);
+            }
+            if (segment.getMemorySegment() != null) {
+                segmentPool.returnAll(segment.getMemorySegment().getCache());
+            }
+        }
+        finishedSegments.clear();
+    }
+
+    private void finishCurrentSegmentIfAny() throws IOException {
+        if (currentWriter == null || currentWriter.getCount() == 0) {

Review Comment:
   Will `finishCurrentSegmentIfAny()` ever be called after `finish()` has been 
called? If not, would it be simpler to skip checking `currentWriter == null` 
here since it should never happen?
   
   We can add `assert(currentWriter != null)` if needed.
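   
   For example, a minimal sketch of the guard change I have in mind (the rest of the method body would stay as it is):
   
   ```java
   private void finishCurrentSegmentIfAny() throws IOException {
       // Assumption: this method is never invoked after finish(), so currentWriter is non-null.
       assert currentWriter != null;
       if (currentWriter.getCount() == 0) {
           return;
       }
       // ... finish the current segment as before ...
   }
   ```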



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheIterator.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/** Reads the cached data from a list of segments. */
+@Internal
+public class DataCacheIterator<T> implements Iterator<T> {

Review Comment:
   It appears that `DataCacheIterator` owns the APIs to read data cache segments, while `DataCache` owns the APIs to write data cache segments, as well as the APIs to recover and replay data cache snapshots.
   
   Would it be more consistent with the other class names (e.g. `SegmentReader`, `SegmentWriter`) to rename `DataCacheIterator`/`DataCache` to `DataCacheReader`/`DataCacheWriter` respectively?
   
   Is it possible to still decouple `DataCacheWriter` from `DataCacheSnapshot`?
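   
   Roughly what I am imagining (just a naming sketch based on my reading of the current code, not a concrete API proposal):
   
   ```java
   /** Owns the write-side APIs (addRecord, finish); parallel to SegmentWriter. */
   public class DataCacheWriter<T> { /* formerly DataCache */ }
   
   /** Owns the read-side APIs (hasNext, next); parallel to SegmentReader. */
   public class DataCacheReader<T> { /* formerly DataCacheIterator, still usable as an Iterator<T> */ }
   ```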



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCache.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Records the data received and replays them when required. */
+@Internal
+public class DataCache<T> implements Iterable<T> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final TypeSerializer<T> serializer;
+
+    private final FileSystem fileSystem;
+
+    private final SupplierWithException<Path, IOException> pathGenerator;
+
+    private final MemorySegmentPool segmentPool;
+
+    private final List<Segment> finishedSegments;
+
+    private SegmentWriter<T> currentWriter;
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        for (Segment segment : finishedSegments) {
+            tryCacheSegmentToMemory(segment);
+        }
+        this.currentWriter = createSegmentWriter();
+    }
+
+    public void addRecord(T record) throws IOException {
+        try {
+            currentWriter.addRecord(record);
+        } catch (SegmentNoVacancyException e) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            currentWriter.addRecord(record);
+        }
+    }
+
+    /** Finishes adding records and closes resources occupied for adding records. */
+    public void finish() throws IOException {
+        if (currentWriter == null) {

Review Comment:
   Would `finish()` ever be called twice on the same `DataCache` instance? If 
not, would it be simpler to skip this check?
   
   We can add `assert(currentWriter != null)` if needed.
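   
   i.e. something like (assuming the single-call contract holds):
   
   ```java
   public void finish() throws IOException {
       // Assumption: finish() is called at most once per DataCache instance.
       assert currentWriter != null;
       currentWriter.finish().ifPresent(finishedSegments::add);
       currentWriter = null;
   }
   ```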



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** A class that writes cache data to memory segments. */
+@Internal
+class MemorySegmentWriter<T> implements SegmentWriter<T> {
+
+    private final TypeSerializer<T> serializer;
+
+    private final MemorySegmentPool segmentPool;
+
+    private final ManagedMemoryOutputStream outputStream;
+
+    private final DataOutputView outputView;
+
+    private int count;
+
+    MemorySegmentWriter(
+            TypeSerializer<T> serializer, MemorySegmentPool segmentPool, long expectedSize)
+            throws SegmentNoVacancyException {
+        this.serializer = serializer;
+        this.segmentPool = segmentPool;
+        this.outputStream = new ManagedMemoryOutputStream(segmentPool, expectedSize);
+        this.outputView = new DataOutputViewStreamWrapper(outputStream);
+        this.count = 0;
+    }
+
+    @Override
+    public void addRecord(T record) throws IOException {
+        serializer.serialize(record, outputView);
+        count++;
+    }
+
+    @Override
+    public int getCount() {
+        return this.count;
+    }
+
+    @Override
+    public Optional<Segment> finish() throws IOException {
+        if (count > 0) {
+            return Optional.of(
+                    new Segment(
+                            new org.apache.flink.iteration.datacache.nonkeyed.MemorySegment(
+                                    outputStream.getSegments(), count)));
+        } else {
+            segmentPool.returnAll(outputStream.getSegments());
+            return Optional.empty();
+        }
+    }
+
+    private static class ManagedMemoryOutputStream extends OutputStream {
+        private final MemorySegmentPool segmentPool;
+
+        private final int pageSize;
+
+        private final List<MemorySegment> segments = new ArrayList<>();
+
+        private int segmentIndex;
+
+        private int segmentOffset;
+
+        private long globalOffset;

Review Comment:
   Can you add Javadoc for these private variables, similar to what we did in `FileSegment.java`?
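   
   For example (these descriptions are my guesses at the semantics from the write logic; please correct them as needed):
   
   ```java
   /** The pool from which the memory segments are allocated. */
   private final MemorySegmentPool segmentPool;
   
   /** The size, in bytes, of each memory segment in the pool. */
   private final int pageSize;
   
   /** The memory segments that back this output stream. */
   private final List<MemorySegment> segments = new ArrayList<>();
   
   /** The index of the segment currently being written to. */
   private int segmentIndex;
   
   /** The next write position within the current segment. */
   private int segmentOffset;
   
   /** The total number of bytes written to this stream so far. */
   private long globalOffset;
   ```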



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCache.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Records the data received and replays them when required. */
+@Internal
+public class DataCache<T> implements Iterable<T> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final TypeSerializer<T> serializer;
+
+    private final FileSystem fileSystem;
+
+    private final SupplierWithException<Path, IOException> pathGenerator;
+
+    private final MemorySegmentPool segmentPool;
+
+    private final List<Segment> finishedSegments;
+
+    private SegmentWriter<T> currentWriter;
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool)
+            throws IOException {
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+    }
+
+    public DataCache(
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator,
+            MemorySegmentPool segmentPool,
+            List<Segment> finishedSegments)
+            throws IOException {
+        this.fileSystem = fileSystem;
+        this.pathGenerator = pathGenerator;
+        this.segmentPool = segmentPool;
+        this.serializer = serializer;
+        this.finishedSegments = new ArrayList<>();
+        this.finishedSegments.addAll(finishedSegments);
+        for (Segment segment : finishedSegments) {
+            tryCacheSegmentToMemory(segment);
+        }
+        this.currentWriter = createSegmentWriter();
+    }
+
+    public void addRecord(T record) throws IOException {
+        try {
+            currentWriter.addRecord(record);
+        } catch (SegmentNoVacancyException e) {
+            currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+            currentWriter.addRecord(record);
+        }
+    }
+
+    /** Finishes adding records and closes resources occupied for adding records. */
+    public void finish() throws IOException {
+        if (currentWriter == null) {
+            return;
+        }
+
+        currentWriter.finish().ifPresent(finishedSegments::add);
+        currentWriter = null;
+    }
+
+    /** Cleans up all previously added records. */
+    public void cleanup() throws IOException {
+        finishCurrentSegmentIfAny();
+        for (Segment segment : finishedSegments) {
+            if (segment.getFileSegment() != null) {
+                fileSystem.delete(segment.getFileSegment().getPath(), false);
+            }
+            if (segment.getMemorySegment() != null) {
+                segmentPool.returnAll(segment.getMemorySegment().getCache());
+            }
+        }
+        finishedSegments.clear();
+    }
+
+    private void finishCurrentSegmentIfAny() throws IOException {

Review Comment:
   Is there any use-case where we need to write to a DataCache instance after 
having read from it?
   
   If not, would it be simpler to just call `finish()` once before reading from 
the DataCache?
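   
   i.e. the expected lifecycle would be (a sketch; `serializer`, `fileSystem` and `pathGenerator` are placeholders for whatever the caller already has):
   
   ```java
   DataCache<Integer> cache = new DataCache<>(serializer, fileSystem, pathGenerator);
   cache.addRecord(1);
   cache.addRecord(2);
   cache.finish(); // called exactly once, before any read
   for (int value : cache) {
       // replay the cached records; DataCache implements Iterable<T>
   }
   ```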



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** A class that writes cache data to memory segments. */
+@Internal
+class MemorySegmentWriter<T> implements SegmentWriter<T> {
+
+    private final TypeSerializer<T> serializer;
+
+    private final MemorySegmentPool segmentPool;
+
+    private final ManagedMemoryOutputStream outputStream;
+
+    private final DataOutputView outputView;
+
+    private int count;
+
+    MemorySegmentWriter(
+            TypeSerializer<T> serializer, MemorySegmentPool segmentPool, long expectedSize)
+            throws SegmentNoVacancyException {
+        this.serializer = serializer;
+        this.segmentPool = segmentPool;
+        this.outputStream = new ManagedMemoryOutputStream(segmentPool, expectedSize);
+        this.outputView = new DataOutputViewStreamWrapper(outputStream);
+        this.count = 0;
+    }
+
+    @Override
+    public void addRecord(T record) throws IOException {
+        serializer.serialize(record, outputView);
+        count++;
+    }
+
+    @Override
+    public int getCount() {
+        return this.count;
+    }
+
+    @Override
+    public Optional<Segment> finish() throws IOException {
+        if (count > 0) {
+            return Optional.of(
+                    new Segment(
+                            new org.apache.flink.iteration.datacache.nonkeyed.MemorySegment(
+                                    outputStream.getSegments(), count)));
+        } else {
+            segmentPool.returnAll(outputStream.getSegments());
+            return Optional.empty();
+        }
+    }
+
+    private static class ManagedMemoryOutputStream extends OutputStream {
+        private final MemorySegmentPool segmentPool;
+
+        private final int pageSize;
+
+        private final List<MemorySegment> segments = new ArrayList<>();
+
+        private int segmentIndex;
+
+        private int segmentOffset;
+
+        private long globalOffset;
+
+        public ManagedMemoryOutputStream(MemorySegmentPool segmentPool, long expectedSize)
+                throws SegmentNoVacancyException {
+            this.segmentPool = segmentPool;
+            this.pageSize = segmentPool.pageSize();
+            this.segmentIndex = 0;
+            this.segmentOffset = 0;
+
+            Preconditions.checkArgument(expectedSize >= 0);
+            ensureCapacity(Math.max(expectedSize, 1L));
+        }
+
+        public List<MemorySegment> getSegments() {
+            return segments;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            write(new byte[] {(byte) b}, 0, 1);
+        }
+
+        @Override
+        public void write(@Nullable byte[] b, int off, int len) throws IOException {
+            ensureCapacity(globalOffset + len);
+            writeRecursive(b, off, len);
+        }
+
+        private void ensureCapacity(long capacity) throws SegmentNoVacancyException {
+            Preconditions.checkArgument(capacity > 0);
+            int required =
+                    (int) (capacity % pageSize == 0 ? capacity / pageSize : capacity / pageSize + 1)
+                            - segments.size();
+
+            List<MemorySegment> allocatedSegments = new ArrayList<>();
+            for (int i = 0; i < required; i++) {
+                MemorySegment memorySegment = segmentPool.nextSegment();
+                if (memorySegment == null) {
+                    segmentPool.returnAll(allocatedSegments);
+                    throw new SegmentNoVacancyException(new MemoryAllocationException());

Review Comment:
   In general we don't consider it an IO failure if there is not enough memory in the memory pool. Thus `MemoryAllocationException` is not a subclass of `IOException`.
   
   It seems better to avoid converting `MemoryAllocationException` into an `IOException`.
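   
   One possible direction (hand-wavy; the `OutputStream#write` override would need rethinking, since it may only throw `IOException`):
   
   ```java
   private void ensureCapacity(long capacity) throws MemoryAllocationException {
       // ...
       MemorySegment memorySegment = segmentPool.nextSegment();
       if (memorySegment == null) {
           segmentPool.returnAll(allocatedSegments);
           // Propagate the memory shortage as-is instead of wrapping it in an IOException.
           throw new MemoryAllocationException();
       }
       // ...
   }
   ```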



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCache.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Records the data received and replays them when required. */
+@Internal
+public class DataCache<T> implements Iterable<T> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final TypeSerializer<T> serializer;
+
+    private final FileSystem fileSystem;
+
+    private final SupplierWithException<Path, IOException> pathGenerator;
+
+    private final MemorySegmentPool segmentPool;

Review Comment:
   It is probably more readable to specify @Nullable here.
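   
   i.e.:
   
   ```java
   @Nullable private final MemorySegmentPool segmentPool;
   ```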



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,61 +18,37 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.annotation.Internal;
 
-import java.io.Serializable;
-import java.util.Objects;
+/** A segment contains the information about a cache unit. */
+@Internal
+class Segment {
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+    private FileSegment fileSegment;
 
-    private final Path path;
+    private MemorySegment memorySegment;
 
-    /** The count of the records in the file. */
-    private final int count;
-
-    /** The total length of file. */
-    private final long size;
-
-    public Segment(Path path, int count, long size) {
-        this.path = path;
-        this.count = count;
-        this.size = size;
-    }
-
-    public Path getPath() {
-        return path;
+    Segment(FileSegment fileSegment) {
+        this.fileSegment = fileSegment;
     }
 
-    public int getCount() {
-        return count;
+    Segment(MemorySegment memorySegment) {
+        this.memorySegment = memorySegment;
     }
 
-    public long getSize() {
-        return size;
+    void setFileSegment(FileSegment fileSegment) {
+        this.fileSegment = fileSegment;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (!(o instanceof Segment)) {
-            return false;
-        }
-
-        Segment segment = (Segment) o;
-        return count == segment.count && size == segment.size && Objects.equals(path, segment.path);
+    FileSegment getFileSegment() {
+        return fileSegment;
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(path, count, size);
+    void setMemorySegment(MemorySegment memorySegment) {

Review Comment:
   Since this method is never called as of now, would it be simpler to remove 
this method and declare `memorySegment` as final?
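   
   e.g. (a sketch; each constructor would then assign both fields):
   
   ```java
   private final MemorySegment memorySegment;
   
   Segment(FileSegment fileSegment) {
       this.fileSegment = fileSegment;
       this.memorySegment = null;
   }
   ```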



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/** A class that writes cache data to file system. */
+@Internal
+class FileSegmentWriter<T> implements SegmentWriter<T> {
+
+    private static final long FILE_SIZE_LIMIT = 1L << 30; // 1GB
+
+    private final TypeSerializer<T> serializer;
+
+    private final Path path;
+
+    private final FileSystem fileSystem;
+
+    private final FSDataOutputStream outputStream;
+
+    private final BufferedOutputStream bufferedOutputStream;
+
+    private final DataOutputView outputView;
+
+    private int count;
+
+    FileSegmentWriter(TypeSerializer<T> serializer, Path path) throws IOException {
+        this.serializer = serializer;
+        this.path = path;
+        this.fileSystem = path.getFileSystem();
+        this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+        this.bufferedOutputStream = new BufferedOutputStream(outputStream);
+        this.outputView = new DataOutputViewStreamWrapper(bufferedOutputStream);
+    }
+
+    @Override
+    public void addRecord(T record) throws IOException {
+        if (outputStream.getPos() >= FILE_SIZE_LIMIT) {
+            throw new SegmentNoVacancyException();

Review Comment:
   An exception (including `IOException`) usually indicates that "something is wrong and we need to either recover from it or fail fast".
   
   Since we expect segments to have a limited size, it is probably better not to use an exception to signal reaching that limit. How about we have `addRecord(...)` return a boolean, which is false if the record write failed due to the size limit?
   
   Then we could also reduce the number of classes by removing `SegmentNoVacancyException`.
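   
   A sketch of what I mean (assuming all `SegmentWriter` implementations are changed accordingly):
   
   ```java
   // In SegmentWriter:
   /** Returns false if the record cannot be written because the segment is full. */
   boolean addRecord(T record) throws IOException;
   
   // In DataCache#addRecord:
   public void addRecord(T record) throws IOException {
       if (!currentWriter.addRecord(record)) {
           currentWriter.finish().ifPresent(finishedSegments::add);
           currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
           checkState(currentWriter.addRecord(record));
       }
   }
   ```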



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,61 +18,37 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.annotation.Internal;
 
-import java.io.Serializable;
-import java.util.Objects;
+/** A segment contains the information about a cache unit. */
+@Internal
+class Segment {
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+    private FileSegment fileSegment;

Review Comment:
   Since every segment will eventually be persisted to disk, would it be 
simpler to declare this variable as final and instantiate it before/when 
constructing this Segment instance?
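   
   i.e.:
   
   ```java
   /** Every segment is eventually persisted to disk, so this can be created eagerly. */
   private final FileSegment fileSegment;
   ```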



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,61 +18,37 @@
 
 package org.apache.flink.iteration.datacache.nonkeyed;
 
-import org.apache.flink.core.fs.Path;
+import org.apache.flink.annotation.Internal;
 
-import java.io.Serializable;
-import java.util.Objects;
+/** A segment contains the information about a cache unit. */
+@Internal
+class Segment {
 
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+    private FileSegment fileSegment;
 
-    private final Path path;
+    private MemorySegment memorySegment;

Review Comment:
   Instead of creating a new class `MemorySegment` whose name collides with `org.apache.flink.core.memory.MemorySegment`, would it be simpler to just put a `List<org.apache.flink.core.memory.MemorySegment>` inside `Segment`?
   
   The `count` in `MemorySegment` and `FileSegment` can also be moved to `Segment` if we need it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
