wsry commented on a change in pull request #13924:
URL: https://github.com/apache/flink/pull/13924#discussion_r601053647
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
##########
@@ -61,104 +56,69 @@
/** Next data region to be read. */
private int nextRegionToRead;
+ /** Next file offset to be read. */
+ private long nextOffsetToRead;
+
/** Number of remaining buffers in the current data region read. */
private int currentRegionRemainingBuffers;
- /** Whether this partitioned file reader is closed. */
- private boolean isClosed;
-
- public PartitionedFileReader(PartitionedFile partitionedFile, int
targetSubpartition)
+ public PartitionedFileReader(
+ PartitionedFile partitionedFile,
+ int targetSubpartition,
+ FileChannel dataFileChannel,
+ FileChannel indexFileChannel)
throws IOException {
+ checkArgument(
+ dataFileChannel.isOpen() && indexFileChannel.isOpen(),
+ "Both data file channel and index file channel must be
opened.");
+
this.partitionedFile = checkNotNull(partitionedFile);
this.targetSubpartition = targetSubpartition;
+ this.dataFileChannel = checkNotNull(dataFileChannel);
+ this.indexFileChannel = checkNotNull(indexFileChannel);
this.indexEntryBuf =
ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf);
-
- this.dataFileChannel =
openFileChannel(partitionedFile.getDataFilePath());
- try {
- this.indexFileChannel =
openFileChannel(partitionedFile.getIndexFilePath());
- } catch (Throwable throwable) {
- IOUtils.closeQuietly(dataFileChannel);
- throw throwable;
- }
}
- private FileChannel openFileChannel(Path path) throws IOException {
- return FileChannel.open(path, StandardOpenOption.READ);
- }
-
- private boolean moveToNextReadableRegion() throws IOException {
- if (currentRegionRemainingBuffers > 0) {
- return true;
- }
-
- while (nextRegionToRead < partitionedFile.getNumRegions()) {
+ private void moveToNextReadableRegion() throws IOException {
+ while (currentRegionRemainingBuffers <= 0
+ && nextRegionToRead < partitionedFile.getNumRegions()) {
partitionedFile.getIndexEntry(
indexFileChannel, indexEntryBuf, nextRegionToRead,
targetSubpartition);
- long dataOffset = indexEntryBuf.getLong();
+ nextOffsetToRead = indexEntryBuf.getLong();
currentRegionRemainingBuffers = indexEntryBuf.getInt();
++nextRegionToRead;
-
- if (currentRegionRemainingBuffers > 0) {
- dataFileChannel.position(dataOffset);
- return true;
- }
}
-
- return false;
}
/**
- * Reads a buffer from the {@link PartitionedFile} and moves the read
position forward.
+ * Reads a buffer from the current region of the target {@link
PartitionedFile} and moves the
+ * read position forward.
*
* <p>Note: The caller is responsible for recycling the target buffer if
any exception occurs.
*/
@Nullable
- public Buffer readBuffer(MemorySegment target, BufferRecycler recycler)
throws IOException {
- checkState(!isClosed, "File reader is already closed.");
-
- if (moveToNextReadableRegion()) {
- --currentRegionRemainingBuffers;
- return readFromByteChannel(dataFileChannel, headerBuf, target,
recycler);
+ public Buffer readCurrentRegion(MemorySegment target, BufferRecycler
recycler)
+ throws IOException {
+ if (currentRegionRemainingBuffers == 0) {
Review comment:
As the method name indicates, this method only reads data from the current
region. I will make the methods package-private.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]