pnowojski commented on a change in pull request #6555: [FLINK-10142][network] reduce locking around credit notification
URL: https://github.com/apache/flink/pull/6555#discussion_r210917887
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##########
 @@ -509,33 +514,40 @@ void onSenderBacklog(int backlog) throws IOException {
        }
 
        public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
-               boolean success = false;
+               boolean recycleBuffer = true;
 
                try {
+
+                       final boolean wasEmpty;
                        synchronized (receivedBuffers) {
-                               if (!isReleased.get()) {
-                                       if (expectedSequenceNumber == sequenceNumber) {
-                                               int available = receivedBuffers.size();
+                               // Similar to notifyBufferAvailable(), make sure that we never add a buffer
+                               // after releaseAllResources() released all buffers from receivedBuffers
+                               // (see above for details).
+                               if (isReleased.get()) {
+                                       return;
+                               }
 
-                                               receivedBuffers.add(buffer);
-                                               expectedSequenceNumber++;
+                               if (expectedSequenceNumber != sequenceNumber) {
+                                       onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
+                                       return;
+                               }
 
-                                               if (available == 0) {
-                                                       notifyChannelNonEmpty();
-                                               }
+                               wasEmpty = receivedBuffers.isEmpty();
+                               receivedBuffers.add(buffer);
+                               recycleBuffer = false;
+                       }
 
-                                               success = true;
-                                       } else {
-                                               onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
-                                       }
-                               }
+                       ++expectedSequenceNumber;
+
+                       if (wasEmpty) {
+                               notifyChannelNonEmpty();
 
 Review comment:
   Has moving this line out from under the lock improved performance in any case? If not, the commit title
   > optimisations reducing lock contention
   
   is misleading, and I would change it to something about refactor/clean-up.
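
For reference, a minimal sketch of the pattern under discussion, assuming a simplified channel. `ChannelSketch`, the `String` buffers and the notification counter are hypothetical stand-ins, not Flink's classes. The emptiness check and the enqueue stay under the lock, and only the notification runs after the lock is released. It shows what the change does; it does not show whether that improves performance.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an input channel; this is NOT Flink's RemoteInputChannel. */
class ChannelSketch {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private final AtomicInteger notifications = new AtomicInteger();
    private boolean released; // guarded by receivedBuffers

    /** Returns true if the buffer was queued, false if the channel was already released. */
    boolean onBuffer(String buffer) {
        final boolean wasEmpty;
        synchronized (receivedBuffers) {
            if (released) {
                return false;
            }
            // Decide under the lock whether a notification is needed ...
            wasEmpty = receivedBuffers.isEmpty();
            receivedBuffers.add(buffer);
        }
        // ... but fire it after releasing the lock, so the callback cannot
        // hold up other threads that are waiting on receivedBuffers.
        if (wasEmpty) {
            notifyChannelNonEmpty();
        }
        return true;
    }

    /** Removes and returns the oldest queued buffer, or null if none is queued. */
    String poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }

    /** Marks the channel as released and drops all queued buffers. */
    void release() {
        synchronized (receivedBuffers) {
            released = true;
            receivedBuffers.clear();
        }
    }

    /** Assumption: a counter stands in for the real "channel non-empty" callback. */
    private void notifyChannelNonEmpty() {
        notifications.incrementAndGet();
    }

    int notificationCount() {
        return notifications.get();
    }
}
```

In this sketch the callback fires only on the empty-to-non-empty transition, which matches the `wasEmpty` check in the diff. Any gain from the move depends on how expensive the real callback is and how contended the lock is, so it would need to be measured.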
   
   Btw, again, I do not like squashing multiple changes into one commit like this. It actually took me ~10 minutes to work out what this change is about and whether it contains any other meaningful changes. If you split this commit into `[hotfix][network] refactor/cleanups` and `[FLINK-10141][network] move notifyChannelNonEmpty outside of synchronised block`, it would be much easier to review.
   
   Could you split this before merging?
