zhijiangW commented on a change in pull request #13523:
URL: https://github.com/apache/flink/pull/13523#discussion_r516452223



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -371,6 +368,22 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
                        }
                }
 
+               private ByteBuf fillHeader(ByteBufAllocator allocator) {
+                       // in order to forward the buffer to netty, it needs an allocator set
+                       buffer.setAllocator(allocator);

Review comment:
       I had exactly the same feeling before, just as you said. The deduplication does not seem worth it here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -691,23 +672,12 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws
                }
 
                @Override
-               ByteBuf write(ByteBufAllocator allocator) throws IOException {
-                       ByteBuf result = null;
-
-                       try {
-                               result = allocateBuffer(allocator, ID, 4 + 16);
-                               result.writeInt(credit);
-                               receiverId.writeTo(result);
-
-                               return result;
-                       }
-                       catch (Throwable t) {
-                               if (result != null) {
-                                       result.release();
-                               }
-
-                               throw new IOException(t);
-                       }
+               void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
+                       Consumer<ByteBuf> consumer = (bb) -> {

Review comment:
       I indeed overlooked the performance concern for this frequent message. The `handleException` deduplication looks pretty good to me.
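   For the record, the deduplicated shape I have in mind is roughly the following; this is only a sketch, and the helper's actual name and signature in the PR may differ:
   
   ```java
   // Sketch only: centralize the cleanup that every message's write() used to repeat,
   // i.e. release the partially written ByteBuf and rethrow as an IOException.
   private static void handleException(@Nullable ByteBuf byteBuf, Throwable t) throws IOException {
       if (byteBuf != null) {
           byteBuf.release();
       }
       if (t instanceof IOException) {
           throw (IOException) t;
       }
       throw new IOException(t);
   }
   ```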

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper for {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} instance.
+ *
+ * <p>The file region can be sent via network channel which supports zero-copy file transfer,
+ * and it can also be read into memory segment locally via {@link #readInto(MemorySegment)}.
+ */
+public class FileRegionBuffer extends DefaultFileRegion implements Buffer {
+
+       /** The number of bytes to be read/transferred from this file region. */
+       private final int bufferSize;
+
+       private final FileChannel fileChannel;
+
+       /** The {@link DataType} this buffer represents. */
+       private final DataType dataType;
+
+       /** Whether the buffer is compressed or not. */
+       private final boolean isCompressed;
+
+       public FileRegionBuffer(
+                       FileChannel fileChannel,
+                       int bufferSize,
+                       DataType dataType,
+                       boolean isCompressed) throws IOException {
+
+               super(fileChannel, fileChannel.position(), bufferSize);
+
+               this.fileChannel = checkNotNull(fileChannel);
+               this.bufferSize = bufferSize;
+               this.dataType = checkNotNull(dataType);
+               this.isCompressed = isCompressed;
+       }
+
+       // ------------------------------------------------------------------------
+       // Buffer override methods
+       // ------------------------------------------------------------------------
+
+       @Override
+       public boolean isBuffer() {
+               return dataType.isBuffer();
+       }
+
+       @Override
+       public MemorySegment getMemorySegment() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getMemorySegmentOffset() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getMaxCapacity() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public int getReaderIndex() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       /**
+        * This method is only implemented for tests at the moment.
+        */
+       @Override
+       public ByteBuffer getNioBufferReadable() {
+               return BufferReaderWriterUtil.tryReadByteBuffer(fileChannel, bufferSize);

Review comment:
       As I mentioned in the javadoc, this method is only used by unit tests at the moment, not by the core code.
   Actually, in the first version I made this method throw `UnsupportedOperationException`, but I found that many existing tests rely on it, such as `BoundedBlockingSubpartitionWriteReadTest`. So I implemented it to avoid refactoring the unit-test logic and keep the effort small. Do you think we should refactor the unit tests to avoid calling it this way instead?
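   For context, the reliance I mean looks roughly like this in the write/read round-trip tests (an illustrative snippet, not the exact test code):
   
   ```java
   // Illustrative only: the tests pull buffers from the read view and inspect
   // their contents through getNioBufferReadable().
   Buffer buffer = reader.nextBuffer();
   ByteBuffer contents = buffer.getNioBufferReadable(); // for FileRegionBuffer, this reads from the file
   while (contents.hasRemaining()) {
       validateByte(contents.get()); // hypothetical assertion helper
   }
   ```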

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
##########
@@ -187,6 +205,17 @@ static ByteBuffer allocatedHeaderBuffer() {
                return new ByteBuffer[] { allocatedHeaderBuffer(), null };
        }
 
+       public static ByteBuffer tryReadByteBuffer(FileChannel channel, int size) {
+               ByteBuffer bb = ByteBuffer.allocate(size);
+               try {
+                       tryReadByteBuffer(channel, bb);
+                       bb.flip();
+               } catch (IOException ex) {
+                       ExceptionUtils.rethrow(ex);

Review comment:
       Agree, I really had the same concern before. Actually, my previous approach was to throw an explicit `IOException` here. But I found it would also require adjusting the interface method `Buffer#getNioBufferReadable` to throw `IOException`, which would in turn bring a whole series of changes to existing usages.
   
   It was also prompted by this [comment](https://github.com/apache/flink/pull/13523#discussion_r516235008). Let's confirm them together there.
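   Just to make the trade-off concrete: the checked variant would look like the sketch below, and the `throws` clause would then have to propagate into `Buffer#getNioBufferReadable` and all of its existing implementations and callers.
   
   ```java
   // Sketch of the rejected checked-exception variant of the helper above.
   public static ByteBuffer tryReadByteBuffer(FileChannel channel, int size) throws IOException {
       ByteBuffer bb = ByteBuffer.allocate(size);
       tryReadByteBuffer(channel, bb); // existing overload that reads into the given buffer
       bb.flip();
       return bb;
   }
   
   // ...which would force the interface change:
   // ByteBuffer getNioBufferReadable() throws IOException;
   ```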

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
+ */
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {
+
+       /** The result subpartition that we read. */
+       private final BoundedBlockingSubpartition parent;
+
+       /** The reader/decoder to the file region with the data we currently read from. */
+       private final BoundedData.Reader dataReader;
+
+       /** The remaining number of data buffers (not events) in the result. */
+       private int numDataBuffers;
+
+       /** The remaining number of data buffers and events in the result. */
+       private int numDataAndEventBuffers;
+
+       /** Flag whether this reader is released. */
+       private boolean isReleased;
+
+       private int sequenceNumber;
+
+       BoundedBlockingSubpartitionView(
+               BoundedBlockingSubpartition parent,
+               Path filePath,
+               int numDataBuffers,
+               int numDataAndEventBuffers) throws IOException {
+
+               this.parent = checkNotNull(parent);
+
+               checkNotNull(filePath);
+               this.dataReader = new FileRegionReader(filePath);
+
+               checkArgument(numDataBuffers >= 0);
+               this.numDataBuffers = numDataBuffers;
+
+               checkArgument(numDataAndEventBuffers >= 0);
+               this.numDataAndEventBuffers = numDataAndEventBuffers;
+       }
+
+       @Nullable
+       @Override
+       public BufferAndBacklog getNextBuffer() throws IOException {
+               if (isReleased) {
+                       return null;
+               }
+
+               Buffer current = dataReader.nextBuffer();
+               if (current == null) {
+                       // as per contract, we must return null when the reader is empty,
+                       // but also in case the reader is disposed (rather than throwing an exception)
+                       return null;
+               }
+
+               updateStatistics(current);
+
+               // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the next header
+               Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+               return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, numDataBuffers, sequenceNumber++);
+       }
+
+       private void updateStatistics(Buffer buffer) {
+               if (buffer.isBuffer()) {
+                       numDataBuffers--;
+               }
+               numDataAndEventBuffers--;
+       }
+
+       @Override
+       public boolean isAvailable(int numCreditsAvailable) {
+               // We simply assume there are no events except EndOfPartitionEvent for batch jobs,
+               // so ignoring the check for the next event buffer has no essential effect.
+               return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+       }
+
+       @Override
+       public void releaseAllResources() throws IOException {
+               // it is not a problem if this method executes multiple times
+               isReleased = true;
+
+               IOUtils.closeQuietly(dataReader);
+
+               // Notify the parent that this one is released. This allows the parent to
+               // eventually release all resources (when all readers are done and the
+               // parent is disposed).
+               parent.releaseReaderReference(this);
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
+       }
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
+       public void notifyDataAvailable() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public void resumeConsumption() {
+               throw new UnsupportedOperationException("Method should never be called.");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+                       parent.parent.getPartitionId(),
+                       parent.getSubPartitionIndex());
+       }
+
+       /**
+        * The reader to read from {@link BoundedBlockingSubpartition} and return the wrapped
+        * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} based buffer.
+        */
+       static final class FileRegionReader implements BoundedData.Reader {
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               private long lastPosition = -1L;
+
+               private int lastBufferSize;
+
+               FileRegionReader(Path filePath) throws IOException {
+                       this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
+                       this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       // As the file region transfer via network channel will not modify the underlying file position,

Review comment:
       The current way is indeed a bit obscure. Actually, it covers two different usages at the moment:
   
   - For network file transfer, the underlying file position of the data part is not changed while transferring.
   
   - For local input channel reading, the respective underlying file position is advanced via `FileRegionBuffer#readInto`.
   
   So when `nextBuffer` is called here, it does not know whether the caller is the local channel or the network stack, so it cannot always advance the position. I rely on `lastPosition` to implicitly distinguish the two usages.
   
   My first version of the PR handled this a bit better by always advancing the position in the constructor of the extended `FileRegion` instance, since that is only used by the network stack. But after the refactoring in the current version, I did not find a better place to advance the position. Or did I misunderstand your proposed way?
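   To spell the implicit distinction out, the logic is roughly the following (a simplified reconstruction of the current code, not the verbatim implementation):
   
   ```java
   // If the channel position still equals the data start of the previous buffer,
   // the previous buffer went out via zero-copy network transfer (which does not
   // advance the position), so we must skip its data part explicitly. Otherwise,
   // FileRegionBuffer#readInto already advanced the position during the local read.
   if (lastPosition != -1L && fileChannel.position() == lastPosition) {
       fileChannel.position(lastPosition + lastBufferSize);
   }
   ```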

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
+ */
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView 
{
+
+       /** The result subpartition that we read. */
+       private final BoundedBlockingSubpartition parent;
+
+       /** The reader/decoder to the file region with the data we currently 
read from. */
+       private final BoundedData.Reader dataReader;
+
+       /** The remaining number of data buffers (not events) in the result. */
+       private int numDataBuffers;
+
+       /** The remaining number of data buffers and events in the result. */
+       private int numDataAndEventBuffers;
+
+       /** Flag whether this reader is released. */
+       private boolean isReleased;
+
+       private int sequenceNumber;
+
+       BoundedBlockingSubpartitionView(
+               BoundedBlockingSubpartition parent,
+               Path filePath,
+               int numDataBuffers,
+               int numDataAndEventBuffers) throws IOException {
+
+               this.parent = checkNotNull(parent);
+
+               checkNotNull(filePath);
+               this.dataReader = new FileRegionReader(filePath);
+
+               checkArgument(numDataBuffers >= 0);
+               this.numDataBuffers = numDataBuffers;
+
+               checkArgument(numDataAndEventBuffers >= 0);
+               this.numDataAndEventBuffers = numDataAndEventBuffers;
+       }
+
+       @Nullable
+       @Override
+       public BufferAndBacklog getNextBuffer() throws IOException {
+               if (isReleased) {
+                       return null;
+               }
+
+               Buffer current = dataReader.nextBuffer();
+               if (current == null) {
+                       // as per contract, we must return null when the reader 
is empty,
+                       // but also in case the reader is disposed (rather than 
throwing an exception)
+                       return null;
+               }
+
+               updateStatistics(current);
+
+               // We simply assume all the data are non-events for batch jobs 
to avoid pre-fetching the next header
+               Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? 
Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+               return BufferAndBacklog.fromBufferAndLookahead(current, 
nextDataType, numDataBuffers, sequenceNumber++);
+       }
+
+       private void updateStatistics(Buffer buffer) {
+               if (buffer.isBuffer()) {
+                       numDataBuffers--;
+               }
+               numDataAndEventBuffers--;
+       }
+
+       @Override
+       public boolean isAvailable(int numCreditsAvailable) {
+               // We simply assume there are no events except 
EndOfPartitionEvent for bath jobs,
+               // then it has no essential effect to ignore the judgement of 
next event buffer.
+               return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+       }
+
+       @Override
+       public void releaseAllResources() throws IOException {
+               // it is not a problem if this method executes multiple times
+               isReleased = true;
+
+               IOUtils.closeQuietly(dataReader);
+
+               // Notify the parent that this one is released. This allows the 
parent to
+               // eventually release all resources (when all readers are done 
and the
+               // parent is disposed).
+               parent.releaseReaderReference(this);
+       }
+
+       @Override
+       public boolean isReleased() {
+               return isReleased;
+       }
+
+       @Override
+       public Throwable getFailureCause() {
+               // we can never throw an error after this was created
+               return null;
+       }
+
+       @Override
+       public int unsynchronizedGetNumberOfQueuedBuffers() {
+               return parent.unsynchronizedGetNumberOfQueuedBuffers();
+       }
+
+       @Override
+       public void notifyDataAvailable() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public void resumeConsumption() {
+               throw new UnsupportedOperationException("Method should never be 
called.");
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Blocking Subpartition Reader: ID=%s, 
index=%d",
+                       parent.parent.getPartitionId(),
+                       parent.getSubPartitionIndex());
+       }
+
+       /**
+        * The reader to read from {@link BoundedBlockingSubpartition} and 
return the wrapped
+        * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} 
based buffer.
+        */
+       static final class FileRegionReader implements BoundedData.Reader {
+
+               private final FileChannel fileChannel;
+
+               private final ByteBuffer headerBuffer;
+
+               private long lastPosition = -1L;
+
+               private int lastBufferSize;
+
+               FileRegionReader(Path filePath) throws IOException {
+                       this.fileChannel = FileChannel.open(filePath, 
StandardOpenOption.READ);
+                       this.headerBuffer = 
BufferReaderWriterUtil.allocatedHeaderBuffer();
+               }
+
+               @Nullable
+               @Override
+               public Buffer nextBuffer() throws IOException {
+                       // As the file region transfer via network channel will 
not modify the underlining file position,

Review comment:
       Yeah, I agree that it is fragile. Let me consider how to refactor it a bit.
   
   `FileRegionBuffer#readInto` relies on `FileChannel#read(ByteBuffer dst)` to read the data part, which automatically advances the position. If I always advance the data-part position here before the actual reading, then for local channel reading I would need to roll back the respective position. That might be a bit better than the current way, since we would not need to maintain the `lastPosition` variable. Is my understanding right?
   
   The more realistic way is to only advance the position for direct network transfer, and leave the normal advance to `FileChannel#read(ByteBuffer dst)` for the local input channel. I will think further about where that can be done.
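   Concretely, the alternative would look roughly like this (a sketch only, under the assumption that `readInto` keeps using `FileChannel#read(ByteBuffer)` for the local path):
   
   ```java
   // In FileRegionReader#nextBuffer(): after parsing the header, eagerly skip the data part.
   long dataStart = fileChannel.position();
   fileChannel.position(dataStart + bufferSize);
   
   // In FileRegionBuffer#readInto(MemorySegment segment): undo the eager skip first, then
   // let FileChannel#read perform the normal advance for the local input channel.
   ByteBuffer dst = segment.wrap(0, bufferSize); // MemorySegment#wrap returns a ByteBuffer view
   fileChannel.position(position());             // DefaultFileRegion#position() is the data start
   fileChannel.read(dst);
   ```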




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

