dawidwys commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r679161140
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -486,40 +489,48 @@ protected void endData() throws Exception {
this.endOfDataReceived = true;
}
- private void resetSynchronousSavepointId(long id, boolean succeeded) {
- if (!succeeded && activeSyncSavepointId != null && activeSyncSavepointId == id) {
- // allow to process further EndOfPartition events
- activeSyncSavepointId = null;
- operatorChain.setIgnoreEndOfInput(false);
- }
- syncSavepointId = null;
- }
-
- private void setSynchronousSavepointId(long checkpointId, boolean ignoreEndOfInput) {
+ protected void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
checkState(
- syncSavepointId == null,
+ syncSavepointWithoutDrain == null
+ && (syncSavepointWithDrain == null
+ || (isDrain && syncSavepointWithDrain == checkpointId)),
Review comment:
Ha, I added a comment, but I must have lost it when merging some
intermediate methods I had.
I don't like it, but I could not find a better solution. The problem is that
we use `performCheckpoint` both in `StreamSourceTask` and in e.g.
`OneInputStreamTask`. In the case of `OneInputStreamTask`,
`setSynchronousSavepoint` is called only once, from within
`performCheckpoint`, but for `StreamSourceTask` and
`SourceOperatorStreamTask` it is called twice:
1. just before emitting `endData`
2. when calling `performCheckpoint` after `endData`
That is why the check accepts a repeated call with the same `checkpointId`
when `isDrain` is set.
Let me look once again to see if I can figure something out.
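To make the double-call case concrete, here is a minimal standalone sketch of
the precondition from the diff. It is not the Flink implementation: the class
name `SyncSavepointGuard`, the error message, and the assignments after the
check are assumptions made for illustration, and a plain
`IllegalStateException` stands in for `checkState`. Only the boolean condition
is taken from the hunk above.

```java
/**
 * Hypothetical stand-in for the savepoint bookkeeping discussed above.
 * Not Flink code: only the boolean condition mirrors the diff.
 */
final class SyncSavepointGuard {
    // null means no synchronous savepoint of that kind is in progress.
    private Long syncSavepointWithoutDrain;
    private Long syncSavepointWithDrain;

    void setSynchronousSavepoint(long checkpointId, boolean isDrain) {
        // Same condition as in the diff: reject a second savepoint unless it
        // is a repeated drain call for the id that is already registered.
        boolean allowed =
                syncSavepointWithoutDrain == null
                        && (syncSavepointWithDrain == null
                                || (isDrain && syncSavepointWithDrain == checkpointId));
        if (!allowed) {
            throw new IllegalStateException(
                    "a synchronous savepoint is already in progress");
        }
        // Assumption: the id is stored in the field matching the mode.
        if (isDrain) {
            syncSavepointWithDrain = checkpointId;
        } else {
            syncSavepointWithoutDrain = checkpointId;
        }
    }

    public static void main(String[] args) {
        SyncSavepointGuard guard = new SyncSavepointGuard();
        // Source-style task: first call just before emitting endData ...
        guard.setSynchronousSavepoint(7L, true);
        // ... second call from performCheckpoint for the same checkpoint.
        guard.setSynchronousSavepoint(7L, true);
        try {
            // A different id while the drain savepoint is active is rejected.
            guard.setSynchronousSavepoint(8L, true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, a task that calls the method only once never reaches the
`isDrain && syncSavepointWithDrain == checkpointId` branch. That branch exists
only to tolerate the second call made by the source tasks.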