wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665059027



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionDirectTransferReader.java
##########
@@ -91,10 +91,14 @@ public BufferAndBacklog getNextBuffer() throws IOException {
 
         updateStatistics(current);
 
-        // We simply assume all the data are non-events for batch jobs to avoid pre-fetching the
-        // next header
-        Buffer.DataType nextDataType =
-                numDataAndEventBuffers > 0 ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.NONE;
+        // We simply assume all the data except for the last one (EndOfPartitionEvent)
+        // are non-events for batch jobs to avoid pre-fetching the next header
+        Buffer.DataType nextDataType = Buffer.DataType.NONE;
+        if (numDataBuffers > 0) {
+            nextDataType = Buffer.DataType.DATA_BUFFER;
+        } else if (numDataAndEventBuffers > 0) {
+            nextDataType = Buffer.DataType.EVENT_BUFFER;
+        }

Review comment:
       I think this is just an optimization. Without this change, the backlog 
announced to the downstream does not include the event, and EVENT_BUFFER and 
DATA_BUFFER are not distinguished, which means the event buffer also needs a 
credit to be sent. If there are no exclusive buffers, not enough buffers will 
be allocated for the event, because we only announce the data buffer backlog 
to the downstream. As a result, some tests hang because there is no credit for 
the event buffer. There are two simple ways to resolve this deadlock:
   1. Announce both the data buffer and event buffer backlog to the downstream. 
This allocates more buffers than needed; the extra buffers are released when 
the EOF is received at the downstream task.
   2. Distinguish EVENT_BUFFER from DATA_BUFFER, as this change does.
   
   Both choices are acceptable to me. I chose the second one because 
EVENT_BUFFER and DATA_BUFFER are already distinguished at the downstream task, 
and we can allocate one fewer buffer.
   
   Either choice requires some changes to 
BoundedBlockingSubpartitionDirectTransferReader. Which one do you prefer?
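
   To make the second option concrete, here is a minimal, self-contained 
sketch of the selection logic from the diff. It is not the Flink class: 
`NextDataTypeSketch`, its `DataType` enum, and `canSend` are hypothetical 
stand-ins, and the rule that an event can be sent without a credit is an 
assumption taken from the reasoning above rather than from the actual sender 
code. The counters are assumed to hold the number of buffers still to be read.

```java
/** Illustrative sketch only; names mirror the diff but this is not Flink code. */
final class NextDataTypeSketch {

    /** Simplified, hypothetical stand-in for Buffer.DataType. */
    enum DataType { NONE, DATA_BUFFER, EVENT_BUFFER }

    /**
     * Mirrors the branch added in the diff: while data buffers remain, the next
     * buffer is announced as data; once only the trailing event is left, it is
     * announced as an event; otherwise nothing is left.
     */
    static DataType nextDataType(int numDataBuffers, int numDataAndEventBuffers) {
        if (numDataBuffers > 0) {
            return DataType.DATA_BUFFER;
        } else if (numDataAndEventBuffers > 0) {
            return DataType.EVENT_BUFFER;
        }
        return DataType.NONE;
    }

    /** Assumed sender rule: an event needs no credit, a data buffer needs one. */
    static boolean canSend(DataType next, int credits) {
        if (next == DataType.EVENT_BUFFER) {
            return true;
        }
        return next == DataType.DATA_BUFFER && credits > 0;
    }

    public static void main(String[] args) {
        // All data buffers are consumed and all credits are used up;
        // only the trailing event remains.
        DataType next = nextDataType(0, 1);
        System.out.println(next + " sendable with 0 credits: " + canSend(next, 0));

        // The old code announced that same buffer as DATA_BUFFER, so under the
        // assumed rule it would wait for a credit that is never granted.
        System.out.println(
                "DATA_BUFFER sendable with 0 credits: " + canSend(DataType.DATA_BUFFER, 0));
    }
}
```

   Under these assumptions the trailing event is never blocked on a credit, 
so the announced backlog can stay limited to data buffers.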





