pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r664651808
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
##########
@@ -69,6 +69,9 @@ Buffer allocatePooledNetworkBuffer(InputChannelID receiverId) {
* @return The un-pooled network buffer.
*/
Buffer allocateUnPooledNetworkBuffer(int size, Buffer.DataType dataType) {
+ if (size <= 0) {
+ return null;
+ }
Review comment:
1. This should be annotated `@Nullable` (rough sketch below).
2. Do we really need to support `null` here? Couldn't we return an empty `Buffer` instead?
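If the `null` return has to stay, this is roughly what I mean in 1. (just a sketch, assuming `javax.annotation.Nullable`; the allocation below the `null` check is only a stand-in for the existing logic):
```
@Nullable
Buffer allocateUnPooledNetworkBuffer(int size, Buffer.DataType dataType) {
    if (size <= 0) {
        // callers now have to handle the null return explicitly
        return null;
    }
    // stand-in for the existing allocation path
    MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(size);
    return new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType);
}
```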
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
checkState(!isReleased.get(), "Channel released.");
checkPartitionRequestQueueInitialized();
+ if (initialCredit == 0) {
+ unannouncedCredit.set(0);
Review comment:
But how can this floating credit be assigned to this blocked `RemoteInputChannel`? Wouldn't it cause the same deadlock, where floating buffers are assigned to blocked channels and the job/task cannot make any progress?
It sounds like maybe this should have been handled earlier, when trying to increase `unannouncedCredit`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -315,6 +326,17 @@ BufferAndBacklog pollBuffer() {
if (buffer.readableBytes() > 0) {
break;
}
+
+ // if we have an empty finished buffer and the exclusive credit is 0, we just return
+ // the empty buffer so that the downstream task can release the allocated credit for
+ // this empty buffer, this happens in two main scenarios currently:
+ // 1. all data of a buffer builder has been read and after that the buffer builder
+ // is finished
+ // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+ if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+ break;
+ }
+
Review comment:
Instead of this check here, couldn't we add a similar check outside of the `while (...)` loop? For example, replace:
```
if (buffer == null) {
    return null;
}
```
with something like:
```
if (buffer == null) {
    if (buffersPerChannel == 0) {
        return EMPTY_BUFFER;
    } else {
        return null;
    }
}
```
?
That way, we would avoid sending an empty buffer if there are still more buffers in the backlog that are already enqueued.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
}
}
+ /**
+ * Announces remaining backlog to the consumer after the available data notification or data
+ * consumption resumption.
+ */
+ private void announceBacklog(NetworkSequenceViewReader reader) {
+ int backlog = reader.getRemainingBacklog();
+ if (backlog > 0) {
Review comment:
Secondly, `getRemainingBacklog()` is a very costly operation (additional
synchronisation) that I think could have been avoided:
1. Backlog can go up only as a result of
`org.apache.flink.runtime.io.network.netty.PartitionRequestQueue#notifyReaderNonEmpty(reader)`.
2. Backlog can go down only as a result of polling data from the `reader`.
3. So instead of using the thread-safe `reader.getRemainingBacklog()`, we could re-use the existing synchronisation in 1. and 2. to maintain `remainingBacklog` in the netty thread (in the `PartitionRequestQueue`); see the sketch below.
But it would be even better to completely avoid this check (my comment above).
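Something along these lines is what I have in mind (a rough, untested sketch; the class and method names are made up just for illustration):
```
// Hypothetical helper, only to illustrate the idea: the counter lives in the netty
// thread, so no extra synchronisation is needed on the hot path.
final class NettyThreadBacklog {

    // accessed only from the netty event loop
    private int remainingBacklog;

    /** Hooked into notifyReaderNonEmpty(reader): backlog can only go up here (point 1.). */
    void onDataAvailable(int newlyAvailableBuffers) {
        remainingBacklog += newlyAvailableBuffers;
    }

    /** Hooked into polling a buffer from the reader: backlog can only go down here (point 2.). */
    void onBufferPolled() {
        remainingBacklog = Math.max(remainingBacklog - 1, 0);
    }

    int getRemainingBacklog() {
        return remainingBacklog;
    }
}
```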
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -340,7 +355,15 @@ private void decodeBufferOrEvent(
RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
throws Throwable {
if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
- inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+ try {
+ inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
+ } finally {
+ // recycle the empty buffer directly
+ Buffer buffer = bufferOrEvent.getBuffer();
+ if (buffer != null) {
+ buffer.recycleBuffer();
Review comment:
What do you mean by that @wsry? That previously `bufferOrEvent.getBuffer()` was always `null`?
If so, why do we need to keep support for sending both empty buffers and `null` buffers?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
}
}
+ /**
+ * Announces remaining backlog to the consumer after the available data notification or data
+ * consumption resumption.
+ */
+ private void announceBacklog(NetworkSequenceViewReader reader) {
+ int backlog = reader.getRemainingBacklog();
+ if (backlog > 0) {
Review comment:
Have you tested this @wsry? After all, it seems like if the reader is available, it should have `backlog > 0`, shouldn't it?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -433,8 +459,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
@Override
public Object buildMessage() {
- return new AddCredit(
- inputChannel.getAndResetUnannouncedCredit(), inputChannel.getInputChannelId());
+ int credits = inputChannel.getAndResetUnannouncedCredit();
+ return credits > 0 ? new AddCredit(credits, inputChannel.getInputChannelId()) : null;
Review comment:
Why is this an issue? Is this an independent optimisation, or is it related to the other parts of the PR?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -468,6 +485,11 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
return;
}
+ if (buffer.getDataType().isBlockingUpstream()) {
+ onBlockingUpstream();
+ checkArgument(backlog == 0, "Illegal number of backlog.");
Review comment:
add the `backlog` value to the error message?
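For example, something like this (assuming the usual `Preconditions.checkArgument` message template with `%s` placeholders):
```
checkArgument(backlog == 0, "Illegal number of backlog: %s.", backlog);
```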
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -433,8 +459,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
@Override
public Object buildMessage() {
- return new AddCredit(
- inputChannel.getAndResetUnannouncedCredit(), inputChannel.getInputChannelId());
+ int credits = inputChannel.getAndResetUnannouncedCredit();
+ return credits > 0 ? new AddCredit(credits, inputChannel.getInputChannelId()) : null;
Review comment:
Secondly, the return type of `buildMessage()` is not annotated `@Nullable`, so if we need this code for correctness, we need to add that annotation (but it would be better to avoid `null` here).
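If the `null` return stays, this is the annotation I mean (a sketch only, assuming `javax.annotation.Nullable`, mirroring the diff above):
```
@Nullable
@Override
public Object buildMessage() {
    int credits = inputChannel.getAndResetUnannouncedCredit();
    return credits > 0 ? new AddCredit(credits, inputChannel.getInputChannelId()) : null;
}
```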
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -517,19 +539,20 @@ private void increaseBuffersInBacklog(BufferConsumer buffer) {
}
}
- /**
- * Gets the number of non-event buffers in this subpartition.
- *
- * <p><strong>Beware:</strong> This method should only be used in tests in non-concurrent access
- * scenarios since it does not make any concurrency guarantees.
- */
- @SuppressWarnings("FieldAccessNotGuarded")
- @VisibleForTesting
+ /** Gets the number of non-event buffers in this subpartition. */
public int getBuffersInBacklog() {
- if (flushRequested || isFinished) {
- return buffersInBacklog;
- } else {
- return Math.max(buffersInBacklog - 1, 0);
+ synchronized (buffers) {
Review comment:
`pollBuffer()` would be acquiring the lock twice, wouldn't it? If you really need to make this method `public`, you should keep a `private int getBuffersInBacklogUnsafe()` without any synchronisation; see the sketch below.
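Roughly what I have in mind (an untested sketch, based only on the fields visible in this diff):
```
/** Gets the number of non-event buffers in this subpartition. */
public int getBuffersInBacklog() {
    synchronized (buffers) {
        return getBuffersInBacklogUnsafe();
    }
}

/** Unsynchronised variant for callers (like pollBuffer()) that already hold the lock. */
private int getBuffersInBacklogUnsafe() {
    assert Thread.holdsLock(buffers);
    if (flushRequested || isFinished) {
        return buffersInBacklog;
    } else {
        return Math.max(buffersInBacklog - 1, 0);
    }
}
```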
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
updateStatistics(current);
- // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
- // next header
- Buffer.DataType nextDataType =
- numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+ // We simply assume all the data except for the last one (EndOfPartitionEvent)
+ // are non-events for batch jobs to avoid pre-fetching the next header
+ Buffer.DataType nextDataType = Buffer.DataType.NONE;
+ if (numDataBuffers > 0) {
+ nextDataType = Buffer.DataType.DATA_BUFFER;
+ } else if (numDataAndEventBuffers > 0) {
+ nextDataType = Buffer.DataType.EVENT_BUFFER;
+ }
Review comment:
But what would be the problem with requesting a credit for the `EndOfPartitionEvent`? In other words, what's wrong with doing it as it was done previously: always returning `DATA_BUFFER` or `NONE`?