dawidwys commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r678823514
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
##########
@@ -151,7 +152,11 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent) {
final AbstractEvent event = bufferOrEvent.getEvent();
// TODO: with checkpointedInputGate.isFinished() we might not need to support any events on
// this level.
- if (event.getClass() == EndOfPartitionEvent.class) {
+ if (event.getClass() == EndOfData.class) {
+ if (checkpointedInputGate.hasReceivedEndOfData()) {
+ return DataInputStatus.END_OF_DATA;
+ }
Review comment:
Ok, you're right that all but the last `EndOfPartitionEvent` are returned.
There is a slight difference, though. Once we have received the last
`EndOfPartitionEvent`, the `InputGate` keeps returning `Optional.empty()`
from then on. That is why we need the logic in
`AbstractStreamTaskNetworkInput#emitNext:121`. We could copy that logic over
to `processEvent` and do:
```java
protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent) {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() == EndOfData.class) {
// release the record deserializer immediately,
// which is very valuable in case of bounded stream
releaseDeserializer(bufferOrEvent.getChannelInfo());
if (checkpointedInputGate.hasReceivedEndOfData()) {
return DataInputStatus.END_OF_DATA;
}
} else if (event.getClass() == EndOfPartitionEvent.class) {
if (checkpointedInputGate.isFinished()) {
return DataInputStatus.END_OF_INPUT;
}
} else if (event.getClass() == EndOfChannelStateEvent.class) {
if (checkpointedInputGate.allChannelsRecovered()) {
return DataInputStatus.END_OF_RECOVERY;
}
}
return DataInputStatus.MORE_AVAILABLE;
}
```
I guess this would save us one invocation of `emitNext()`: as far as I can
tell, on the current master we must call `emitNext()` once more after
receiving the last `EndOfPartitionEvent` before it finally returns the
`END_OF_INPUT` status. If we don't mind duplicating the check, I am fine with
adding this branch.
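To make the "one extra `emitNext()` call" argument concrete, here is a toy model in plain Java. It does not use Flink's classes: `ToyGate`, `callsUntilEndOfInput`, the `Status` enum and the `"EOP"` marker are hypothetical stand-ins, and the gate is assumed (as described above) to return `Optional.empty()` forever once every channel has delivered its end-of-partition marker. The sketch counts how many `emitNext()` calls are needed before `END_OF_INPUT` is reported, with and without an `isFinished()` check inside `processEvent`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

final class EndOfInputSketch {

    // Hypothetical subset of the statuses discussed above.
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical gate: each channel delivers exactly one "EOP" (end-of-partition) marker. */
    static final class ToyGate {
        private final Deque<String> events = new ArrayDeque<>();
        private int unfinishedChannels;

        ToyGate(int channels) {
            unfinishedChannels = channels;
            for (int i = 0; i < channels; i++) {
                events.add("EOP");
            }
        }

        /** Returns the next event, or Optional.empty() once everything has been consumed. */
        Optional<String> pollNext() {
            String event = events.poll();
            if ("EOP".equals(event)) {
                unfinishedChannels--;
            }
            return Optional.ofNullable(event);
        }

        boolean isFinished() {
            return unfinishedChannels == 0;
        }
    }

    static Status emitNext(ToyGate gate, boolean checkInProcessEvent) {
        Optional<String> next = gate.pollNext();
        if (next.isPresent()) {
            return processEvent(next.get(), gate, checkInProcessEvent);
        }
        // Fallback path: the gate returns empty forever once it is finished.
        return gate.isFinished() ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
    }

    static Status processEvent(String event, ToyGate gate, boolean checkInProcessEvent) {
        // The proposed branch: report END_OF_INPUT as soon as the last marker arrives.
        if (checkInProcessEvent && "EOP".equals(event) && gate.isFinished()) {
            return Status.END_OF_INPUT;
        }
        return Status.MORE_AVAILABLE;
    }

    /** Counts emitNext() calls until END_OF_INPUT is reported. */
    static int callsUntilEndOfInput(boolean checkInProcessEvent, int channels) {
        ToyGate gate = new ToyGate(channels);
        int calls = 0;
        Status status;
        do {
            calls++;
            status = emitNext(gate, checkInProcessEvent);
        } while (status != Status.END_OF_INPUT);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("fallback only: " + callsUntilEndOfInput(false, 3));
        System.out.println("check in processEvent: " + callsUntilEndOfInput(true, 3));
    }
}
```

With three channels, `main` prints 4 calls for the fallback-only path and 3 when `processEvent` checks `isFinished()` itself. The saving is exactly one call regardless of the channel count; the cost is the duplicated `isFinished()` check in two places.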
--
This is an automated message from the Apache Git Service.