yunfengzhou-hub commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r876654438
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheReader.java:
##########
@@ -20,120 +20,121 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-
-import javax.annotation.Nullable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-/** Reads the cached data from a list of paths. */
+/** Reads the cached data from a list of segments. */
public class DataCacheReader<T> implements Iterator<T> {
- private final TypeSerializer<T> serializer;
+ private final MemoryManager memoryManager;
- private final FileSystem fileSystem;
+ private final TypeSerializer<T> serializer;
private final List<Segment> segments;
- @Nullable private SegmentReader currentSegmentReader;
+ private SegmentReader<T> currentReader;
+
+ private MemorySegmentWriter<T> cacheWriter;
+
+ private int segmentIndex;
public DataCacheReader(
- TypeSerializer<T> serializer, FileSystem fileSystem, List<Segment>
segments)
- throws IOException {
- this(serializer, fileSystem, segments, new Tuple2<>(0, 0));
+ TypeSerializer<T> serializer, MemoryManager memoryManager,
List<Segment> segments) {
+ this(serializer, memoryManager, segments, new Tuple2<>(0, 0));
}
public DataCacheReader(
TypeSerializer<T> serializer,
- FileSystem fileSystem,
+ MemoryManager memoryManager,
List<Segment> segments,
- Tuple2<Integer, Integer> readerPosition)
- throws IOException {
-
+ Tuple2<Integer, Integer> readerPosition) {
+ this.memoryManager = memoryManager;
this.serializer = serializer;
- this.fileSystem = fileSystem;
this.segments = segments;
+ this.segmentIndex = readerPosition.f0;
+
+ createSegmentReaderAndCache(readerPosition.f0, readerPosition.f1);
+ }
+
+ private void createSegmentReaderAndCache(int index, int startOffset) {
+ try {
+ cacheWriter = null;
- if (readerPosition.f0 < segments.size()) {
- this.currentSegmentReader = new SegmentReader(readerPosition.f0,
readerPosition.f1);
+ if (index >= segments.size()) {
+ currentReader = null;
+ return;
+ }
+
+ currentReader = SegmentReader.create(serializer,
segments.get(index), startOffset);
+
+ boolean shouldCacheInMemory =
+ startOffset == 0
+ && currentReader instanceof FsSegmentReader
+ &&
MemoryUtils.isMemoryEnoughForCache(memoryManager);
+
+ if (shouldCacheInMemory) {
+ cacheWriter =
+ new MemorySegmentWriter<>(
+ segments.get(index).path,
+ memoryManager,
+ segments.get(index).inMemorySize);
+ }
+ } catch (MemoryAllocationException e) {
+ cacheWriter = null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@Override
public boolean hasNext() {
- return currentSegmentReader != null && currentSegmentReader.hasNext();
+ return currentReader != null && currentReader.hasNext();
}
@Override
public T next() {
try {
- T next = currentSegmentReader.next();
-
- if (!currentSegmentReader.hasNext()) {
- currentSegmentReader.close();
- if (currentSegmentReader.index < segments.size() - 1) {
- currentSegmentReader = new
SegmentReader(currentSegmentReader.index + 1, 0);
- } else {
- currentSegmentReader = null;
+ T record = currentReader.next();
+
+ if (cacheWriter != null) {
+ if (!cacheWriter.addRecord(record)) {
+ cacheWriter
+ .finish()
+ .ifPresent(x ->
memoryManager.releaseMemory(x.path, x.inMemorySize));
+ cacheWriter = null;
+ }
+ }
+
+ if (!currentReader.hasNext()) {
+ currentReader.close();
+ if (cacheWriter != null) {
+ cacheWriter
+ .finish()
+ .ifPresent(
+ x -> {
+ x.fsSize =
segments.get(segmentIndex).fsSize;
+ segments.set(segmentIndex, x);
Review Comment:
A reader should not modify the data stored in a file or the metadata about that file.
It can just create a cache of the data in a segment and assign the cache
metadata to that segment. I'll modify the implementation to better reflect this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]