StephanEwen commented on a change in pull request #13523:
URL: https://github.com/apache/flink/pull/13523#discussion_r516206985



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -371,6 +368,22 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
                        }
                }
 
+               private ByteBuf fillHeader(ByteBufAllocator allocator) {
+                       // in order to forward the buffer to netty, it needs an allocator set
+                       buffer.setAllocator(allocator);

Review comment:
       It is a bit confusing that this happens in a method called 
`fillHeader()`. I would suggest leaving this out of this method.
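A minimal sketch of the suggested split, using hypothetical stand-ins (`SketchBuffer` and a plain `Object` allocator) instead of Flink's buffer classes and Netty's `ByteBufAllocator`. `fillHeader()` only builds the header, and the write path sets the allocator explicitly. The header layout here (a 4-byte frame length plus a 1-byte message ID) is an assumption for illustration, not necessarily the real frame format.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a network buffer that needs an allocator before it can be forwarded.
final class SketchBuffer {
    private final int size;
    private Object allocator;

    SketchBuffer(int size) {
        this.size = size;
    }

    void setAllocator(Object allocator) {
        this.allocator = allocator;
    }

    Object getAllocator() {
        return allocator;
    }

    int getSize() {
        return size;
    }
}

final class FillHeaderSketch {
    // Assumed header layout for this sketch: 4-byte frame length + 1-byte message ID.
    static final int HEADER_LENGTH = Integer.BYTES + Byte.BYTES;

    /** Only builds the header; it has no side effects on the buffer. */
    static ByteBuffer fillHeader(byte messageId, SketchBuffer buffer) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        header.putInt(HEADER_LENGTH + buffer.getSize()); // total frame length
        header.put(messageId);
        header.flip();
        return header;
    }

    /** The write path prepares the buffer for forwarding, then builds the header. */
    static ByteBuffer write(byte messageId, SketchBuffer buffer, Object allocator) {
        // in order to forward the buffer, it needs an allocator set
        buffer.setAllocator(allocator);
        return fillHeader(messageId, buffer);
    }
}
```

This keeps `fillHeader()` free of side effects, so its name describes everything it does.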

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -491,6 +505,7 @@ static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
        static class PartitionRequest extends NettyMessage {
 
                private static final byte ID = 2;
+               private static final int LENGTH = 20 + 40 + 4 + 16 + 4;

Review comment:
       NIT: This is an unrelated change.
   
   It also does not necessarily make the code easier to read. Before, the length 
computation was inline, and one could see at a glance where the numbers came 
from (lengths, IDs, etc.) because the contents were written in the lines right 
after. Now, this sum of numbers stands isolated, and it is not easy to see where 
the numbers 4, 20, 16, etc. come from and what they mean.
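For comparison, a sketch of the inline style, modeled on the removed `AddCredit` code further down (`allocateBuffer(allocator, ID, 4 + 16)` followed by `writeInt(credit)` and `receiverId.writeTo(result)`). It uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` and omits the frame header; `SketchChannelId` is a hypothetical 16-byte stand-in (two `long`s) for the receiver ID.

```java
import java.nio.ByteBuffer;

// Hypothetical 16-byte ID (two longs), standing in for the receiver ID.
final class SketchChannelId {
    static final int SIZE = 2 * Long.BYTES;

    private final long upperPart;
    private final long lowerPart;

    SketchChannelId(long upperPart, long lowerPart) {
        this.upperPart = upperPart;
        this.lowerPart = lowerPart;
    }

    void writeTo(ByteBuffer target) {
        target.putLong(upperPart);
        target.putLong(lowerPart);
    }
}

final class InlineLengthSketch {
    static ByteBuffer writeAddCredit(int credit, SketchChannelId receiverId) {
        // The length is computed where the contents are written,
        // so each summand maps to exactly one write below.
        ByteBuffer result = ByteBuffer.allocate(
                Integer.BYTES            // credit
                + SketchChannelId.SIZE); // receiver ID
        result.putInt(credit);
        receiverId.writeTo(result);
        result.flip();
        return result;
    }
}
```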

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
+ */
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {

Review comment:
       Can we rename this to `BoundedBlockingSubpartitionDirectTransferReader`? 
Then the naming scheme is similar to `BoundedBlockingSubpartitionReader`.
   
   It is a bit confusing when two different implementations on the same 
level use different naming patterns.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper for {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} instance.

Review comment:
       It would be good to explain a bit more why this class is the way it is, 
and how it works:
   
     - This class implements `Buffer` so it can be used like any other 
`Buffer`, although it behaves in a "read-only" style, so many methods throw 
`UnsupportedOperationException`.
   
     - This extends from Netty's `DefaultFileRegion` so we don't need to do any 
special handling. Netty will internally treat this differently than a 
`Buffer` that implements `ByteBuf`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -552,6 +556,7 @@ public String toString() {
        static class TaskEventRequest extends NettyMessage {
 
                private static final byte ID = 3;
+               private static final int LENGTH = 4 + 20 + 40 + 16;

Review comment:
       See comment above.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -691,23 +672,12 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws
                }
 
                @Override
-               ByteBuf write(ByteBufAllocator allocator) throws IOException {
-                       ByteBuf result = null;
-
-                       try {
-                               result = allocateBuffer(allocator, ID, 4 + 16);
-                               result.writeInt(credit);
-                               receiverId.writeTo(result);
-
-                               return result;
-                       }
-                       catch (Throwable t) {
-                               if (result != null) {
-                                       result.release();
-                               }
-
-                               throw new IOException(t);
-                       }
+               void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
+                       Consumer<ByteBuf> consumer = (bb) -> {

Review comment:
       Do you think we should avoid introducing another virtual/interface 
method call here, and another object instantiation for the lambda argument 
capture, or is that not important at this point?
   
   It may matter for the most common messages on the performance path 
(BufferResponse, AddCredit). BufferResponse already avoids this; maybe 
AddCredit can as well.
   
   If we want to avoid it, we could also go with this pattern:
   
   ```java
   final ByteBuf buffer = allocateBuffer(allocator, ID, length);
   try {
       // same code as in consumer
       buffer.writeInt(....);
       ...
       out.write(buffer, promise);
   } catch (Throwable t) {
       handleException(buffer, t);
   }
   ```
   That way we only have the virtual call in the exception case, and no extra 
object. It is only minimally more verbose/duplicated.
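A runnable sketch of this pattern with hypothetical stand-ins for the Netty types: `SketchRefCountedBuffer` replaces `ByteBuf`, a list replaces `out.write(buffer, promise)`, and `handleException` is assumed to release the buffer and rethrow as an `IOException`, as the removed `AddCredit#write` did. The `simulateFailure` flag exists only to exercise the failure path.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reference-counted Netty ByteBuf.
final class SketchRefCountedBuffer {
    final ByteBuffer data;
    private boolean released;

    SketchRefCountedBuffer(int capacity) {
        this.data = ByteBuffer.allocate(capacity);
    }

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

final class NoLambdaWriteSketch {
    /** Stand-in for the outbound channel: collects buffers whose ownership was handed over. */
    final List<SketchRefCountedBuffer> written = new ArrayList<>();

    /** Last allocated buffer, kept only so the failure path can be inspected. */
    SketchRefCountedBuffer lastAllocated;

    void writeAddCredit(int credit, long receiverUpper, long receiverLower, boolean simulateFailure)
            throws IOException {
        final SketchRefCountedBuffer buffer = new SketchRefCountedBuffer(4 + 16);
        lastAllocated = buffer;
        try {
            // same code as in the consumer, but no lambda and no captured object
            buffer.data.putInt(credit);
            buffer.data.putLong(receiverUpper);
            buffer.data.putLong(receiverLower);
            if (simulateFailure) {
                throw new IllegalStateException("simulated write failure");
            }
            written.add(buffer);
        } catch (Throwable t) {
            // the extra call only happens on the exception path
            handleException(buffer, t);
        }
    }

    // Hypothetical helper: release the buffer so it does not leak, then rethrow.
    private static void handleException(SketchRefCountedBuffer buffer, Throwable t) throws IOException {
        buffer.release();
        throw new IOException(t);
    }
}
```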

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
##########
@@ -187,6 +205,17 @@ static ByteBuffer allocatedHeaderBuffer() {
                return new ByteBuffer[] { allocatedHeaderBuffer(), null };
        }
 
+       public static ByteBuffer tryReadByteBuffer(FileChannel channel, int size) {
+               ByteBuffer bb = ByteBuffer.allocate(size);
+               try {
+                       tryReadByteBuffer(channel, bb);
+                       bb.flip();
+               } catch (IOException ex) {
+                       ExceptionUtils.rethrow(ex);

Review comment:
       It does not feel quite right for an IO-related utility method to just 
turn an IOException into an unchecked exception.
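One possible direction, sketched under the assumption that callers can handle a checked exception: the helper declares `IOException` and reports a premature end of file as `EOFException` instead of rethrowing unchecked. The name `readByteBuffer` is hypothetical, and the EOF policy is a choice of this sketch, not necessarily what the existing `tryReadByteBuffer(FileChannel, ByteBuffer)` does.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class ReadByteBufferSketch {
    /**
     * Reads exactly {@code size} bytes from the channel's current position and returns them
     * in a buffer that is ready for reading. IO failures propagate as checked exceptions.
     */
    static ByteBuffer readByteBuffer(FileChannel channel, int size) throws IOException {
        ByteBuffer bb = ByteBuffer.allocate(size);
        while (bb.hasRemaining()) {
            if (channel.read(bb) == -1) {
                throw new EOFException(
                        "Reached end of file after " + bb.position() + " of " + size + " bytes");
            }
        }
        bb.flip();
        return bb;
    }
}
```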

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Wrapper for {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} instance.
+ *
+ * <p>The file region can be sent via network channel which supports zero-copy file transfer,
+ * and it can also be read into memory segment locally via {@link #readInto(MemorySegment)}.
+ */
+public class FileRegionBuffer extends DefaultFileRegion implements Buffer {
+
+       /** The number of bytes to be read/transferred from this file region. */
+       private final int bufferSize;
+
+       private final FileChannel fileChannel;
+
+       /** The {@link DataType} this buffer represents. */
+       private final DataType dataType;
+
+       /** Whether the buffer is compressed or not. */
+       private final boolean isCompressed;
+
+       public FileRegionBuffer(
+                       FileChannel fileChannel,
+                       int bufferSize,
+                       DataType dataType,
+                       boolean isCompressed) throws IOException {
+
+               super(fileChannel, fileChannel.position(), bufferSize);
+
+               this.fileChannel = checkNotNull(fileChannel);
+               this.bufferSize = bufferSize;
+               this.dataType = checkNotNull(dataType);
+               this.isCompressed = isCompressed;
+       }
+
+       // ------------------------------------------------------------------------
+       // Buffer override methods
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean isBuffer() {
+               return dataType.isBuffer();
+       }
+
+       @Override
+       public MemorySegment getMemorySegment() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getMemorySegmentOffset() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getMaxCapacity() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getReaderIndex() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       /**
+        * This method is only implemented for tests at the moment.
+        */
+       @Override
+       public ByteBuffer getNioBufferReadable() {
+               return BufferReaderWriterUtil.tryReadByteBuffer(fileChannel, bufferSize);

Review comment:
       This method is a little concerning, because if used incorrectly it defeats 
the purpose of the whole implementation: it reads into a newly allocated buffer.
   
   I assume this is mainly needed for events?
   
   One alternative here could be that the reading of events from the file 
channel produces a regular `NetworkBuffer`. Then this method here would not be 
needed.
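A sketch of that alternative, assuming the header (data type and payload length) has already been parsed: events are read eagerly into a plain heap buffer, while data buffers only record where their payload lives in the file, so the region type never needs a `getNioBufferReadable()`-style copy. `InMemoryEvent` and `FileRegionData` are hypothetical stand-ins for a regular `NetworkBuffer` and `FileRegionBuffer`, and the assumption that events are small enough to copy is part of the sketch.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class EventOrRegionSketch {
    interface Entry {}

    /** Stand-in for a regular in-memory buffer holding an event. */
    static final class InMemoryEvent implements Entry {
        final ByteBuffer bytes;

        InMemoryEvent(ByteBuffer bytes) {
            this.bytes = bytes;
        }
    }

    /** Stand-in for a file region: only the location of the payload, no bytes read. */
    static final class FileRegionData implements Entry {
        final long position;
        final int length;

        FileRegionData(long position, int length) {
            this.position = position;
            this.length = length;
        }
    }

    static Entry next(FileChannel channel, long position, int length, boolean isEvent)
            throws IOException {
        if (!isEvent) {
            // data stays on disk and can later be transferred zero-copy
            return new FileRegionData(position, length);
        }
        // events are assumed to be small, so they are copied into memory right away
        ByteBuffer bytes = ByteBuffer.allocate(length);
        while (bytes.hasRemaining()) {
            // positional read: does not change the channel's own position
            if (channel.read(bytes, position + bytes.position()) == -1) {
                throw new EOFException("Event payload is truncated");
            }
        }
        bytes.flip();
        return new InMemoryEvent(bytes);
    }
}
```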

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {
+
+       /** The result subpartition that we read. */
+       private final BoundedBlockingSubpartition parent;
+
+       /** The reader/decoder to the file region with the data we currently read from. */
+       private final BoundedData.Reader dataReader;
+
+       /** The remaining number of data buffers (not events) in the result. */
+       private int numDataBuffers;
+
+       /** The remaining number of data buffers and events in the result. */
+       private int numDataAndEventBuffers;
+
+       /** Flag whether this reader is released. */
+       private boolean isReleased;
+
+       private int sequenceNumber;
+
+       BoundedBlockingSubpartitionView(
+               BoundedBlockingSubpartition parent,
+               Path filePath,
+               int numDataBuffers,
+               int numDataAndEventBuffers) throws IOException {
+
+               this.parent = checkNotNull(parent);
+
+               checkNotNull(filePath);
+               this.dataReader = new FileRegionReader(filePath);
+
+               checkArgument(numDataBuffers >= 0);
+               this.numDataBuffers = numDataBuffers;
+
+               checkArgument(numDataAndEventBuffers >= 0);
+               this.numDataAndEventBuffers = numDataAndEventBuffers;
+       }
+
+       @Nullable
+       @Override
+       public BufferAndBacklog getNextBuffer() throws IOException {
+               if (isReleased) {
+                       return null;
+               }
+
+               Buffer current = dataReader.nextBuffer();
+               if (current == null) {
+                       // as per contract, we must return null when the reader is empty,
+                       // but also in case the reader is disposed (rather than throwing an exception)
+                       return null;
+               }
+
+               updateStatistics(current);
+
+               // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the next header
+               Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+               return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, numDataBuffers, sequenceNumber++);
+       }
+
+       private void updateStatistics(Buffer buffer) {
+               if (buffer.isBuffer()) {
+                       numDataBuffers--;
+               }
+               numDataAndEventBuffers--;
+       }
+
+       @Override
+       public boolean isAvailable(int numCreditsAvailable) {
+               // We simply assume there are no events except EndOfPartitionEvent for batch jobs,
+               // then it has no essential effect to ignore the judgement of next event buffer.
+               return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+       }
+
+       @Override
+       public void releaseAllResources() throws IOException {
+               // it is not a problem if this method executes multiple times
+               isReleased = true;
+
+               IOUtils.closeQuietly(dataReader);
+
+               // Notify the parent that this one is released. This allows the parent to
+               // eventually release all resources (when all readers are done and the
+               // parent is disposed).
+               parent.releaseReaderReference(this);
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
+       }
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
+       public void notifyDataAvailable() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public void resumeConsumption() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+                       parent.parent.getPartitionId(),
+                       parent.getSubPartitionIndex());
+       }
+
+       /**
+        * The reader to read from {@link BoundedBlockingSubpartition} and return the wrapped
+        * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} based buffer.
+        */
+       static final class FileRegionReader implements BoundedData.Reader {
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               private long lastPosition = -1L;
+
+               private int lastBufferSize;
+
+               FileRegionReader(Path filePath) throws IOException {
+                       this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
+                       this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       // As the file region transfer via network channel will not modify the underlying file position,

Review comment:
       Maybe we can optimize this method a bit.
   
   If you inline the code from BufferReaderWriterUtil.readFromByteChannel() 
here, then you can avoid the logic that tracks `lastPosition` etc., and 
simply always advance the position here.
   
   It should also help to avoid some calls to `fileChannel.position()`, which 
are not cheap.
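A sketch of the idea, with the header parsing inlined into the reader. It goes one step further than the suggestion and keeps the position in a field, reading the header with positional `FileChannel#read(ByteBuffer, long)` calls, so `fileChannel.position()` is never queried. The 8-byte header (4-byte type tag, 4-byte payload size) and the `Region` class are assumptions of this sketch, not the actual `BufferReaderWriterUtil` format.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class PositionTrackingReaderSketch {
    // Assumed header layout: 4-byte data type tag + 4-byte payload size.
    static final int HEADER_LENGTH = 8;

    /** Hypothetical region descriptor: where a payload lives in the file. */
    static final class Region {
        final int dataType;
        final long offset;
        final int length;

        Region(int dataType, long offset, int length) {
            this.dataType = dataType;
            this.offset = offset;
            this.length = length;
        }
    }

    private final FileChannel channel;
    private final ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);

    /** Tracked here, so no fileChannel.position() calls are needed. */
    private long position;

    PositionTrackingReaderSketch(FileChannel channel) {
        this.channel = channel;
    }

    /** Returns the next region, or null once all data has been consumed. */
    Region nextRegion() throws IOException {
        header.clear();
        while (header.hasRemaining()) {
            if (channel.read(header, position + header.position()) == -1) {
                if (header.position() == 0) {
                    return null; // clean end of data
                }
                throw new EOFException("Truncated header at position " + position);
            }
        }
        header.flip();
        int dataType = header.getInt();
        int length = header.getInt();

        Region region = new Region(dataType, position + HEADER_LENGTH, length);
        // always advance past header and payload; the payload itself is not read here
        position += HEADER_LENGTH + length;
        return region;
    }
}
```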

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -74,7 +74,10 @@
 
        /** All created and not yet released readers. */
        @GuardedBy("lock")
-       private final Set<BoundedBlockingSubpartitionReader> readers;
+       private final Set<ResultSubpartitionView> readers;
+
+       /** Flag whether SSL is enabled for network transfer with file type. */
+       private final boolean fileWithoutSSLEnabled;

Review comment:
       How about renaming this to `useDirectFileTransfer` ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+                       // As the file region transfer via network channel will not modify the underlying file position,

Review comment:
       I think using `lastPosition` to distinguish the two uses is very fragile.
   
   Can we say that this part here (creating regions from the file) always 
advances the position, and the actual reading part never advances the position? 
For that, `FileRegionBuffer#readInto(...)` would also need to use the 
offset/length fields stored in the region.
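A sketch of the reading side under that split: the region keeps its own offset and length and copies bytes with positional reads, which by the `FileChannel#read(ByteBuffer, long)` contract do not modify the channel's position, so regions can be read in any order. `SketchFileRegion` is hypothetical, and it reads into a `ByteBuffer` rather than a `MemorySegment`.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class SketchFileRegion {
    private final FileChannel channel;
    private final long offset;
    private final int length;

    SketchFileRegion(FileChannel channel, long offset, int length) {
        this.channel = channel;
        this.offset = offset;
        this.length = length;
    }

    /** Copies this region into {@code target} without touching the channel's position. */
    void readInto(ByteBuffer target) throws IOException {
        if (target.remaining() < length) {
            throw new IllegalArgumentException(
                    "Need " + length + " bytes, but target has " + target.remaining());
        }
        int start = target.position();
        // a view of the target limited to exactly this region's length
        ByteBuffer window = target.duplicate();
        window.limit(start + length);
        while (window.hasRemaining()) {
            long filePosition = offset + (window.position() - start);
            if (channel.read(window, filePosition) == -1) {
                throw new EOFException("Region extends beyond end of file");
            }
        }
        target.position(start + length);
    }
}
```

With this split, only region creation moves a position forward, and reading a region is independent of any shared cursor.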

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {
+
+       /** The result subpartition that we read. */
+       private final BoundedBlockingSubpartition parent;
+
+       /** The reader/decoder to the file region with the data we currently read from. */
+       private final BoundedData.Reader dataReader;
+
+       /** The remaining number of data buffers (not events) in the result. */
+       private int numDataBuffers;
+
+       /** The remaining number of data buffers and events in the result. */
+       private int numDataAndEventBuffers;
+
+       /** Flag whether this reader is released. */
+       private boolean isReleased;
+
+       private int sequenceNumber;
+
+       BoundedBlockingSubpartitionView(
+               BoundedBlockingSubpartition parent,
+               Path filePath,
+               int numDataBuffers,
+               int numDataAndEventBuffers) throws IOException {
+
+               this.parent = checkNotNull(parent);
+
+               checkNotNull(filePath);
+               this.dataReader = new FileRegionReader(filePath);
+
+               checkArgument(numDataBuffers >= 0);
+               this.numDataBuffers = numDataBuffers;
+
+               checkArgument(numDataAndEventBuffers >= 0);
+               this.numDataAndEventBuffers = numDataAndEventBuffers;
+       }
+
+       @Nullable
+       @Override
+       public BufferAndBacklog getNextBuffer() throws IOException {
+               if (isReleased) {
+                       return null;
+               }
+
+               Buffer current = dataReader.nextBuffer();
+               if (current == null) {
+                       // as per contract, we must return null when the reader is empty,
+                       // but also in case the reader is disposed (rather than throwing an exception)
+                       return null;
+               }
+
+               updateStatistics(current);
+
+               // For batch jobs we simply assume all remaining buffers are data buffers (not events),
+               // which avoids pre-fetching the next header
+               Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+               return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, numDataBuffers, sequenceNumber++);
+       }
+
+       private void updateStatistics(Buffer buffer) {
+               if (buffer.isBuffer()) {
+                       numDataBuffers--;
+               }
+               numDataAndEventBuffers--;
+       }
+
+       @Override
+       public boolean isAvailable(int numCreditsAvailable) {
+               // We simply assume there are no events except EndOfPartitionEvent for batch jobs,
+               // so ignoring whether the next buffer is an event has no real effect here.
+               return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+       }
+
+       @Override
+       public void releaseAllResources() throws IOException {
+               // it is not a problem if this method executes multiple times
+               isReleased = true;
+
+               IOUtils.closeQuietly(dataReader);
+
+               // Notify the parent that this one is released. This allows the parent to
+               // eventually release all resources (when all readers are done and the
+               // parent is disposed).
+               parent.releaseReaderReference(this);
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
+       }
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
+       public void notifyDataAvailable() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public void resumeConsumption() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+                       parent.parent.getPartitionId(),
+                       parent.getSubPartitionIndex());
+       }
+
+       /**
+        * The reader to read from {@link BoundedBlockingSubpartition} and return the wrapped
+        * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} based buffer.
+        */
+       static final class FileRegionReader implements BoundedData.Reader {
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               private long lastPosition = -1L;
+
+               private int lastBufferSize;
+
+               FileRegionReader(Path filePath) throws IOException {
+                       this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
+                       this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       // As the file region transfer via network channel will not modify the underlying file position,

Review comment:
       If you want, I can try and create a suggestion for this change.
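For context on the code comment this review is attached to: transferring a file region over the network channel does not move the underlying `FileChannel` position, so the reader has to track offsets itself (presumably what `lastPosition` and `lastBufferSize` are for). Below is a minimal sketch of that idea, not the change under review. `PositionTrackingReader` is hypothetical, and the record layout (a 4-byte big-endian length followed by the payload) is an assumption, not the actual `BufferReaderWriterUtil` header format. It uses `FileChannel.read(ByteBuffer, long)`, which, like `transferTo`, leaves the channel position unchanged.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical sketch (not Flink code): walks length-prefixed records in a file while
 * tracking the read offset itself instead of relying on FileChannel#position().
 * Assumed record layout: a 4-byte big-endian payload length followed by the payload.
 */
final class PositionTrackingReader implements AutoCloseable {

    private final FileChannel fileChannel;
    private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES);

    /** Offset of the next record header, maintained by this reader. */
    private long nextHeaderPosition = 0L;

    PositionTrackingReader(Path filePath) throws IOException {
        this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
    }

    /**
     * Returns {payloadOffset, payloadLength} of the next record, or null at end of file.
     * That range could be handed to FileChannel#transferTo(position, count, target),
     * which likewise leaves the channel position untouched.
     */
    long[] nextRegion() throws IOException {
        headerBuffer.clear();
        while (headerBuffer.hasRemaining()) {
            // Positional read: does not change fileChannel.position().
            int read = fileChannel.read(headerBuffer, nextHeaderPosition + headerBuffer.position());
            if (read < 0) {
                if (headerBuffer.position() == 0) {
                    return null; // no more records
                }
                throw new IOException("Truncated header at offset " + nextHeaderPosition);
            }
        }
        headerBuffer.flip();
        int payloadLength = headerBuffer.getInt();
        long payloadOffset = nextHeaderPosition + Integer.BYTES;
        if (payloadLength < 0 || payloadOffset + payloadLength > fileChannel.size()) {
            throw new IOException("Corrupt record header at offset " + nextHeaderPosition);
        }
        // Advance our own cursor past the header and the payload.
        nextHeaderPosition = payloadOffset + payloadLength;
        return new long[] {payloadOffset, payloadLength};
    }

    /** Exposed only to show that the channel position never moves. */
    long channelPosition() throws IOException {
        return fileChannel.position();
    }

    @Override
    public void close() throws IOException {
        fileChannel.close();
    }
}
```

Because the cursor lives in the reader rather than in the channel, handing a region off to a zero-copy transfer and reading the next header can never disagree about where the next record starts.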

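In the diff, `getNextBuffer()`, `updateStatistics()` and `isAvailable()` derive everything from two counters instead of peeking at the next header. Below is a simplified, self-contained model of that bookkeeping that may help when reviewing it. `BacklogCounters` and its `Kind` enum are hypothetical stand-ins for the Flink view and `Buffer.DataType`, not Flink API.

```java
/**
 * Hypothetical, simplified model (not Flink code) of the counter bookkeeping in
 * BoundedBlockingSubpartitionView; Kind stands in for Buffer.DataType.
 */
final class BacklogCounters {

    enum Kind { DATA_BUFFER, NONE }

    private int numDataBuffers;
    private int numDataAndEventBuffers;

    BacklogCounters(int numDataBuffers, int numDataAndEventBuffers) {
        if (numDataBuffers < 0 || numDataAndEventBuffers < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        this.numDataBuffers = numDataBuffers;
        this.numDataAndEventBuffers = numDataAndEventBuffers;
    }

    /** Like updateStatistics(): only data buffers reduce the data backlog. */
    void consume(boolean isDataBuffer) {
        if (isDataBuffer) {
            numDataBuffers--;
        }
        numDataAndEventBuffers--;
    }

    /** Like the lookahead in getNextBuffer(): anything left is assumed to be data. */
    Kind nextDataType() {
        return numDataAndEventBuffers > 0 ? Kind.DATA_BUFFER : Kind.NONE;
    }

    /** Like isAvailable(): requires a credit and at least one remaining buffer or event. */
    boolean isAvailable(int numCreditsAvailable) {
        return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
    }

    /** The backlog reported alongside each buffer (taken after the counters are updated). */
    int backlog() {
        return numDataBuffers;
    }
}
```

For example, with two data buffers followed by an `EndOfPartitionEvent` (counts 2 and 3), the backlog drops to 0 once both data buffers are consumed. `isAvailable(1)` stays true and the lookahead still reports `DATA_BUFFER` until the event itself is consumed. Per the code comments in the diff, that misreport is considered harmless for batch jobs.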



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

