AHeise commented on a change in pull request #15716:
URL: https://github.com/apache/flink/pull/15716#discussion_r622890960



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -930,6 +943,10 @@ private boolean queueChannelUnsafe(InputChannel channel, 
boolean priority) {
 
         InputChannel inputChannel = inputChannelsWithData.poll();
         enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
+        checkState(
+                
!channelsWithEndOfPartitionEvents.get(inputChannel.getChannelIndex()),
+                "Channel already received EndOfPartition %s",
+                inputChannel.getChannelInfo());

Review comment:
       The checkstate feels a bit overkill now, but as long as there is no 
impact on performance, I guess it's better to be safe than sorry.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -889,13 +898,17 @@ private boolean isOutdated(int sequenceNumber, int 
lastSequenceNumber) {
     }
 
     /**
-     * Queues the channel if not already enqueued, potentially raising the 
priority.
+     * Queues the channel if not already enqueued and not received 
EndOfPartition, potentially
+     * raising the priority.
      *
      * @return true iff it has been enqueued/prioritized = some change to 
{@link
      *     #inputChannelsWithData} happened
      */
     private boolean queueChannelUnsafe(InputChannel channel, boolean priority) 
{
         assert Thread.holdsLock(inputChannelsWithData);
+        if (channelsWithEndOfPartitionEvents.get(channel.getChannelIndex())) {

Review comment:
       We can now probably remove the `isReleased` check in `queueChannel` or 
replace it with the same check? One less volatile read...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to