lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890992134
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java:
##########
@@ -167,26 +183,69 @@ public static DataCacheSnapshot recover(
if (isDistributedFS) {
segments = deserializeSegments(dis);
} else {
- int totalRecords = dis.readInt();
- long totalSize = dis.readLong();
-
- Path path = pathGenerator.get();
- try (FSDataOutputStream outputStream =
- fileSystem.create(path,
FileSystem.WriteMode.NO_OVERWRITE)) {
-
- BoundedInputStream inputStream =
- new BoundedInputStream(checkpointInputStream,
totalSize);
- inputStream.setPropagateClose(false);
- IOUtils.copyBytes(inputStream, outputStream, false);
- inputStream.close();
+ int segmentNum = dis.readInt();
+ segments = new ArrayList<>(segmentNum);
+ for (int i = 0; i < segmentNum; i++) {
+ int count = dis.readInt();
+ long fsSize = dis.readLong();
+ Path path = pathGenerator.get();
+ try (FSDataOutputStream outputStream =
+ fileSystem.create(path,
FileSystem.WriteMode.NO_OVERWRITE)) {
+
+ BoundedInputStream boundedInputStream =
+ new BoundedInputStream(checkpointInputStream,
fsSize);
+ boundedInputStream.setPropagateClose(false);
+ IOUtils.copyBytes(boundedInputStream, outputStream,
false);
+ boundedInputStream.close();
+ }
+ segments.add(new Segment(path, count, fsSize));
}
- segments = Collections.singletonList(new Segment(path,
totalRecords, totalSize));
}
return new DataCacheSnapshot(fileSystem, readerPosition, segments);
}
}
+ /**
+ * Makes an attempt to cache the segments in memory.
+ *
+ * <p>The attempt is made at segment granularity, which means that only some
+ * of the segments might end up cached.
+ *
+ * <p>This method does not throw an exception if there is not enough memory
+ * space for caching a segment.
+ */
+ public <T> void tryReadSegmentsToMemory(
+ TypeSerializer<T> serializer, MemorySegmentPool segmentPool)
throws IOException {
+ boolean cacheSuccess;
+ for (Segment segment : segments) {
+ if (!segment.getCache().isEmpty()) {
+ continue;
+ }
+
+ SegmentReader<T> reader = new FileSegmentReader<>(serializer,
segment, 0);
+ SegmentWriter<T> writer;
+ try {
+ writer =
+ new MemorySegmentWriter<>(
+ serializer, segment.getPath(), segmentPool,
segment.getFsSize());
+ } catch (MemoryAllocationException e) {
+ continue;
Review Comment:
In order to reduce the chance that we repeatedly read part of the segment
from disk into memory and then fail due to memory limitation, how about we
break out of the loop on the first failure?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]