rkhachatryan commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r543676375
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -569,14 +569,23 @@ public void convertToPriorityEvent(int sequenceNumber) throws IOException {
"Attempted to convertToPriorityEvent an event [%s] that has already been prioritized [%s]",
toPrioritize,
numPriorityElementsBeforeRemoval);
+ // set the priority flag (checked on poll)
+ // don't convert the barrier itself (barrier controller might not have been switched yet)
+ AbstractEvent e = EventSerializer.fromBuffer(toPrioritize.buffer, this.getClass().getClassLoader());
+ toPrioritize.buffer.setReaderIndex(0);
+ toPrioritize = new SequenceBuffer(EventSerializer.toBuffer(e, true), toPrioritize.sequenceNumber);
firstPriorityEvent = addPriorityBuffer(toPrioritize);
// note that only position of the element is changed
// converting the event itself would require switching the controller sooner
}
if (firstPriorityEvent) {
- notifyPriorityEvent(sequenceNumber);
+ notifyPriorityEventForce(); // use force here because the barrier SQN might be seen by gate during the announcement
Review comment:
Rephrased as:
```
// forcibly notify about the priority event
// instead of passing barrier SQN to be checked
// because this SQN might have been seen by the input gate during the announcement
```
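For readers following the reasoning, here is a toy model of why a sequence-number check can swallow the notification. It is only a sketch under assumptions: `ToyGate`, its `lastSeenSqn` field, and the "notify only on a newer SQN" rule are hypothetical stand-ins and not Flink's actual input gate logic. Only the method names `notifyPriorityEvent` and `notifyPriorityEventForce` are borrowed from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

public class PriorityNotificationSketch {

    /** Hypothetical stand-in for an input gate; NOT Flink's real class. */
    static class ToyGate {
        // Assumption: the gate remembers the highest priority SQN it has seen.
        private int lastSeenSqn = Integer.MIN_VALUE;
        final List<String> notifications = new ArrayList<>();

        /** Checked variant: silently ignored if the SQN was already seen. */
        void notifyPriorityEvent(int sqn) {
            if (sqn > lastSeenSqn) {
                lastSeenSqn = sqn;
                notifications.add("checked:" + sqn);
            }
        }

        /** Forced variant: always recorded, no SQN comparison. */
        void notifyPriorityEventForce() {
            notifications.add("forced");
        }
    }

    public static void main(String[] args) {
        ToyGate gate = new ToyGate();
        gate.notifyPriorityEvent(7);     // announcement: gate sees barrier SQN 7
        gate.notifyPriorityEvent(7);     // conversion with the same SQN: dropped
        gate.notifyPriorityEventForce(); // conversion with force: delivered
        System.out.println(gate.notifications);
    }
}
```

In this model the second checked call is lost because the gate already recorded SQN 7 during the announcement, which is the situation the rephrased comment describes; the forced call sidesteps the comparison.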