pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r664651808



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
##########
@@ -69,6 +69,9 @@ Buffer allocatePooledNetworkBuffer(InputChannelID receiverId) 
{
      * @return The un-pooled network buffer.
      */
     Buffer allocateUnPooledNetworkBuffer(int size, Buffer.DataType dataType) {
+        if (size <= 0) {
+            return null;
+        }

Review comment:
       1. Add `@Nullable`.
   2. Do we really need to support `null` here? Couldn't we return an empty 
`Buffer` instead?
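   For 1., a minimal sketch of what I mean (only the `@Nullable` annotation is 
new; the body is the code from this diff, with the non-empty allocation path 
elided):
   ```
   @Nullable
   Buffer allocateUnPooledNetworkBuffer(int size, Buffer.DataType dataType) {
       if (size <= 0) {
           // Callers have to be prepared for a null return value.
           return null;
       }
       // ... existing allocation for size > 0 ...
   }
   ```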

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -357,11 +359,26 @@ public void resumeConsumption() throws IOException {
         checkState(!isReleased.get(), "Channel released.");
         checkPartitionRequestQueueInitialized();
 
+        if (initialCredit == 0) {
+            unannouncedCredit.set(0);

Review comment:
       But how can this floating credit be assigned to this blocked 
`RemoteInputChannel`? Wouldn't it cause the same deadlock, where floating 
buffers are assigned to blocked channels and the job/task cannot make any 
progress?
   
   It sounds like maybe this should have been handled earlier, when trying to 
increase `unannouncedCredit`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -315,6 +326,17 @@ BufferAndBacklog pollBuffer() {
                 if (buffer.readableBytes() > 0) {
                     break;
                 }
+
+                // if we have an empty finished buffer and the exclusive 
credit is 0, we just return
+                // the empty buffer so that the downstream task can release 
the allocated credit for
+                // this empty buffer, this happens in two main scenarios 
currently:
+                // 1. all data of a buffer builder has been read and after 
that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a 
whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+

Review comment:
       Instead of this check here, couldn't we add a similar check outside of 
the `while (...)` loop? For example, replace:
   
   ```
   if (buffer == null) {
       return null;
   }
   ```
   with something like:
   ```
   if (buffer == null) {
       if (buffersPerChannel == 0) {
           return EMPTY_BUFFER;
       } else {
           return null;
       }
   }
   ```
   ?
   That way, we would avoid sending an empty buffer if there are still more 
buffers in the backlog that are already enqueued?
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }
 
+    /**
+     * Announces remaining backlog to the consumer after the available data 
notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {

Review comment:
       Secondly, `getRemainingBacklog()` is a very costly operation (additional 
synchronisation) that I think could have been avoided:
   
   1. The backlog can go up only as a result of 
`org.apache.flink.runtime.io.network.netty.PartitionRequestQueue#notifyReaderNonEmpty(reader)`.
   2. The backlog can go down only as a result of polling the data from the 
`reader`.
   3. So instead of using the thread-safe `reader.getRemainingBacklog()`, we 
could re-use the existing synchronisation in 1. and 2. to maintain 
`remainingBacklog` in the netty thread (in the `PartitionRequestQueue`); see 
the sketch below.
   
   But it would be even better to completely avoid this check (my comment 
above).
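   For illustration, a toy sketch of the bookkeeping from 1.-3. (not actual 
Flink code; all names here are made up):
   ```
   /**
    * Maintains the remaining backlog purely in the netty thread, so no extra
    * synchronisation is needed when announcing it.
    */
   final class BacklogTracker {
       // Only ever read/written from the netty thread.
       private int remainingBacklog;

       // 1. The backlog can only go up when the reader signals new data
       //    (cf. PartitionRequestQueue#notifyReaderNonEmpty).
       void onReaderNonEmpty(int addedBuffers) {
           remainingBacklog += addedBuffers;
       }

       // 2. The backlog can only go down when a buffer is polled from the
       //    reader.
       void onBufferPolled() {
           remainingBacklog = Math.max(remainingBacklog - 1, 0);
       }

       // 3. Announce only when something is left, without calling the
       //    synchronised reader.getRemainingBacklog().
       boolean shouldAnnounceBacklog() {
           return remainingBacklog > 0;
       }
   }
   ```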

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -340,7 +355,15 @@ private void decodeBufferOrEvent(
             RemoteInputChannel inputChannel, NettyMessage.BufferResponse 
bufferOrEvent)
             throws Throwable {
         if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
-            inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, 
bufferOrEvent.backlog);
+            try {
+                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, 
bufferOrEvent.backlog);
+            } finally {
+                // recycle the empty buffer directly
+                Buffer buffer = bufferOrEvent.getBuffer();
+                if (buffer != null) {
+                    buffer.recycleBuffer();

Review comment:
       What do you mean by that, @wsry? That previously 
`bufferOrEvent.getBuffer()` was always `null`?
   
   If so, why do we need to keep support for sending both empty buffers and 
`null` buffers?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }
 
+    /**
+     * Announces remaining backlog to the consumer after the available data 
notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {

Review comment:
       Have you tested this, @wsry? After all, it seems that if the reader is 
available, it should have `backlog > 0`, shouldn't it?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -433,8 +459,8 @@ public void operationComplete(ChannelFuture future) throws 
Exception {
 
         @Override
         public Object buildMessage() {
-            return new AddCredit(
-                    inputChannel.getAndResetUnannouncedCredit(), 
inputChannel.getInputChannelId());
+            int credits = inputChannel.getAndResetUnannouncedCredit();
+            return credits > 0 ? new AddCredit(credits, 
inputChannel.getInputChannelId()) : null;

Review comment:
       Why is this an issue? Is this an independent optimisation or is it 
relate with the other parts of the PR?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -468,6 +485,11 @@ public void onBuffer(Buffer buffer, int sequenceNumber, 
int backlog) throws IOEx
                 return;
             }
 
+            if (buffer.getDataType().isBlockingUpstream()) {
+                onBlockingUpstream();
+                checkArgument(backlog == 0, "Illegal number of backlog.");

Review comment:
       add the `backlog` value to the error message?
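   For example (assuming the message-template overload of 
`Preconditions.checkArgument`, in the same style as Guava's):
   ```
   checkArgument(backlog == 0, "Illegal number of backlog: %s.", backlog);
   ```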

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
##########
@@ -433,8 +459,8 @@ public void operationComplete(ChannelFuture future) throws 
Exception {
 
         @Override
         public Object buildMessage() {
-            return new AddCredit(
-                    inputChannel.getAndResetUnannouncedCredit(), 
inputChannel.getInputChannelId());
+            int credits = inputChannel.getAndResetUnannouncedCredit();
+            return credits > 0 ? new AddCredit(credits, 
inputChannel.getInputChannelId()) : null;

Review comment:
       Secondly, the return type of `buildMessage()` is not annotated 
`@Nullable`, so if we need this code for correctness, we need to add that 
annotation (but it would be better to avoid `null` here).
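   If the `null` stays, that would mean something like the following (only the 
annotation is added on top of this diff):
   ```
   @Nullable
   @Override
   public Object buildMessage() {
       int credits = inputChannel.getAndResetUnannouncedCredit();
       return credits > 0
               ? new AddCredit(credits, inputChannel.getInputChannelId())
               : null;
   }
   ```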

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -517,19 +539,20 @@ private void increaseBuffersInBacklog(BufferConsumer 
buffer) {
         }
     }
 
-    /**
-     * Gets the number of non-event buffers in this subpartition.
-     *
-     * <p><strong>Beware:</strong> This method should only be used in tests in 
non-concurrent access
-     * scenarios since it does not make any concurrency guarantees.
-     */
-    @SuppressWarnings("FieldAccessNotGuarded")
-    @VisibleForTesting
+    /** Gets the number of non-event buffers in this subpartition. */
     public int getBuffersInBacklog() {
-        if (flushRequested || isFinished) {
-            return buffersInBacklog;
-        } else {
-            return Math.max(buffersInBacklog - 1, 0);
+        synchronized (buffers) {

Review comment:
       `pollBuffer()` would be acquiring the lock twice, wouldn't it? If you 
really need to make this method `public` you should keep `private int 
getBuffersInBacklogUnsafe()` without any synchronisation.
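   Roughly, a sketch based on the logic already visible in this diff:
   ```
   /** Gets the number of non-event buffers in this subpartition. */
   public int getBuffersInBacklog() {
       synchronized (buffers) {
           return getBuffersInBacklogUnsafe();
       }
   }

   // No synchronisation here; callers such as pollBuffer() already hold the
   // buffers lock.
   private int getBuffersInBacklogUnsafe() {
       if (flushRequested || isFinished) {
           return buffersInBacklog;
       } else {
           return Math.max(buffersInBacklog - 1, 0);
       }
   }
   ```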

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
 
         updateStatistics(current);
 
-        // We simply assume all the data are non-events for batch jobs to 
avoid pre-fetching the
-        // next header
-        Buffer.DataType nextDataType =
-                numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : 
Buffer.DataType.NONE;
+        // We simply assume all the data except for the last one 
(EndOfPartitionEvent)
+        // are non-events for batch jobs to avoid pre-fetching the next header
+        Buffer.DataType nextDataType = Buffer.DataType.NONE;
+        if (numDataBuffers > 0) {
+            nextDataType = Buffer.DataType.DATA_BUFFER;
+        } else if (numDataAndEventBuffers > 0) {
+            nextDataType = Buffer.DataType.EVENT_BUFFER;
+        }

Review comment:
       But what would be a problem with requesting for a credit for the 
`EndOfPartitionEvent`? In other words, what's wrong with doing it as it was 
done previously: always returning `DATA_BUFFER` or `NONE`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
