lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890739744
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -59,87 +80,100 @@ public DataCacheWriter(
SupplierWithException<Path, IOException> pathGenerator,
List<Segment> priorFinishedSegments)
throws IOException {
- this.serializer = serializer;
+ this(serializer, fileSystem, pathGenerator, null,
priorFinishedSegments);
+ }
+
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> priorFinishedSegments)
+ throws IOException {
this.fileSystem = fileSystem;
this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
-
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>(priorFinishedSegments);
+ this.currentSegmentWriter = createSegmentWriter();
}
public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
- }
-
- public void finishCurrentSegment() throws IOException {
- finishCurrentSegment(true);
+ if (!currentSegmentWriter.addRecord(record)) {
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = new FileSegmentWriter<>(serializer,
pathGenerator.get());
+ Preconditions.checkState(currentSegmentWriter.addRecord(record));
+ }
}
+ /** Finishes adding records and closes resources occupied for adding
records. */
public List<Segment> finish() throws IOException {
- finishCurrentSegment(false);
- return finishSegments;
- }
+ if (currentSegmentWriter == null) {
+ return finishedSegments;
+ }
- public FileSystem getFileSystem() {
- return fileSystem;
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = null;
+ return finishedSegments;
}
- public List<Segment> getFinishSegments() {
- return finishSegments;
+ /**
+ * Flushes all added records to segments and returns a list of segments
containing all cached
+ * records.
+ */
+ public List<Segment> getSegments() throws IOException {
+ finishCurrentSegmentIfExists();
+ return finishedSegments;
}
- private void finishCurrentSegment(boolean newSegment) throws IOException {
- if (currentSegment != null) {
- currentSegment.finish().ifPresent(finishSegments::add);
- currentSegment = null;
+ private void finishCurrentSegmentIfExists() throws IOException {
+ if (currentSegmentWriter == null) {
+ return;
}
- if (newSegment) {
- currentSegment = new SegmentWriter(pathGenerator.get());
- }
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = createSegmentWriter();
}
- private class SegmentWriter {
-
- private final Path path;
-
- private final FSDataOutputStream outputStream;
-
- private final DataOutputView outputView;
-
- private int currentSegmentCount;
-
- public SegmentWriter(Path path) throws IOException {
- this.path = path;
- this.outputStream = fileSystem.create(path,
FileSystem.WriteMode.NO_OVERWRITE);
- this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ /** Cleans up all previously added records. */
+ public void cleanup() throws IOException {
Review Comment:
Would it be more consistent with `State::clear()` to rename this method to
`clear()`?
##########
flink-ml-lib/src/main/java/org/apache/flink/ml/clustering/kmeans/KMeans.java:
##########
@@ -254,58 +272,150 @@ public Tuple3<Integer, DenseVector, Long>
map(Tuple2<Integer, DenseVector> value
DenseVector, DenseVector[], Tuple2<Integer,
DenseVector>>,
IterationListener<Tuple2<Integer, DenseVector>> {
private final DistanceMeasure distanceMeasure;
- private ListState<DenseVector> points;
- private ListState<DenseVector[]> centroids;
+ private ListState<DenseVector[]> centroidsState;
+ private DenseVector[] centroids;
Review Comment:
Does adding the `centroids` field improve performance? If not, it seems simpler
not to add this variable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]