lindong28 commented on code in PR #97:
URL: https://github.com/apache/flink-ml/pull/97#discussion_r890665195
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,162 @@
package org.apache.flink.iteration.datacache.nonkeyed;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Records the data received and replays them when required. */
public class DataCacheWriter<T> {
+ /** A soft limit on the max allowed size of a single segment. */
+ static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+ /** The tool to serialize received records into bytes. */
private final TypeSerializer<T> serializer;
+ /** The file system that contains the cache files. */
private final FileSystem fileSystem;
+ /** A generator to generate paths of cache files. */
private final SupplierWithException<Path, IOException> pathGenerator;
- private final List<Segment> finishSegments;
+ /** An optional pool that provides memory segments to hold cached records in memory. */
+ @Nullable private final MemorySegmentPool segmentPool;
+
+ /** The segments that contain previously added records. */
+ private final List<Segment> finishedSegments;
- private SegmentWriter currentSegment;
+ /** The current writer for new records. */
+ @Nullable private SegmentWriter<T> currentSegmentWriter;
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator)
throws IOException {
- this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+ this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
}
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator,
- List<Segment> priorFinishedSegments)
+ MemorySegmentPool segmentPool)
throws IOException {
- this.serializer = serializer;
- this.fileSystem = fileSystem;
- this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
+ this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+ }
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this(serializer, fileSystem, pathGenerator, null, finishedSegments);
}
- public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this.fileSystem = fileSystem;
+ this.pathGenerator = pathGenerator;
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>();
+ this.finishedSegments.addAll(finishedSegments);
Review Comment:
nits: would it be simpler to use `this.finishedSegments = new ArrayList<>(priorFinishedSegments)`, similar to the approach before this PR?
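For illustration, a minimal, self-contained sketch of the two forms being discussed, using placeholder `String` elements instead of `Segment`; both produce an independent, mutable copy of the source list:
```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyConstructorDemo {
    public static void main(String[] args) {
        List<String> prior = Arrays.asList("segment-1", "segment-2");

        // Two-statement form currently used in the constructor:
        List<String> copyA = new ArrayList<>();
        copyA.addAll(prior);

        // Single-statement copy-constructor form suggested above:
        List<String> copyB = new ArrayList<>(prior);

        System.out.println(copyA.equals(copyB)); // prints: true
    }
}
```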
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentReader.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/** A class that reads data cached in memory. */
+@Internal
+class MemorySegmentReader<T> implements SegmentReader<T> {
+
+ /** The tool to deserialize bytes into records. */
+ private final TypeSerializer<T> serializer;
+
+ /** The wrapper view of the input stream of memory segments to be used in TypeSerializer API. */
+ private final DataInputView inputView;
+
+ /** The total number of records contained in the segments. */
+ private final int totalCount;
+
+ /** The number of records that have been read so far. */
+ private int count;
+
+ MemorySegmentReader(TypeSerializer<T> serializer, Segment segment, int startOffset)
+ throws IOException {
+ ManagedMemoryInputStream inputStream = new ManagedMemoryInputStream(segment.getCache());
+ this.inputView = new DataInputViewStreamWrapper(inputStream);
+ this.serializer = serializer;
+ this.totalCount = segment.getCount();
+ this.count = 0;
+
+ for (int ignored = 0; ignored < startOffset; ignored++) {
Review Comment:
nits: could we replace `ignored` with `i` for consistency with similar code in the `FileSegmentReader` constructor?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/** A class that writes cache data to a target file in a given file system. */
+@Internal
+class FileSegmentWriter<T> implements SegmentWriter<T> {
+
+ /** The tool to serialize received records into bytes. */
+ private final TypeSerializer<T> serializer;
+
+ /** The path to the target file. */
+ private final Path path;
+
+ /** The output stream that writes to the target file. */
+ private final FSDataOutputStream outputStream;
+
+ /** A buffer that wraps the output stream to optimize performance. */
+ private final BufferedOutputStream bufferedOutputStream;
Review Comment:
Could we remove this class variable, similar to how we handle `bufferedInputStream` in `FileSegmentReader`?
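For concreteness, a hedged sketch of what removing the field could look like (the `FileSegmentReader` handling referenced above is not shown in this excerpt, so the shape below is illustrative, not the PR's actual code). `DataOutputViewStreamWrapper` extends `java.io.DataOutputStream`, so keeping the concrete wrapper type lets `finish()` still flush the buffered chain:
```
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import java.io.BufferedOutputStream;
import java.io.IOException;

class FileSegmentWriterSketch {
    private final FSDataOutputStream outputStream;

    // Declared as the concrete wrapper type (rather than DataOutputView) so
    // that flush() is available without retaining the BufferedOutputStream.
    private final DataOutputViewStreamWrapper outputView;

    FileSegmentWriterSketch(Path path) throws IOException {
        this.outputStream = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
        this.outputView = new DataOutputViewStreamWrapper(new BufferedOutputStream(outputStream));
    }

    void finish() throws IOException {
        outputView.flush(); // flushes the buffer, then the underlying FSDataOutputStream
        outputStream.close();
    }
}
```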
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentReader.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/** A class that reads data cached in memory. */
+@Internal
+class MemorySegmentReader<T> implements SegmentReader<T> {
+
+ /** The tool to deserialize bytes into records. */
+ private final TypeSerializer<T> serializer;
+
+ /** The wrapper view of the input stream of memory segments to be used in TypeSerializer API. */
+ private final DataInputView inputView;
+
+ /** The total number of records contained in the segments. */
+ private final int totalCount;
+
+ /** The number of records that have been read so far. */
+ private int count;
+
+ MemorySegmentReader(TypeSerializer<T> serializer, Segment segment, int startOffset)
+ throws IOException {
+ ManagedMemoryInputStream inputStream = new ManagedMemoryInputStream(segment.getCache());
+ this.inputView = new DataInputViewStreamWrapper(inputStream);
+ this.serializer = serializer;
+ this.totalCount = segment.getCount();
+ this.count = 0;
+
+ for (int ignored = 0; ignored < startOffset; ignored++) {
+ next();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return count < totalCount;
+ }
+
+ @Override
+ public T next() throws IOException {
+ T ret = serializer.deserialize(inputView);
Review Comment:
nits: could we replace `ret` with `value` for consistency with `FileSegmentReader::next()`?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,162 @@
package org.apache.flink.iteration.datacache.nonkeyed;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Records the data received and replays them when required. */
public class DataCacheWriter<T> {
+ /** A soft limit on the max allowed size of a single segment. */
+ static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+ /** The tool to serialize received records into bytes. */
private final TypeSerializer<T> serializer;
+ /** The file system that contains the cache files. */
private final FileSystem fileSystem;
+ /** A generator to generate paths of cache files. */
private final SupplierWithException<Path, IOException> pathGenerator;
- private final List<Segment> finishSegments;
+ /** An optional pool that provides memory segments to hold cached records in memory. */
+ @Nullable private final MemorySegmentPool segmentPool;
+
+ /** The segments that contain previously added records. */
+ private final List<Segment> finishedSegments;
- private SegmentWriter currentSegment;
+ /** The current writer for new records. */
+ @Nullable private SegmentWriter<T> currentSegmentWriter;
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator)
throws IOException {
- this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+ this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
}
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator,
- List<Segment> priorFinishedSegments)
+ MemorySegmentPool segmentPool)
throws IOException {
- this.serializer = serializer;
- this.fileSystem = fileSystem;
- this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
+ this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+ }
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this(serializer, fileSystem, pathGenerator, null, finishedSegments);
}
- public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this.fileSystem = fileSystem;
+ this.pathGenerator = pathGenerator;
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>();
+ this.finishedSegments.addAll(finishedSegments);
+ this.currentSegmentWriter = createSegmentWriter();
}
- public void finishCurrentSegment() throws IOException {
- finishCurrentSegment(true);
+ public void addRecord(T record) throws IOException {
+ if (!currentSegmentWriter.addRecord(record)) {
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+ Preconditions.checkState(currentSegmentWriter.addRecord(record));
+ }
}
+ /** Finishes adding records and closes resources occupied for adding records. */
public List<Segment> finish() throws IOException {
- finishCurrentSegment(false);
- return finishSegments;
- }
+ if (currentSegmentWriter == null) {
+ return finishedSegments;
+ }
- public FileSystem getFileSystem() {
- return fileSystem;
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = null;
+ return finishedSegments;
}
- public List<Segment> getFinishSegments() {
- return finishSegments;
+ /**
+ * Flushes all added records to segments and returns a list of segments containing all cached
+ * records.
+ */
+ public List<Segment> getSegments() throws IOException {
+ finishCurrentSegmentIfAny();
+ return finishedSegments;
}
- private void finishCurrentSegment(boolean newSegment) throws IOException {
- if (currentSegment != null) {
- currentSegment.finish().ifPresent(finishSegments::add);
- currentSegment = null;
+ private void finishCurrentSegmentIfAny() throws IOException {
Review Comment:
nits: it is not very clear what `IfAny` means, and it is very rare to use `IfAny` in method names. How about renaming it to either `finishCurrentSegment()` or `finishCurrentSegmentIfExists()`?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/FileSegmentWriter.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/** A class that writes cache data to a target file in a given file system. */
+@Internal
+class FileSegmentWriter<T> implements SegmentWriter<T> {
+
+ /** The tool to serialize received records into bytes. */
+ private final TypeSerializer<T> serializer;
+
+ /** The path to the target file. */
+ private final Path path;
+
+ /** The output stream that writes to the target file. */
+ private final FSDataOutputStream outputStream;
+
+ /** A buffer that wraps the output stream to optimize performance. */
+ private final BufferedOutputStream bufferedOutputStream;
+
+ /** The wrapper view of output stream to be used with TypeSerializer API. */
+ private final DataOutputView outputView;
+
+ /** The number of records added so far. */
+ private int count;
+
+ FileSegmentWriter(TypeSerializer<T> serializer, Path path) throws IOException {
+ this.serializer = serializer;
+ this.path = path;
+ this.outputStream = path.getFileSystem().create(path, FileSystem.WriteMode.NO_OVERWRITE);
+ this.bufferedOutputStream = new BufferedOutputStream(outputStream);
+ this.outputView = new DataOutputViewStreamWrapper(bufferedOutputStream);
+ }
+
+ @Override
+ public boolean addRecord(T record) throws IOException {
+ if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) {
+ return false;
+ }
+ serializer.serialize(record, outputView);
+ count++;
+ return true;
+ }
+
+ @Override
+ public Optional<Segment> finish() throws IOException {
+ bufferedOutputStream.flush();
Review Comment:
Note the direction of flushing here: `bufferedOutputStream.flush()` drains its buffer into `outputStream` and then recursively calls `outputStream.flush()`, while flushing `outputStream` alone does not drain the wrapper's buffer. So it is the separate `outputStream.flush()` call that is redundant and can be removed, not this line.
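A minimal standalone demonstration of that flush direction, using `ByteArrayOutputStream` in place of `FSDataOutputStream` (illustrative only):
```
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class FlushDirectionDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream inner = new ByteArrayOutputStream();
        BufferedOutputStream buffered = new BufferedOutputStream(inner);

        buffered.write(42);
        System.out.println(inner.size()); // 0: the byte still sits in the buffer

        inner.flush();
        System.out.println(inner.size()); // still 0: flushing the inner stream
                                          // does not drain the wrapper's buffer

        buffered.flush();
        System.out.println(inner.size()); // 1: wrapper flush pushes bytes down
                                          // and then flushes the inner stream
    }
}
```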
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** A class that writes cache data to memory segments. */
+@Internal
+class MemorySegmentWriter<T> implements SegmentWriter<T> {
+
+ /** The tool to serialize received records into bytes. */
+ private final TypeSerializer<T> serializer;
+
+ /** The pre-allocated path for persisting cached records to the file system. */
+ private final Path path;
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The output stream to write serialized content to memory segments. */
+ private final ManagedMemoryOutputStream outputStream;
+
+ /** The wrapper view of output stream to be used with TypeSerializer API. */
+ private final DataOutputView outputView;
+
+ /** The number of records added so far. */
+ private int count;
+
+ MemorySegmentWriter(
+ TypeSerializer<T> serializer,
+ Path path,
+ MemorySegmentPool segmentPool,
+ long expectedSize)
+ throws MemoryAllocationException {
+ this.serializer = serializer;
+ this.path = path;
+ this.segmentPool = segmentPool;
+ this.outputStream = new ManagedMemoryOutputStream(segmentPool, expectedSize);
+ this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ this.count = 0;
+ }
+
+ @Override
+ public boolean addRecord(T record) throws IOException {
+ if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) {
+ return false;
+ }
+ try {
+ serializer.serialize(record, outputView);
+ count++;
+ return true;
+ } catch (IOException e) {
+ if (e.getCause() instanceof MemoryAllocationException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional<Segment> finish() throws IOException {
+ if (count > 0) {
+ return Optional.of(new Segment(path, count, outputStream.getSegments()));
+ } else {
+ segmentPool.returnAll(outputStream.getSegments());
+ return Optional.empty();
+ }
+ }
+
+ /** An output stream subclass that accepts bytes and writes them to memory segments. */
+ private static class ManagedMemoryOutputStream extends OutputStream {
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The number of bytes in a memory segment. */
+ private final int pageSize;
+
+ /** The memory segments containing written bytes. */
+ private final List<MemorySegment> segments = new ArrayList<>();
+
+ /** The index of the segment that currently accepts written bytes. */
+ private int segmentIndex;
+
+ /** The number of bytes in the current segment that have been written. */
+ private int segmentOffset;
+
+ /** The number of bytes that have been written so far. */
+ private long globalOffset;
+
+ /** The number of bytes that have been allocated so far. */
+ private long allocatedBytes;
+
+ public ManagedMemoryOutputStream(MemorySegmentPool segmentPool, long expectedSize)
+ throws MemoryAllocationException {
+ this.segmentPool = segmentPool;
+ this.pageSize = segmentPool.pageSize();
+
+ Preconditions.checkArgument(expectedSize >= 0);
Review Comment:
nits: It seems unnecessary to check this and use Math.max(..) below. Since it does not cause any harm to have expectedSize < 0, would it be simpler to remove this line?
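Sketched in the document's suggestion style, the constructor body with the check dropped might read as follows; the `Math.max` guard alone already tolerates non-positive values:
```
public ManagedMemoryOutputStream(MemorySegmentPool segmentPool, long expectedSize)
        throws MemoryAllocationException {
    this.segmentPool = segmentPool;
    this.pageSize = segmentPool.pageSize();
    // A non-positive expectedSize is harmless: at least one page is allocated.
    ensureCapacity(Math.max(expectedSize, 1L));
}
```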
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** A class that writes cache data to memory segments. */
+@Internal
+class MemorySegmentWriter<T> implements SegmentWriter<T> {
+
+ /** The tool to serialize received records into bytes. */
+ private final TypeSerializer<T> serializer;
+
+ /** The pre-allocated path for persisting cached records to the file system. */
+ private final Path path;
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The output stream to write serialized content to memory segments. */
+ private final ManagedMemoryOutputStream outputStream;
+
+ /** The wrapper view of output stream to be used with TypeSerializer API. */
+ private final DataOutputView outputView;
+
+ /** The number of records added so far. */
+ private int count;
+
+ MemorySegmentWriter(
+ TypeSerializer<T> serializer,
+ Path path,
+ MemorySegmentPool segmentPool,
+ long expectedSize)
+ throws MemoryAllocationException {
+ this.serializer = serializer;
+ this.path = path;
+ this.segmentPool = segmentPool;
+ this.outputStream = new ManagedMemoryOutputStream(segmentPool, expectedSize);
+ this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ this.count = 0;
+ }
+
+ @Override
+ public boolean addRecord(T record) throws IOException {
+ if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) {
+ return false;
+ }
+ try {
+ serializer.serialize(record, outputView);
+ count++;
+ return true;
+ } catch (IOException e) {
+ if (e.getCause() instanceof MemoryAllocationException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional<Segment> finish() throws IOException {
+ if (count > 0) {
+ return Optional.of(new Segment(path, count, outputStream.getSegments()));
+ } else {
+ segmentPool.returnAll(outputStream.getSegments());
+ return Optional.empty();
+ }
+ }
+
+ /** An output stream subclass that accepts bytes and writes them to memory segments. */
+ private static class ManagedMemoryOutputStream extends OutputStream {
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The number of bytes in a memory segment. */
+ private final int pageSize;
+
+ /** The memory segments containing written bytes. */
+ private final List<MemorySegment> segments = new ArrayList<>();
+
+ /** The index of the segment that currently accepts written bytes. */
+ private int segmentIndex;
+
+ /** The number of bytes in the current segment that have been written. */
+ private int segmentOffset;
+
+ /** The number of bytes that have been written so far. */
+ private long globalOffset;
+
+ /** The number of bytes that have been allocated so far. */
+ private long allocatedBytes;
+
+ public ManagedMemoryOutputStream(MemorySegmentPool segmentPool, long expectedSize)
+ throws MemoryAllocationException {
+ this.segmentPool = segmentPool;
+ this.pageSize = segmentPool.pageSize();
+
+ Preconditions.checkArgument(expectedSize >= 0);
+ ensureCapacity(Math.max(expectedSize, 1L));
+ }
+
+ public long getPos() {
+ return globalOffset;
+ }
+
+ public List<MemorySegment> getSegments() {
+ return segments;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[] {(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(@Nullable byte[] b, int off, int len) throws IOException {
+ try {
+ ensureCapacity(globalOffset + len);
+ } catch (MemoryAllocationException e) {
+ throw new IOException(e);
+ }
+
+ while (len > 0) {
Review Comment:
It seems simpler to use the following code, which does not need to explicitly call `break`.
```
while (len > 0) {
int currentLen = Math.min(len, pageSize - segmentOffset);
segments.get(segmentIndex).put(segmentOffset, b, off, currentLen);
segmentOffset += currentLen;
globalOffset += currentLen;
if (segmentOffset >= pageSize) {
segmentIndex++;
segmentOffset = 0;
}
off += currentLen;
len -= currentLen;
}
```
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentReader.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/** A class that reads data cached in memory. */
+@Internal
+class MemorySegmentReader<T> implements SegmentReader<T> {
+
+ /** The tool to deserialize bytes into records. */
+ private final TypeSerializer<T> serializer;
+
+ /** The wrapper view of the input stream of memory segments to be used in TypeSerializer API. */
+ private final DataInputView inputView;
+
+ /** The total number of records contained in the segments. */
+ private final int totalCount;
+
+ /** The number of records that have been read so far. */
+ private int count;
+
+ MemorySegmentReader(TypeSerializer<T> serializer, Segment segment, int startOffset)
+ throws IOException {
+ ManagedMemoryInputStream inputStream = new ManagedMemoryInputStream(segment.getCache());
+ this.inputView = new DataInputViewStreamWrapper(inputStream);
+ this.serializer = serializer;
+ this.totalCount = segment.getCount();
+ this.count = 0;
+
+ for (int ignored = 0; ignored < startOffset; ignored++) {
+ next();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return count < totalCount;
+ }
+
+ @Override
+ public T next() throws IOException {
+ T ret = serializer.deserialize(inputView);
+ count++;
+ return ret;
+ }
+
+ @Override
+ public void close() {}
+
+ /** An input stream subclass that reads bytes from memory segments. */
+ private static class ManagedMemoryInputStream extends InputStream {
+
+ /** The memory segments to read bytes from. */
+ private final List<MemorySegment> segments;
+
+ /** The index of the segment that is currently being read. */
+ private int segmentIndex;
+
+ /** The number of bytes that have been read from the current segment so far. */
+ private int segmentOffset;
+
+ public ManagedMemoryInputStream(List<MemorySegment> segments) {
+ this.segments = segments;
+ this.segmentIndex = 0;
+ this.segmentOffset = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int ret = segments.get(segmentIndex).get(segmentOffset) & 0xff;
+ segmentOffset += 1;
+ if (segmentOffset >= segments.get(segmentIndex).size()) {
+ segmentIndex++;
+ segmentOffset = 0;
+ }
+ return ret;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int readLen = 0;
+
+ while (len > 0 && segmentIndex < segments.size()) {
Review Comment:
nits: the following code is probably simpler since it does not need to explicitly write `break`.
```
while (len > 0) {
int currentLen = Math.min(segments.get(segmentIndex).size() - segmentOffset, len);
segments.get(segmentIndex).get(segmentOffset, b, off, currentLen);
segmentOffset += currentLen;
if (segmentOffset >= segments.get(segmentIndex).size()) {
segmentIndex++;
segmentOffset = 0;
}
readLen += currentLen;
off += currentLen;
len -= currentLen;
}
```
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.java:
##########
@@ -167,26 +183,69 @@ public static DataCacheSnapshot recover(
if (isDistributedFS) {
segments = deserializeSegments(dis);
} else {
- int totalRecords = dis.readInt();
- long totalSize = dis.readLong();
-
- Path path = pathGenerator.get();
- try (FSDataOutputStream outputStream =
- fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
-
- BoundedInputStream inputStream =
- new BoundedInputStream(checkpointInputStream, totalSize);
- inputStream.setPropagateClose(false);
- IOUtils.copyBytes(inputStream, outputStream, false);
- inputStream.close();
+ int segmentNum = dis.readInt();
+ segments = new ArrayList<>(segmentNum);
+ for (int i = 0; i < segmentNum; i++) {
+ int count = dis.readInt();
+ long fsSize = dis.readLong();
+ Path path = pathGenerator.get();
+ try (FSDataOutputStream outputStream =
+ fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
+
+ BoundedInputStream boundedInputStream =
+ new BoundedInputStream(checkpointInputStream, fsSize);
+ boundedInputStream.setPropagateClose(false);
+ IOUtils.copyBytes(boundedInputStream, outputStream, false);
+ boundedInputStream.close();
+ }
+ segments.add(new Segment(path, count, fsSize));
}
- segments = Collections.singletonList(new Segment(path, totalRecords, totalSize));
}
return new DataCacheSnapshot(fileSystem, readerPosition, segments);
}
}
+ /**
+ * Makes an attempt to cache the segments in memory.
+ *
+ * <p>The attempt is made at segment granularity, which means that only part of the segments might be cached.
+ *
+ * <p>This method does not throw an exception if there is not enough memory space for caching a segment.
+ */
+ public <T> void tryReadSegmentsToMemory(
+ TypeSerializer<T> serializer, MemorySegmentPool segmentPool) throws IOException {
+ boolean cacheSuccess;
+ for (Segment segment : segments) {
+ if (segment.getCache() != null) {
+ continue;
+ }
+
+ SegmentReader<T> reader = new FileSegmentReader<>(serializer, segment, 0);
+ SegmentWriter<T> writer;
+ try {
+ writer =
+ new MemorySegmentWriter<>(
+ serializer, segment.getPath(), segmentPool, segment.getFsSize());
+ } catch (MemoryAllocationException e) {
+ continue;
+ }
+
+ cacheSuccess = true;
+ while (cacheSuccess && reader.hasNext()) {
+ if (!writer.addRecord(reader.next())) {
+ writer.finish().ifPresent(x -> segmentPool.returnAll(x.getCache()));
+ cacheSuccess = false;
+ }
+ }
+ if (cacheSuccess) {
+ writer.finish().ifPresent(x -> segment.setCache(x.getCache()));
Review Comment:
If cacheSuccess == true, then `writer.finish()` must return a non-empty segment; otherwise there is a bug.
How about we just do `segment.setCache(writer.finish().get().getCache())` here to simplify the code and detect bugs?
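In the document's suggestion style, the tightened form might look like this; `Optional.get()` throws `NoSuchElementException` if `finish()` unexpectedly returns empty, turning the silent inconsistency into a visible failure:
```
if (cacheSuccess) {
    // Fails fast (NoSuchElementException) if finish() returns an empty
    // Optional, which would indicate a bug rather than a recoverable state.
    segment.setCache(writer.finish().get().getCache());
}
```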
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/DataCacheWriter.java:
##########
@@ -19,127 +19,162 @@
package org.apache.flink.iteration.datacache.nonkeyed;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Records the data received and replays them when required. */
public class DataCacheWriter<T> {
+ /** A soft limit on the max allowed size of a single segment. */
+ static final long MAX_SEGMENT_SIZE = 1L << 30; // 1GB
+
+ /** The tool to serialize received records into bytes. */
private final TypeSerializer<T> serializer;
+ /** The file system that contains the cache files. */
private final FileSystem fileSystem;
+ /** A generator to generate paths of cache files. */
private final SupplierWithException<Path, IOException> pathGenerator;
- private final List<Segment> finishSegments;
+ /** An optional pool that provides memory segments to hold cached records in memory. */
+ @Nullable private final MemorySegmentPool segmentPool;
+
+ /** The segments that contain previously added records. */
+ private final List<Segment> finishedSegments;
- private SegmentWriter currentSegment;
+ /** The current writer for new records. */
+ @Nullable private SegmentWriter<T> currentSegmentWriter;
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator)
throws IOException {
- this(serializer, fileSystem, pathGenerator, Collections.emptyList());
+ this(serializer, fileSystem, pathGenerator, null, Collections.emptyList());
}
public DataCacheWriter(
TypeSerializer<T> serializer,
FileSystem fileSystem,
SupplierWithException<Path, IOException> pathGenerator,
- List<Segment> priorFinishedSegments)
+ MemorySegmentPool segmentPool)
throws IOException {
- this.serializer = serializer;
- this.fileSystem = fileSystem;
- this.pathGenerator = pathGenerator;
-
- this.finishSegments = new ArrayList<>(priorFinishedSegments);
+ this(serializer, fileSystem, pathGenerator, segmentPool, Collections.emptyList());
+ }
- this.currentSegment = new SegmentWriter(pathGenerator.get());
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this(serializer, fileSystem, pathGenerator, null, finishedSegments);
}
- public void addRecord(T record) throws IOException {
- currentSegment.addRecord(record);
+ public DataCacheWriter(
+ TypeSerializer<T> serializer,
+ FileSystem fileSystem,
+ SupplierWithException<Path, IOException> pathGenerator,
+ @Nullable MemorySegmentPool segmentPool,
+ List<Segment> finishedSegments)
+ throws IOException {
+ this.fileSystem = fileSystem;
+ this.pathGenerator = pathGenerator;
+ this.segmentPool = segmentPool;
+ this.serializer = serializer;
+ this.finishedSegments = new ArrayList<>();
+ this.finishedSegments.addAll(finishedSegments);
+ this.currentSegmentWriter = createSegmentWriter();
}
- public void finishCurrentSegment() throws IOException {
- finishCurrentSegment(true);
+ public void addRecord(T record) throws IOException {
+ if (!currentSegmentWriter.addRecord(record)) {
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = new FileSegmentWriter<>(serializer, pathGenerator.get());
+ Preconditions.checkState(currentSegmentWriter.addRecord(record));
+ }
}
+ /** Finishes adding records and closes resources occupied for adding records. */
public List<Segment> finish() throws IOException {
- finishCurrentSegment(false);
- return finishSegments;
- }
+ if (currentSegmentWriter == null) {
+ return finishedSegments;
+ }
- public FileSystem getFileSystem() {
- return fileSystem;
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = null;
+ return finishedSegments;
}
- public List<Segment> getFinishSegments() {
- return finishSegments;
+ /**
+ * Flushes all added records to segments and returns a list of segments containing all cached
+ * records.
+ */
+ public List<Segment> getSegments() throws IOException {
+ finishCurrentSegmentIfAny();
+ return finishedSegments;
}
- private void finishCurrentSegment(boolean newSegment) throws IOException {
- if (currentSegment != null) {
- currentSegment.finish().ifPresent(finishSegments::add);
- currentSegment = null;
+ private void finishCurrentSegmentIfAny() throws IOException {
+ if (currentSegmentWriter == null) {
+ return;
}
- if (newSegment) {
- currentSegment = new SegmentWriter(pathGenerator.get());
- }
+ currentSegmentWriter.finish().ifPresent(finishedSegments::add);
+ currentSegmentWriter = createSegmentWriter();
}
- private class SegmentWriter {
-
- private final Path path;
-
- private final FSDataOutputStream outputStream;
-
- private final DataOutputView outputView;
-
- private int currentSegmentCount;
-
- public SegmentWriter(Path path) throws IOException {
- this.path = path;
- this.outputStream = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
- this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ /** Cleans up all previously added records. */
+ public void cleanup() throws IOException {
+ finishCurrentSegmentIfAny();
+ for (Segment segment : finishedSegments) {
+ if (segment.getFsSize() > 0) {
+ fileSystem.delete(segment.getPath(), false);
+ }
+ if (segment.getCache() != null) {
+ segmentPool.returnAll(segment.getCache());
+ }
}
+ finishedSegments.clear();
+ }
- public void addRecord(T record) throws IOException {
- serializer.serialize(record, outputView);
- currentSegmentCount += 1;
- }
+ /** Writes the segments in this writer to files on disk. */
+ public void writeSegmentsToFiles() throws IOException {
+ finishCurrentSegmentIfAny();
+ for (Segment segment : finishedSegments) {
+ if (segment.getFsSize() > 0) {
+ continue;
+ }
- public Optional<Segment> finish() throws IOException {
- this.outputStream.flush();
- long size = outputStream.getPos();
- this.outputStream.close();
-
- if (currentSegmentCount > 0) {
- return Optional.of(new Segment(path, currentSegmentCount, size));
- } else {
- // If there are no records, we tend to directly delete this file
- fileSystem.delete(path, false);
- return Optional.empty();
+ SegmentReader<T> reader = new MemorySegmentReader<>(serializer, segment, 0);
+ SegmentWriter<T> writer = new FileSegmentWriter<>(serializer, segment.getPath());
+ while (reader.hasNext()) {
+ writer.addRecord(reader.next());
}
+ writer.finish().ifPresent(x -> segment.setFsSize(x.getFsSize()));
Review Comment:
`writer.finish()` is guaranteed to return a non-empty segment here, right?
How about we do `segment.setFsSize(writer.finish().get().getFsSize())` here to simplify the code and detect bugs?
##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/MemorySegmentWriter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.datacache.nonkeyed;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/** A class that writes cache data to memory segments. */
+@Internal
+class MemorySegmentWriter<T> implements SegmentWriter<T> {
+
+ /** The tool to serialize received records into bytes. */
+ private final TypeSerializer<T> serializer;
+
+ /** The pre-allocated path for persisting cached records to the file system. */
+ private final Path path;
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The output stream to write serialized content to memory segments. */
+ private final ManagedMemoryOutputStream outputStream;
+
+ /** The wrapper view of output stream to be used with TypeSerializer API. */
+ private final DataOutputView outputView;
+
+ /** The number of records added so far. */
+ private int count;
+
+ MemorySegmentWriter(
+ TypeSerializer<T> serializer,
+ Path path,
+ MemorySegmentPool segmentPool,
+ long expectedSize)
+ throws MemoryAllocationException {
+ this.serializer = serializer;
+ this.path = path;
+ this.segmentPool = segmentPool;
+ this.outputStream = new ManagedMemoryOutputStream(segmentPool, expectedSize);
+ this.outputView = new DataOutputViewStreamWrapper(outputStream);
+ this.count = 0;
+ }
+
+ @Override
+ public boolean addRecord(T record) throws IOException {
+ if (outputStream.getPos() >= DataCacheWriter.MAX_SEGMENT_SIZE) {
+ return false;
+ }
+ try {
+ serializer.serialize(record, outputView);
+ count++;
+ return true;
+ } catch (IOException e) {
+ if (e.getCause() instanceof MemoryAllocationException) {
+ return false;
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Optional<Segment> finish() throws IOException {
+ if (count > 0) {
+ return Optional.of(new Segment(path, count, outputStream.getSegments()));
+ } else {
+ segmentPool.returnAll(outputStream.getSegments());
+ return Optional.empty();
+ }
+ }
+
+ /** An output stream subclass that accepts bytes and writes them to memory segments. */
+ private static class ManagedMemoryOutputStream extends OutputStream {
+
+ /** The pool to allocate memory segments from. */
+ private final MemorySegmentPool segmentPool;
+
+ /** The number of bytes in a memory segment. */
+ private final int pageSize;
+
+ /** The memory segments containing written bytes. */
+ private final List<MemorySegment> segments = new ArrayList<>();
+
+ /** The index of the segment that currently accepts written bytes. */
+ private int segmentIndex;
+
+ /** The number of bytes in the current segment that have been written. */
+ private int segmentOffset;
+
+ /** The number of bytes that have been written so far. */
+ private long globalOffset;
+
+ /** The number of bytes that have been allocated so far. */
+ private long allocatedBytes;
+
+ public ManagedMemoryOutputStream(MemorySegmentPool segmentPool, long expectedSize)
+ throws MemoryAllocationException {
+ this.segmentPool = segmentPool;
+ this.pageSize = segmentPool.pageSize();
+
+ Preconditions.checkArgument(expectedSize >= 0);
+ ensureCapacity(Math.max(expectedSize, 1L));
+ }
+
+ public long getPos() {
+ return globalOffset;
+ }
+
+ public List<MemorySegment> getSegments() {
+ return segments;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[] {(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(@Nullable byte[] b, int off, int len) throws IOException {
+ try {
+ ensureCapacity(globalOffset + len);
+ } catch (MemoryAllocationException e) {
+ throw new IOException(e);
Review Comment:
Would it be simpler to make `MemoryAllocationException` a subclass of `RuntimeException` and throw/catch `MemoryAllocationException` directly, instead of wrapping it inside `IOException`?
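Since `MemoryAllocationException` is a checked exception defined in flink-runtime, one way to realize this suggestion without modifying Flink itself would be a small unchecked wrapper in flink-ml. The class name `MemoryAllocationRuntimeException` below is hypothetical, sketched for illustration only:
```
// Hypothetical unchecked wrapper; a RuntimeException passes through the
// OutputStream and TypeSerializer call chain without being declared.
class MemoryAllocationRuntimeException extends RuntimeException {
    MemoryAllocationRuntimeException(Throwable cause) {
        super(cause);
    }
}

// In ManagedMemoryOutputStream.write(...), throw it directly:
//     try {
//         ensureCapacity(globalOffset + len);
//     } catch (MemoryAllocationException e) {
//         throw new MemoryAllocationRuntimeException(e);
//     }
//
// In MemorySegmentWriter.addRecord(...), catch it without inspecting causes:
//     try {
//         serializer.serialize(record, outputView);
//         count++;
//         return true;
//     } catch (MemoryAllocationRuntimeException e) {
//         return false;
//     }
```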
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]