lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r889930583
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
package org.apache.flink.iteration.datacache.nonkeyed;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Records the data received and replays them when required. */
public class DataCacheWriter<T> {
+ /** A soft limit on the max allowed size of a single segment. */
+ static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+ /** The tool to serialize received records into bytes. */
private final TypeSerializer<T> serializer;
+ /** The file system that contains the cache files. */
private final FileSystem fileSystem;
+ /** A generator to generate paths of cache files. */
private final SupplierWithException<Path, IOException> pathGenerator;
- private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+ @Nullable private final MemorySegmentPool segmentPool;
+
+ /** The segments that contain previously added records. */
+ private final List<Segment> finishedSegments;
- private SegmentWriter currentSegment;
+ /** The current writer for new records. */
+ @Nullable private SegmentWriter<T> currentWriter;
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator)
throws IOException {
- this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
}
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator,
- List<Segment> priorFinishedSegments)
+ MemorySegmentPool segmentPool)
throws IOException {
- this.serializer = serializer;
- this.fileSystem = fileSystem;
- this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
}
- public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this(serializer, fileSystem, pathGenerator, null, finishedSegments);
}
- public void finishCurrentSegment() throws IOException {
- finishCurrentSegment(true);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this.fileSystem = fileSystem;
+ this.pathGenerator = pathGenerator;
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>();
+ this.finishedSegments.addAll(finishedSegments);
+ this.currentWriter = createSegmentWriter();
}
- public List<Segment> finish() throws IOException {
- finishCurrentSegment(false);
- return finishSegments;
+ public void addRecord(T record) throws IOException {
+ assert currentWriter != null;
+ if (!currentWriter.addRecord(record)) {
+ currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+ Preconditions.checkState(currentWriter.addRecord(record));
+ }
}
- public FileSystem getFileSystem() {
- return fileSystem;
- }
+    /** Finishes the current segment if any records have been added to it. */
+ public void finishCurrentSegmentIfAny() throws IOException {
+ if (currentWriter == null || currentWriter.getCount() == 0) {
+ return;
+ }
- public List<Segment> getFinishSegments() {
- return finishSegments;
+ currentWriter.finish().ifPresent(finishedSegments::add);
+ currentWriter = createSegmentWriter();
}
- private void finishCurrentSegment(boolean newSegment) throws IOException {
- if (currentSegment != null) {
- currentSegment.finish().ifPresent(finishSegments::add);
- currentSegment = null;
+    /** Finishes adding records and closes resources occupied for adding records. */
+ public List<Segment> finish() throws IOException {
+ if (currentWriter == null) {
+ return finishedSegments;
}
- if (newSegment) {
- currentSegment = new SegmentWriter(pathGenerator.get());
- }
+ currentWriter.finish().ifPresent(finishedSegments::add);
+ currentWriter = null;
+ return finishedSegments;
}
- private class SegmentWriter {
-
- private final Path path;
-
- private final FSDataOutputStream outputStream;
-
- private final DataOutputView outputView;
-
- private int currentSegmentCount;
+ public List<Segment> getFinishedSegments() {
+ return finishedSegments;
+ }
- public SegmentWriter(Path path) throws IOException {
- this.path = path;
-            this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
- this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ /** Cleans up all previously added records. */
+ public void cleanup() throws IOException {
+ finishCurrentSegmentIfAny();
+ for (Segment segment : finishedSegments) {
+ if (segment.isOnDisk()) {
Review Comment:
Suppose `fsSize == 0`; should we still invoke `fileSystem.delete(...)` to
delete the empty file?
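If we should, the cleanup could key off whether the file exists rather than off its size. A rough, untested sketch of what I mean (it assumes `FileSystem#exists` is the right check here and that `returnAll` is how the pool reclaims memory; adjust to whatever the actual cleanup uses):

```java
/** Cleans up all previously added records. */
public void cleanup() throws IOException {
    finishCurrentSegmentIfAny();
    for (Segment segment : finishedSegments) {
        // Delete the backing file whenever one exists, even a zero-length
        // one, instead of keying the deletion off fsSize > 0.
        if (fileSystem.exists(segment.getPath())) {
            fileSystem.delete(segment.getPath(), false);
        }
        if (segment.isCached()) {
            segmentPool.returnAll(segment.getCache());
        }
    }
    finishedSegments.clear();
}
```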
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,80 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing cached records. */
private final Path path;
- /** The count of the records in the file. */
+ /** The count of the records in the segment. */
Review Comment:
By `in the segment`, do you mean `in the file` or `in memory`? Could you
explain this in the Javadoc?
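For instance, one possible wording, assuming the count is the same regardless of where the records reside:

```java
/**
 * The number of records in this segment, whether they currently reside
 * in the file on disk, in memory, or in both places.
 */
private final int count;
```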
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,80 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing cached records. */
private final Path path;
- /** The count of the records in the file. */
+ /** The count of the records in the segment. */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing cached records. */
+ private long fsSize = -1L;
+
+ /** The memory segments containing cached records. */
+ private List<MemorySegment> cache;
+
+ Segment(Path path, int count, long fsSize) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
+ this.count = count;
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
- public Segment(Path path, int count, long size) {
- this.path = path;
+ Segment(Path path, int count, List<MemorySegment> cache) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
this.count = count;
- this.size = size;
+ this.cache = checkNotNull(cache);
+ }
+
+ void setCache(List<MemorySegment> cache) {
+ this.cache = checkNotNull(cache);
}
- public Path getPath() {
+ void setDiskInfo(long fsSize) {
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
+
+ boolean isOnDisk() {
Review Comment:
Suppose `getFsSize()` can be used with `fsSize == 0`; would it be simpler to
remove this method and let the caller use `getFsSize() > 0`?
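That is, roughly the following (sketch only; it assumes `getFsSize()` is relaxed to simply return the raw value):

```java
// Segment.java: the getter just reports the fact.
long getFsSize() {
    return fsSize;
}

// Call site, e.g. in DataCacheWriter#cleanup():
if (segment.getFsSize() > 0) {
    fileSystem.delete(segment.getPath(), false);
}
```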
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,158 @@
package org.apache.flink.iteration.datacache.nonkeyed;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Records the data received and replays them when required. */
public class DataCacheWriter<T> {
+ /** A soft limit on the max allowed size of a single segment. */
+ static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+ /** The tool to serialize received records into bytes. */
private final TypeSerializer<T> serializer;
+ /** The file system that contains the cache files. */
private final FileSystem fileSystem;
+ /** A generator to generate paths of cache files. */
private final SupplierWithException<Path, IOException> pathGenerator;
- private final List<Segment> finishSegments;
+    /** An optional pool that provides memory segments to hold cached records in memory. */
+ @Nullable private final MemorySegmentPool segmentPool;
+
+ /** The segments that contain previously added records. */
+ private final List<Segment> finishedSegments;
- private SegmentWriter currentSegment;
+ /** The current writer for new records. */
+ @Nullable private SegmentWriter<T> currentWriter;
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator)
throws IOException {
- this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+        this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
}
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator,
- List<Segment> priorFinishedSegments)
+ MemorySegmentPool segmentPool)
throws IOException {
- this.serializer = serializer;
- this.fileSystem = fileSystem;
- this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+        this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
}
- public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this(serializer, fileSystem, pathGenerator, null, finishedSegments);
}
- public void finishCurrentSegment() throws IOException {
- finishCurrentSegment(true);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this.fileSystem = fileSystem;
+ this.pathGenerator = pathGenerator;
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>();
+ this.finishedSegments.addAll(finishedSegments);
+ this.currentWriter = createSegmentWriter();
}
- public List<Segment> finish() throws IOException {
- finishCurrentSegment(false);
- return finishSegments;
+ public void addRecord(T record) throws IOException {
+ assert currentWriter != null;
+ if (!currentWriter.addRecord(record)) {
+ currentWriter.finish().ifPresent(finishedSegments::add);
+            currentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+ Preconditions.checkState(currentWriter.addRecord(record));
+ }
}
- public FileSystem getFileSystem() {
- return fileSystem;
- }
+    /** Finishes the current segment if any records have been added to it. */
+ public void finishCurrentSegmentIfAny() throws IOException {
+ if (currentWriter == null || currentWriter.getCount() == 0) {
+ return;
+ }
- public List<Segment> getFinishSegments() {
- return finishSegments;
+ currentWriter.finish().ifPresent(finishedSegments::add);
+ currentWriter = createSegmentWriter();
}
- private void finishCurrentSegment(boolean newSegment) throws IOException {
- if (currentSegment != null) {
- currentSegment.finish().ifPresent(finishSegments::add);
- currentSegment = null;
+    /** Finishes adding records and closes resources occupied for adding records. */
+ public List<Segment> finish() throws IOException {
+ if (currentWriter == null) {
+ return finishedSegments;
}
- if (newSegment) {
- currentSegment = new SegmentWriter(pathGenerator.get());
- }
+ currentWriter.finish().ifPresent(finishedSegments::add);
+ currentWriter = null;
+ return finishedSegments;
}
- private class SegmentWriter {
-
- private final Path path;
-
- private final FSDataOutputStream outputStream;
-
- private final DataOutputView outputView;
-
- private int currentSegmentCount;
+ public List<Segment> getFinishedSegments() {
+ return finishedSegments;
+ }
- public SegmentWriter(Path path) throws IOException {
- this.path = path;
-            this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
- this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ /** Cleans up all previously added records. */
+ public void cleanup() throws IOException {
+ finishCurrentSegmentIfAny();
+ for (Segment segment : finishedSegments) {
+ if (segment.isOnDisk()) {
+ fileSystem.delete(segment.getPath(), false);
+ }
+ if (segment.isCached()) {
+ assert segmentPool != null;
Review Comment:
After thinking about this more, I find it more consistent with the rest of the
code to just remove the assert here. Note that the code will throw a
NullPointerException if `segmentPool == null`, which is good enough for us to
investigate this bug.
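Something like this is what I have in mind (untested sketch; `returnAll` is my assumption about how the pool reclaims the segments, since the hunk is cut off here):

```java
if (segment.isCached()) {
    // No assert needed: a null segmentPool fails on the next line with a
    // NullPointerException, which already points straight at the bug.
    segmentPool.returnAll(segment.getCache());
}
```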
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,80 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing cached records. */
private final Path path;
- /** The count of the records in the file. */
+ /** The count of the records in the segment. */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing cached records. */
+ private long fsSize = -1L;
+
+ /** The memory segments containing cached records. */
+ private List<MemorySegment> cache;
+
+ Segment(Path path, int count, long fsSize) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
+ this.count = count;
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
- public Segment(Path path, int count, long size) {
- this.path = path;
+ Segment(Path path, int count, List<MemorySegment> cache) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
this.count = count;
- this.size = size;
+ this.cache = checkNotNull(cache);
+ }
+
+ void setCache(List<MemorySegment> cache) {
+ this.cache = checkNotNull(cache);
}
- public Path getPath() {
+ void setDiskInfo(long fsSize) {
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
+
+ boolean isOnDisk() {
+ return fsSize > 0;
+ }
+
+ boolean isCached() {
+ return cache != null;
+ }
+
+ Path getPath() {
return path;
}
- public int getCount() {
+ int getCount() {
return count;
}
- public long getSize() {
- return size;
+ long getFsSize() {
+ checkState(fsSize > 0);
+ return fsSize;
+ }
+
+ List<MemorySegment> getCache() {
+ return checkNotNull(cache);
Review Comment:
For a `getXXX()` method, it is generally simpler and more intuitive to just
return the fact (e.g. null or an empty list) instead of throwing an exception.
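For example (sketch; this variant would also need a `java.util.Collections` import):

```java
/** Returns the memory segments holding the cached records, or an empty list if none. */
List<MemorySegment> getCache() {
    return cache == null ? Collections.emptyList() : cache;
}
```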
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,80 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing cached records. */
private final Path path;
- /** The count of the records in the file. */
+ /** The count of the records in the segment. */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing cached records. */
+ private long fsSize = -1L;
+
+ /** The memory segments containing cached records. */
+ private List<MemorySegment> cache;
+
+ Segment(Path path, int count, long fsSize) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
+ this.count = count;
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
- public Segment(Path path, int count, long size) {
- this.path = path;
+ Segment(Path path, int count, List<MemorySegment> cache) {
+ this.path = checkNotNull(path);
+ checkArgument(count > 0);
this.count = count;
- this.size = size;
+ this.cache = checkNotNull(cache);
+ }
+
+ void setCache(List<MemorySegment> cache) {
+ this.cache = checkNotNull(cache);
}
- public Path getPath() {
+ void setDiskInfo(long fsSize) {
+ checkArgument(fsSize > 0);
+ this.fsSize = fsSize;
+ }
+
+ boolean isOnDisk() {
+ return fsSize > 0;
+ }
+
+ boolean isCached() {
+ return cache != null;
+ }
+
+ Path getPath() {
return path;
}
- public int getCount() {
+ int getCount() {
return count;
}
- public long getSize() {
- return size;
+ long getFsSize() {
+ checkState(fsSize > 0);
Review Comment:
Suppose the input datastream does not have any data; we should still be able
to snapshot/persist this datastream onto disk, and be able to reload it after
restarting the Flink job, right?
Then we will need to support a segment with `fsSize = 0`.
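That would mean relaxing the preconditions, roughly along these lines (sketch):

```java
Segment(Path path, int count, long fsSize) {
    this.path = checkNotNull(path);
    checkArgument(count >= 0);  // an empty stream has zero records
    this.count = count;
    checkArgument(fsSize >= 0); // an empty stream persists as an empty file
    this.fsSize = fsSize;
}

long getFsSize() {
    checkState(fsSize >= 0);
    return fsSize;
}
```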
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/Segment.java:
##########
@@ -18,38 +18,80 @@
package org.apache.flink.iteration.datacache.nonkeyed;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.MemorySegment;
-import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
-/** A segment represents a single file for the cache. */
-public class Segment implements Serializable {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+/** A segment contains the information about a cache unit. */
+@Internal
+public class Segment {
+
+ /** The path to the file containing cached records. */
private final Path path;
- /** The count of the records in the file. */
+ /** The count of the records in the segment. */
private final int count;
- /** The total length of file. */
- private final long size;
+ /** The total length of file containing cached records. */
Review Comment:
nit: the records in the file are not `cached`.
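e.g. something like:

```java
/** The total length of the file containing the persisted records. */
private long fsSize = -1L;
```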