lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890693618
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,73 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing persisted records. */
private final Path path;
- /** The count of the records in the file. */
+ /**
+ * The count of records in the file at the path if the file size is not zero, otherwise the
+ * count of records in the cache.
+ */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing persisted records. */
+ private long fsSize = -1L;
+
+ /** The memory segments containing cached records. */
Review Comment:
Could you update the comment to explain that `the cache list is empty iff the
segment has not been cached in memory`?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,73 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing persisted records. */
private final Path path;
- /** The count of the records in the file. */
+ /**
+ * The count of records in the file at the path if the file size is not zero, otherwise the
+ * count of records in the cache.
+ */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing persisted records. */
+ private long fsSize = -1L;
+
+ /** The memory segments containing cached records. */
+ private List<MemorySegment> cache;
+
+ Segment(Path path, int count, long fsSize) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
+ this.count = count;
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
- public Segment(Path path, int count, long size) {
- this.path = path;
+ Segment(Path path, int count, List<MemorySegment> cache) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
this.count = count;
- this.size = size;
+ this.cache = checkNotNull(cache);
}
- public Path getPath() {
+ void setCache(List<MemorySegment> cache) {
+ this.cache = checkNotNull(cache);
+ }
+
+ void setFsSize(long fsSize) {
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
+
+ Path getPath() {
return path;
}
- public int getCount() {
+ int getCount() {
return count;
}
- public long getSize() {
- return size;
+ long getFsSize() {
+ return fsSize;
+ }
+
+ List<MemorySegment> getCache() {
+ return cache;
Review Comment:
Instead of using a null check on `cache` to indicate whether the segment has
been read into memory, would it be simpler to use `cache.isEmpty()`?
This would let us handle fewer special cases (e.g. null). After all, if the
segment has been read into memory, it must have a non-empty memory segment
list.
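
A minimal sketch of this suggestion, not the PR's actual code. The class name
`CachedSegmentSketch` and the `isCached()` helper are hypothetical, and
`byte[]` stands in for Flink's `MemorySegment` so the example compiles on its
own:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for Segment; byte[] replaces Flink's MemorySegment. */
final class CachedSegmentSketch {

    /** Empty iff the segment has not been cached in memory; never null. */
    private List<byte[]> cache = Collections.emptyList();

    void setCache(List<byte[]> cache) {
        // A segment that has been read into memory must hold at least one buffer.
        if (cache == null || cache.isEmpty()) {
            throw new IllegalArgumentException("cache must be non-empty");
        }
        this.cache = new ArrayList<>(cache);
    }

    List<byte[]> getCache() {
        return cache;
    }

    /** Callers test emptiness instead of comparing against null. */
    boolean isCached() {
        return !cache.isEmpty();
    }
}
```

With this convention `getCache()` never returns null, so callers can iterate
over the result without a guard.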
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,73 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing persisted records. */
private final Path path;
- /** The count of the records in the file. */
+ /**
+ * The count of records in the file at the path if the file size is not zero, otherwise the
+ * count of records in the cache.
+ */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing persisted records. */
+ private long fsSize = -1L;
Review Comment:
Would it be more intuitive to initialize `fsSize` to 0?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,73 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing persisted records. */
private final Path path;
- /** The count of the records in the file. */
+ /**
+ * The count of records in the file at the path if the file size is not zero, otherwise the
+ * count of records in the cache.
+ */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing persisted records. */
Review Comment:
Could you update the comment to explain that `fsSize is 0 iff the
segment has not been written to the given path`?
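
A minimal sketch of how that invariant could look, assuming `fsSize` defaults
to 0 rather than -1. The class name `PersistedSizeSketch` and the
`isPersisted()` helper are hypothetical and not part of the PR:

```java
/** Hypothetical stand-in for Segment that tracks only the persisted size. */
final class PersistedSizeSketch {

    /**
     * The total length of the file containing persisted records. It is 0 iff
     * the segment has not been written to the given path.
     */
    private long fsSize = 0L;

    void setFsSize(long fsSize) {
        // Mirrors checkArgument(fsSize > 0) in the diff: a written file is never empty,
        // so 0 stays free to mean "not written yet".
        if (fsSize <= 0) {
            throw new IllegalArgumentException("fsSize must be positive");
        }
        this.fsSize = fsSize;
    }

    long getFsSize() {
        return fsSize;
    }

    /** Derived from the invariant above instead of a separate flag. */
    boolean isPersisted() {
        return fsSize > 0;
    }
}
```

Because the setter rejects non-positive values, 0 cannot be confused with the
size of a real file.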
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -63,16 +98,18 @@ public boolean equals(Object o) {
}
Segment segment = (Segment) o;
- return count == segment.count && size == segment.size && Objects.equals(path, segment.path);
+ return count == segment.count
+ && fsSize == segment.fsSize
Review Comment:
Should a segment be uniquely identified by its count and path only?
I suppose if we write segment data from memory to file, it should still
represent the same segment, right?
Same for `hashCode()`.
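
A minimal sketch of identity based on `path` and `count` only, under the
assumption that these two fields never change after construction. The class
name `SegmentIdentitySketch` is hypothetical, and `String` stands in for
Flink's `Path` so the example compiles on its own:

```java
import java.util.Objects;

/** Hypothetical stand-in for Segment; String replaces Flink's Path. */
final class SegmentIdentitySketch {

    private final String path;
    private final int count;

    /** Mutable state, deliberately left out of equals() and hashCode(). */
    private long fsSize;

    SegmentIdentitySketch(String path, int count) {
        this.path = Objects.requireNonNull(path);
        this.count = count;
    }

    void setFsSize(long fsSize) {
        this.fsSize = fsSize;
    }

    long getFsSize() {
        return fsSize;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SegmentIdentitySketch)) {
            return false;
        }
        SegmentIdentitySketch other = (SegmentIdentitySketch) o;
        // Only the immutable fields take part in the comparison.
        return count == other.count && Objects.equals(path, other.path);
    }

    @Override
    public int hashCode() {
        return Objects.hash(path, count);
    }
}
```

Keeping mutable fields out of `hashCode()` also means a segment stored in a
`HashSet` or used as a `HashMap` key can still be found after it is written
to a file.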