StephanEwen commented on a change in pull request #13523:
URL: https://github.com/apache/flink/pull/13523#discussion_r516206985
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -371,6 +368,22 @@ ByteBuf write(ByteBufAllocator allocator) throws IOException {
}
}
+ private ByteBuf fillHeader(ByteBufAllocator allocator) {
+ // in order to forward the buffer to netty, it needs an allocator set
+ buffer.setAllocator(allocator);
Review comment:
It is a bit confusing that this happens in a method called `fillHeader()`. I would suggest leaving this out of this method.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -491,6 +505,7 @@ static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
static class PartitionRequest extends NettyMessage {
private static final byte ID = 2;
+ private static final int LENGTH = 20 + 40 + 4 + 16 + 4;
Review comment:
NIT: This is an unrelated change.
It also does not necessarily become easier to read this way. Before, the length computation was inline, and one could see at a glance where the numbers came from (lengths, IDs, etc.) because the contents were written in the lines right after it. Now, this sum of numbers stands isolated, and it is not easy to see where the numbers 4, 20, 16, etc. come from and what they mean.
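To illustrate the readability concern: if a constant is kept at all, one way to keep the numbers traceable is to name each term after the field it covers and keep the writes in the same order. The field names and their order below are assumptions inferred only from the sum in the diff (20 + 40 + 4 + 16 + 4); `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` so the sketch runs on its own.

```java
import java.nio.ByteBuffer;

/** Hypothetical request layout; field names and order are illustrative assumptions. */
final class PartitionRequestLayoutSketch {

    // One named constant per field, in the order the fields are written below.
    static final int PARTITION_ID_BYTES = 20;
    static final int PRODUCER_ID_BYTES = 40;
    static final int QUEUE_INDEX_BYTES = Integer.BYTES;
    static final int RECEIVER_ID_BYTES = 16;
    static final int CREDIT_BYTES = Integer.BYTES;

    static final int LENGTH = PARTITION_ID_BYTES + PRODUCER_ID_BYTES
            + QUEUE_INDEX_BYTES + RECEIVER_ID_BYTES + CREDIT_BYTES;

    private PartitionRequestLayoutSketch() {}

    /** Serializes the fields; each write consumes exactly the bytes its constant names. */
    static ByteBuffer write(byte[] partitionId, byte[] producerId, int queueIndex,
                            byte[] receiverId, int credit) {
        if (partitionId.length != PARTITION_ID_BYTES
                || producerId.length != PRODUCER_ID_BYTES
                || receiverId.length != RECEIVER_ID_BYTES) {
            throw new IllegalArgumentException("Unexpected ID length");
        }
        ByteBuffer result = ByteBuffer.allocate(LENGTH);
        result.put(partitionId);    // PARTITION_ID_BYTES
        result.put(producerId);     // PRODUCER_ID_BYTES
        result.putInt(queueIndex);  // QUEUE_INDEX_BYTES
        result.put(receiverId);     // RECEIVER_ID_BYTES
        result.putInt(credit);      // CREDIT_BYTES
        return result;
    }
}
```

Keeping the length inline next to the writes, as the code did before, gives the same traceability without any extra constants.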
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
+ */
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {
Review comment:
Can we rename this to `BoundedBlockingSubpartitionDirectTransferReader`? Then the naming scheme is similar to `BoundedBlockingSubpartitionReader`.
It is a bit confusing when two different implementations on the same level use different naming patterns.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper for {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} instance.
Review comment:
It would be good to explain a bit more why this class is the way it is, and how it works:
- This class implements `Buffer` so that it can be used like any other `Buffer`, although it behaves in a "read-only" style, so many methods throw `UnsupportedOperationException`.
- It extends Netty's `DefaultFileRegion` so that we don't need any special handling: Netty internally treats it differently from the `Buffer` implementation that is a `ByteBuf` (`NetworkBuffer`).
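To make the two points concrete, here is a reduced, self-contained sketch with stand-in types (`Buffer`, `FileRegionStub`, and `RegionBuffer` here are hypothetical, not the Flink or Netty classes). The dispatch method roughly mirrors how Netty's transports handle `FileRegion` messages on a separate path from `ByteBuf` messages, which is why extending `DefaultFileRegion` needs no special handling on the Flink side.

```java
/** Hypothetical, heavily reduced stand-ins; not the real Flink or Netty types. */
final class FileRegionBufferSketch {

    /** Stand-in for Flink's Buffer interface (only two methods kept). */
    interface Buffer {
        int readableBytes();

        int getReaderIndex();
    }

    /** Stand-in for Netty's DefaultFileRegion: a (position, count) window of a file. */
    static class FileRegionStub {
        final long position;
        final long count;

        FileRegionStub(long position, long count) {
            this.position = position;
            this.count = count;
        }
    }

    /** Read-only Buffer view over a file region: size queries work, memory-based access does not. */
    static final class RegionBuffer extends FileRegionStub implements Buffer {
        RegionBuffer(long position, int count) {
            super(position, count);
        }

        @Override
        public int readableBytes() {
            return (int) count;
        }

        @Override
        public int getReaderIndex() {
            // No in-memory segment with reader/writer indices exists behind a file region.
            throw new UnsupportedOperationException("Method should never be called.");
        }
    }

    private FileRegionBufferSketch() {}

    /** Rough analogy of transport dispatch: regions are transferred from the file, the rest is copied. */
    static String transferMode(Object msg) {
        return (msg instanceof FileRegionStub) ? "file transfer" : "memory copy";
    }
}
```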
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -552,6 +556,7 @@ public String toString() {
static class TaskEventRequest extends NettyMessage {
private static final byte ID = 3;
+ private static final int LENGTH = 4 + 20 + 40 + 16;
Review comment:
See comment above.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
##########
@@ -691,23 +672,12 @@ static CloseRequest readFrom(@SuppressWarnings("unused") ByteBuf buffer) throws
}
@Override
- ByteBuf write(ByteBufAllocator allocator) throws IOException {
- ByteBuf result = null;
-
- try {
- result = allocateBuffer(allocator, ID, 4 + 16);
- result.writeInt(credit);
- receiverId.writeTo(result);
-
- return result;
- }
- catch (Throwable t) {
- if (result != null) {
- result.release();
- }
-
- throw new IOException(t);
- }
+ void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
+ Consumer<ByteBuf> consumer = (bb) -> {
Review comment:
Do you think we should avoid introducing another virtual/interface method call here, and another object instantiation for the lambda's argument capture, or is that not important at this point?
It may matter for the most common messages on the performance path (BufferResponse, AddCredit). BufferResponse already avoids this; maybe AddCredit can as well.
If we want to avoid it, we could also go with this pattern:
```java
final ByteBuf buffer = allocateBuffer(allocator, ID, length);
try {
    // same code as in the consumer
    buffer.writeInt(....);
    ...
    out.write(buffer, promise);
} catch (Throwable t) {
    // helper assumed to release the buffer and rethrow
    handleException(buffer, t);
}
```
That way we only have the virtual call in the exception case, and no extra object. It is only minimally more verbose/duplicated.
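The same pattern, as a self-contained sketch with stand-in types (`PooledBuffer` and `RecordingAllocator` are hypothetical, not Netty classes): the happy path is straight-line code without a lambda, and only the failure path releases the buffer and wraps the cause in an `IOException`, as the removed `write` code did. The 4 + 16 byte layout mirrors the removed `allocateBuffer(allocator, ID, 4 + 16)` call.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class DirectWriteSketch {

    /** Stand-in for a pooled, reference-counted ByteBuf (hypothetical). */
    static final class PooledBuffer {
        final ByteBuffer data;
        boolean released;

        PooledBuffer(int capacity) {
            this.data = ByteBuffer.allocate(capacity);
        }

        void release() {
            released = true;
        }
    }

    /** Stand-in allocator that records allocations so a test can inspect them. */
    static final class RecordingAllocator {
        final List<PooledBuffer> allocated = new ArrayList<>();

        PooledBuffer allocate(int capacity) {
            PooledBuffer buffer = new PooledBuffer(capacity);
            allocated.add(buffer);
            return buffer;
        }
    }

    private DirectWriteSketch() {}

    /** Writes credit (4 bytes) and a 16-byte receiver ID, then hands the buffer to {@code out}. */
    static void writeAddCredit(RecordingAllocator allocator, List<PooledBuffer> out,
                               int credit, byte[] receiverId) throws IOException {
        final PooledBuffer buffer = allocator.allocate(Integer.BYTES + 16);
        try {
            // Straight-line code: no lambda, no extra virtual call on the happy path.
            buffer.data.putInt(credit);
            buffer.data.put(receiverId); // BufferOverflowException if the ID is longer than 16 bytes
            out.add(buffer);             // ownership moves to the consumer only on success
        } catch (Throwable t) {
            // Failure path only: release what we allocated, then propagate.
            buffer.release();
            throw new IOException(t);
        }
    }
}
```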
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
##########
@@ -187,6 +205,17 @@ static ByteBuffer allocatedHeaderBuffer() {
return new ByteBuffer[] { allocatedHeaderBuffer(), null };
}
+ public static ByteBuffer tryReadByteBuffer(FileChannel channel, int size) {
+ ByteBuffer bb = ByteBuffer.allocate(size);
+ try {
+ tryReadByteBuffer(channel, bb);
+ bb.flip();
+ } catch (IOException ex) {
+ ExceptionUtils.rethrow(ex);
Review comment:
This does not feel quite right: an IO-related utility method that just turns an IOException into an unchecked exception.
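One way to keep the checked exception, sketched under the assumption that the call sites can be adapted to handle or declare it: the helper declares `throws IOException` and reports a short read as `EOFException`. The name `readByteBuffer` is hypothetical; the original delegates to an overload `tryReadByteBuffer(channel, bb)` whose exact semantics are not shown in this hunk.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class ReadFullySketch {

    private ReadFullySketch() {}

    /**
     * Reads exactly {@code size} bytes from the channel's current position into a new heap buffer.
     * I/O failures are propagated to the caller instead of being rethrown unchecked.
     */
    static ByteBuffer readByteBuffer(FileChannel channel, int size) throws IOException {
        ByteBuffer bb = ByteBuffer.allocate(size);
        while (bb.hasRemaining()) {
            if (channel.read(bb) == -1) {
                throw new EOFException(
                        "Reached end of file after " + bb.position() + " of " + size + " bytes");
            }
        }
        bb.flip();
        return bb;
    }
}
```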
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FileRegionBuffer.java
##########
@@ -0,0 +1,213 @@
+/**
+ * Wrapper for {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} instance.
+ *
+ * <p>The file region can be sent via network channel which supports zero-copy file transfer,
+ * and it can also be read into memory segment locally via {@link #readInto(MemorySegment)}.
+ */
+public class FileRegionBuffer extends DefaultFileRegion implements Buffer {
+
+ /** The number of bytes to be read/transferred from this file region. */
+ private final int bufferSize;
+
+ private final FileChannel fileChannel;
+
+ /** The {@link DataType} this buffer represents. */
+ private final DataType dataType;
+
+ /** Whether the buffer is compressed or not. */
+ private final boolean isCompressed;
+
+ public FileRegionBuffer(
+ FileChannel fileChannel,
+ int bufferSize,
+ DataType dataType,
+ boolean isCompressed) throws IOException {
+
+ super(fileChannel, fileChannel.position(), bufferSize);
+
+ this.fileChannel = checkNotNull(fileChannel);
+ this.bufferSize = bufferSize;
+ this.dataType = checkNotNull(dataType);
+ this.isCompressed = isCompressed;
+ }
+
+ //
------------------------------------------------------------------------
+ // Buffer override methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public boolean isBuffer() {
+ return dataType.isBuffer();
+ }
+
+ @Override
+ public MemorySegment getMemorySegment() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public int getMemorySegmentOffset() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public int getMaxCapacity() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public int getReaderIndex() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ /**
+ * This method is only implemented for tests at the moment.
+ */
+ @Override
+ public ByteBuffer getNioBufferReadable() {
+ return BufferReaderWriterUtil.tryReadByteBuffer(fileChannel, bufferSize);
Review comment:
This method here is a little concerning, because if used incorrectly it defeats the purpose of the whole implementation: it reads the region into a newly allocated buffer.
I assume this is mainly needed for events?
One alternative could be that reading events from the file channel produces a regular `NetworkBuffer`. Then this method would not be needed.
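A sketch of that alternative, under stated assumptions: the caller has already parsed the entry header (whether it is an event, where the payload starts, how long it is), and `Entry`, `RegionRef`, and `InMemoryEvent` are hypothetical stand-ins for a `FileRegionBuffer` and a `NetworkBuffer`. Events are copied into heap memory once when they are read, so nothing downstream needs a `getNioBufferReadable()` that silently copies a file region.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch: events become ordinary heap buffers, data stays a file window. */
final class EventCopySketch {

    /** Marker for what the reader hands out. */
    interface Entry {}

    /** A data buffer: only its window in the file is recorded; nothing is read yet. */
    static final class RegionRef implements Entry {
        final long offset;
        final int length;

        RegionRef(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    /** An event: its bytes are copied into memory once, like a regular network buffer. */
    static final class InMemoryEvent implements Entry {
        final ByteBuffer bytes;

        InMemoryEvent(ByteBuffer bytes) {
            this.bytes = bytes;
        }
    }

    private EventCopySketch() {}

    /** Builds the entry for a payload whose header has already been parsed by the caller. */
    static Entry materialize(FileChannel channel, boolean isEvent, long payloadOffset, int length)
            throws IOException {
        if (!isEvent) {
            return new RegionRef(payloadOffset, length);
        }
        ByteBuffer event = ByteBuffer.allocate(length);
        while (event.hasRemaining()) {
            // Positional read: does not change the channel's own position.
            if (channel.read(event, payloadOffset + event.position()) == -1) {
                throw new EOFException("Event truncated at offset " + payloadOffset);
            }
        }
        event.flip();
        return new InMemoryEvent(event);
    }
}
```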
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition based on
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion}.
+ */
+public class BoundedBlockingSubpartitionView implements ResultSubpartitionView {
+
+ /** The result subpartition that we read. */
+ private final BoundedBlockingSubpartition parent;
+
+ /** The reader/decoder to the file region with the data we currently read from. */
+ private final BoundedData.Reader dataReader;
+
+ /** The remaining number of data buffers (not events) in the result. */
+ private int numDataBuffers;
+
+ /** The remaining number of data buffers and events in the result. */
+ private int numDataAndEventBuffers;
+
+ /** Flag whether this reader is released. */
+ private boolean isReleased;
+
+ private int sequenceNumber;
+
+ BoundedBlockingSubpartitionView(
+ BoundedBlockingSubpartition parent,
+ Path filePath,
+ int numDataBuffers,
+ int numDataAndEventBuffers) throws IOException {
+
+ this.parent = checkNotNull(parent);
+
+ checkNotNull(filePath);
+ this.dataReader = new FileRegionReader(filePath);
+
+ checkArgument(numDataBuffers >= 0);
+ this.numDataBuffers = numDataBuffers;
+
+ checkArgument(numDataAndEventBuffers >= 0);
+ this.numDataAndEventBuffers = numDataAndEventBuffers;
+ }
+
+ @Nullable
+ @Override
+ public BufferAndBacklog getNextBuffer() throws IOException {
+ if (isReleased) {
+ return null;
+ }
+
+ Buffer current = dataReader.nextBuffer();
+ if (current == null) {
+ // as per contract, we must return null when the reader is empty,
+ // but also in case the reader is disposed (rather than throwing an exception)
+ return null;
+ }
+
+ updateStatistics(current);
+
+ // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the next header
+ Buffer.DataType nextDataType = numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+ return BufferAndBacklog.fromBufferAndLookahead(current, nextDataType, numDataBuffers, sequenceNumber++);
+ }
+
+ private void updateStatistics(Buffer buffer) {
+ if (buffer.isBuffer()) {
+ numDataBuffers--;
+ }
+ numDataAndEventBuffers--;
+ }
+
+ @Override
+ public boolean isAvailable(int numCreditsAvailable) {
+ // We simply assume there are no events except EndOfPartitionEvent for batch jobs,
+ // then it has no essential effect to ignore the judgement of next event buffer.
+ return numCreditsAvailable > 0 && numDataAndEventBuffers > 0;
+ }
+
+ @Override
+ public void releaseAllResources() throws IOException {
+ // it is not a problem if this method executes multiple times
+ isReleased = true;
+
+ IOUtils.closeQuietly(dataReader);
+
+ // Notify the parent that this one is released. This allows the parent to
+ // eventually release all resources (when all readers are done and the
+ // parent is disposed).
+ parent.releaseReaderReference(this);
+ }
+
+ @Override
+ public boolean isReleased() {
+ return isReleased;
+ }
+
+ @Override
+ public Throwable getFailureCause() {
+ // we can never throw an error after this was created
+ return null;
+ }
+
+ @Override
+ public int unsynchronizedGetNumberOfQueuedBuffers() {
+ return parent.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
+ public void notifyDataAvailable() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public void resumeConsumption() {
+ throw new UnsupportedOperationException("Method should never be called.");
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Blocking Subpartition Reader: ID=%s, index=%d",
+ parent.parent.getPartitionId(),
+ parent.getSubPartitionIndex());
+ }
+
+ /**
+ * The reader to read from {@link BoundedBlockingSubpartition} and return the wrapped
+ * {@link org.apache.flink.shaded.netty4.io.netty.channel.FileRegion} based buffer.
+ */
+ static final class FileRegionReader implements BoundedData.Reader {
+
+ private final FileChannel fileChannel;
+
+ private final ByteBuffer headerBuffer;
+
+ private long lastPosition = -1L;
+
+ private int lastBufferSize;
+
+ FileRegionReader(Path filePath) throws IOException {
+ this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
+ this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
+ }
+
+ @Nullable
+ @Override
+ public Buffer nextBuffer() throws IOException {
+ // As the file region transfer via network channel will not modify the underlying file position,
Review comment:
Maybe we can optimize this method a bit.
If you inline the code from `BufferReaderWriterUtil.readFromByteChannel()` here, you can avoid the logic that tracks `lastPosition` etc., and simply always advance the position here.
It should also help avoid some calls to `fileChannel.position()`, which are not cheap.
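A minimal sketch of the direction suggested above, assuming a simplified on-disk format of a 4-byte payload length followed by the payload (the actual header presumably also encodes the data type and compression flag, since `FileRegionBuffer` is constructed with both), and assuming the file is fully written before reading starts. The reader keeps the next header position in a field and uses positional reads, so it never calls `fileChannel.position()` and never has to infer whether a previous region was already consumed. All names are hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical reader over "int length + payload" entries that tracks its own position. */
final class PositionTrackingReaderSketch {

    /** Window of one payload within the file. */
    static final class Region {
        final long offset;
        final int length;

        Region(long offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Assumed header for this sketch only: a single int payload length.
    static final int HEADER_BYTES = Integer.BYTES;

    private final FileChannel channel;
    private final long fileSize;
    private final ByteBuffer header = ByteBuffer.allocate(HEADER_BYTES);

    /** Start of the next header; the only position state, always moved forward. */
    private long nextPosition;

    PositionTrackingReaderSketch(FileChannel channel) throws IOException {
        this.channel = channel;
        // Assumes the file is complete, so its size is queried once.
        this.fileSize = channel.size();
    }

    /** Returns the next payload window, or null at the end of the file. */
    Region nextRegion() throws IOException {
        if (nextPosition >= fileSize) {
            return null;
        }
        header.clear();
        while (header.hasRemaining()) {
            if (channel.read(header, nextPosition + header.position()) == -1) {
                throw new EOFException("Truncated header at " + nextPosition);
            }
        }
        header.flip();
        int length = header.getInt();
        long payloadOffset = nextPosition + HEADER_BYTES;
        // Advance unconditionally, whether the payload is later transferred or read.
        nextPosition = payloadOffset + length;
        return new Region(payloadOffset, length);
    }
}
```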
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
##########
@@ -74,7 +74,10 @@
/** All created and not yet released readers. */
@GuardedBy("lock")
- private final Set<BoundedBlockingSubpartitionReader> readers;
+ private final Set<ResultSubpartitionView> readers;
+
+ /** Flag whether SSL is enabled for network transfer with file type. */
+ private final boolean fileWithoutSSLEnabled;
Review comment:
How about renaming this to `useDirectFileTransfer`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+ @Nullable
+ @Override
+ public Buffer nextBuffer() throws IOException {
+ // As the file region transfer via network channel will not modify the underlying file position,
Review comment:
I think using `lastPosition` to distinguish the two uses is very fragile.
Can we say that this part here (creating regions from the file) always advances the position, and the actual reading part never advances the position?
For that, `FileRegionBuffer#readInto(...)` would also need to use the offset/length fields stored in the region.
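For the read side of that split, here is a sketch of a region that remembers its own offset and length and reads itself with `FileChannel#read(ByteBuffer, long)`, which by its contract does not modify the channel's position. `SelfPositionedRegionSketch` is hypothetical, and it targets a `ByteBuffer` to stay self-contained, whereas `FileRegionBuffer#readInto(MemorySegment)` would write into a `MemorySegment`.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical file region that reads itself using only its stored offset and length. */
final class SelfPositionedRegionSketch {

    private final FileChannel channel;
    private final long offset;
    private final int length;

    SelfPositionedRegionSketch(FileChannel channel, long offset, int length) {
        this.channel = channel;
        this.offset = offset;
        this.length = length;
    }

    /**
     * Copies the region into {@code target} at its current position and advances the target
     * by {@code length}. Uses positional reads only, so the channel's position is untouched
     * and the same region can be read any number of times.
     */
    void readInto(ByteBuffer target) throws IOException {
        if (target.remaining() < length) {
            throw new IllegalArgumentException("Target has less than " + length + " bytes left");
        }
        ByteBuffer window = target.duplicate();
        window.limit(window.position() + length);
        long filePosition = offset;
        while (window.hasRemaining()) {
            int read = channel.read(window, filePosition);
            if (read == -1) {
                throw new EOFException("Region ends beyond end of file");
            }
            filePosition += read;
        }
        target.position(target.position() + length);
    }
}
```

Because neither reading nor transferring depends on the channel position, only the region-creating code moves forward through the file.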
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionView.java
##########
@@ -0,0 +1,199 @@
+ @Nullable
+ @Override
+ public Buffer nextBuffer() throws IOException {
+ // As the file region transfer via network channel will not modify the underlying file position,
Review comment:
If you want, I can try and create a suggestion for this change.