dawidwys commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r680016741
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -472,40 +486,60 @@ protected void
processInput(MailboxDefaultAction.Controller controller) throws E
new
ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
}
- private void resetSynchronousSavepointId(long id, boolean succeeded) {
- if (!succeeded && activeSyncSavepointId != null &&
activeSyncSavepointId == id) {
- // allow to process further EndOfPartition events
- activeSyncSavepointId = null;
- operatorChain.setIgnoreEndOfInput(false);
+ protected void endData() throws Exception {
+ advanceToEndOfEventTime();
+ // finish all operators in a chain effect way
+ operatorChain.finishOperators(actionExecutor);
+
+ for (ResultPartitionWriter partitionWriter :
getEnvironment().getAllWriters()) {
+ partitionWriter.notifyEndOfData();
}
- syncSavepointId = null;
+
+ this.endOfDataReceived = true;
}
Review comment:
I don't quite get the second part of the comment.
We're calling` endInput` in a single place. Only in the `endData` (plus the
special handling for `StreamSource` which was explicitly requested so that it
can be dropped along with `StreamSourceTask` and was done that way before as
well). The entire purpose of this task is to call it not when the Task finishes
but when it receives the `endData`.
If I understand the first part correctly you're referring solely to the
`SourceOperatorStreamTask`, am I right? All other tasks call `endData` from a
single path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]