gaoyunhaii opened a new pull request #16905:
URL: https://github.com/apache/flink/pull/16905
## What is the purpose of the change
This PR fixes a race between triggering a savepoint and task finish. When
triggering stop-with-savepoint --drain, we first stop the operators and then
trigger a savepoint. However, for all three affected tasks, namely
`SourceStreamTask`, `SourceOperatorStreamTask` and `MultipleInputStreamTask`,
after stopping (done in the main thread), `triggerCheckpointAsync()` is called
from a non-mailbox thread. If that thread runs slowly, then by the time
`triggerCheckpointAsync()` adds the mail that triggers the savepoint, the task
may already have executed its finish process and closed the mailbox, which
makes the trigger fail. Since we currently assert that the trigger throws no
exceptions, a trigger failure becomes a fatal error and makes the TM exit.
To fix this issue, we would like to ensure that the trigger happens immediately
after the operators are "finished", before the task goes on to execute the
logic in `afterInvoke()`. The details are discussed below.
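
The failure mode described above can be illustrated with a toy mailbox. This is only a minimal sketch: `ToyMailbox` and `triggerAccepted` are hypothetical names made up for illustration and are not Flink classes or methods. The only behavior assumed from the description is that a closed mailbox rejects new mails.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class ClosedMailboxSketch {

    /** Toy mailbox (hypothetical, not Flink's API): rejects mails once closed. */
    static final class ToyMailbox {
        private final Queue<Runnable> mails = new ArrayDeque<>();
        private boolean closed;

        synchronized void put(Runnable mail) {
            if (closed) {
                throw new IllegalStateException("Mailbox is closed");
            }
            mails.add(mail);
        }

        synchronized void close() {
            closed = true;
            mails.clear();
        }
    }

    /**
     * Simulates the savepoint trigger arriving either before or after the task
     * has finished and closed its mailbox. Returns true if the mail is accepted.
     */
    static boolean triggerAccepted(boolean taskFinishedFirst) {
        ToyMailbox mailbox = new ToyMailbox();
        if (taskFinishedFirst) {
            mailbox.close(); // the task already ran its finish process
        }
        try {
            mailbox.put(() -> { /* would perform the savepoint */ });
            return true;
        } catch (IllegalStateException e) {
            // With a "no exceptions" assertion, this case would be fatal.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("trigger before finish accepted: " + triggerAccepted(false));
        System.out.println("trigger after finish accepted:  " + triggerAccepted(true));
    }
}
```

Whether the trigger succeeds depends only on which side reaches the mailbox first, which is why the fix pins the trigger to a point before the finish process.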
**SourceOperatorStreamTask and MultipleInputStreamTask**
For these two tasks, `mainOperator.stop()` is called in the akka thread. This
might cause issues, since the method modifies some variables that are also
accessed in the mailbox thread.
Besides, `stop()` returns a future that is completed after the `finish()`
method is called, and the akka thread adds a new stage to it that calls
`triggerCheckpointAsync()`. If `finish()` has not yet been called when the
stage is added, there is no problem: `finish()` is called in the mailbox
thread, so the trigger happens immediately after `finish()`. But if `finish()`
has already been called when the stage is added, the stage is executed directly
inside the akka thread, and its order relative to the task's finish process is
not determined.
To fix this issue, we run the whole process, namely stopping the operator and
adding the stage, in the mailbox thread. This ensures that the checkpoint
trigger happens right after the `finish()` method is called.
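
The thread-dependence of the added stage follows from how `java.util.concurrent.CompletableFuture` runs non-async stages. The sketch below shows both orderings, assuming the future returned by `stop()` behaves like a plain `CompletableFuture`. The thread name "mailbox" and the class and method names are hypothetical stand-ins, not Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class StageThreadSketch {

    /** Stage is added first, then the future is completed on the "mailbox" thread. */
    static String stageAddedBeforeCompletion() {
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            CompletableFuture<Void> finished = new CompletableFuture<>();
            AtomicReference<String> ranOn = new AtomicReference<>();
            CompletableFuture<Void> stage =
                    finished.thenRun(() -> ranOn.set(Thread.currentThread().getName()));
            // Stand-in for finish() completing the future in the mailbox thread.
            CompletableFuture.runAsync(() -> finished.complete(null), mailbox).join();
            stage.join();
            return ranOn.get();
        } finally {
            mailbox.shutdown();
        }
    }

    /** Future is already completed when the stage is added by the calling thread. */
    static String stageAddedAfterCompletion() {
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            CompletableFuture<Void> finished = new CompletableFuture<>();
            CompletableFuture.runAsync(() -> finished.complete(null), mailbox).join();
            AtomicReference<String> ranOn = new AtomicReference<>();
            // The stage runs immediately in the caller, not in the mailbox thread.
            finished.thenRun(() -> ranOn.set(Thread.currentThread().getName())).join();
            return ranOn.get();
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("added before completion, ran on: " + stageAddedBeforeCompletion());
        System.out.println("added after completion, ran on:  " + stageAddedAfterCompletion());
    }
}
```

In the second case the caller plays the role of the akka thread. Adding the stage from the mailbox thread itself removes the distinction, since both orderings then run the stage in the mailbox thread.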
**SourceStreamTask**
The source stream task currently adds the stage that calls
`triggerCheckpointAsync()` to `sourceThread.getCompletionFuture()`. However,
the stage is added in the akka thread and the future is completed in the legacy
source thread, and neither of them can guarantee that the mail is added before
the task finishes.
To fix this issue, we introduce a new `allDataFinishedFuture` that is
completed inside the mailbox, right before the legacy source thread completes.
Then, as with the other two tasks, the stage that triggers the checkpoint can
be added to this future.
Some other options were considered but are not preferred:
1. Only move stopping the job and triggering the checkpoint into the mailbox
thread: since the `completionFuture` is not completed in the mailbox thread,
the problem would remain.
2. Complete `completionFuture` directly in the mailbox: if we move all the
places that complete it into the mailbox thread, there might be deadlocks in
the cancel / failed case: the mailbox throws an error and heads to the cleanup
of invoke, which also waits for the `completionFuture`. If we instead only move
to the mailbox thread when the `finishReason` is `STOP_WITH_SAVEPOINT_DRAIN`,
we would need different logic for this future, which adds complexity.
**Do we need to assert no exceptions?**
I think perhaps we do not need to assert no exceptions here. If the task is
still running when the trigger arrives, the trigger is expected to succeed, but
if the task is finished / canceled / failed, the trigger is expected to fail.
We already have logic to deal with the second case: the checkpoint is declined
and a global failover is triggered on the JM side to keep things consistent.
Thus it seems to me we could remove the assertion. Otherwise a trigger failure
would cause the TM process to exit, which might not be expected.
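
As a rough illustration of the alternative to asserting, a failed trigger can be mapped to a "declined" outcome instead of being rethrown as fatal. This is a sketch under assumptions: the `Outcome` enum, `toOutcome` and `demo` are hypothetical names, and the trigger result is modeled as a plain `CompletableFuture<Boolean>`. The real decline path in Flink is not shown here.

```java
import java.util.concurrent.CompletableFuture;

final class TriggerOutcomeSketch {

    /** Hypothetical outcome type used only for this illustration. */
    enum Outcome { TRIGGERED, DECLINED }

    /** Maps a trigger result to an outcome without propagating the exception. */
    static Outcome toOutcome(CompletableFuture<Boolean> triggerFuture) {
        return triggerFuture
                .handle((ok, error) ->
                        error == null && Boolean.TRUE.equals(ok)
                                ? Outcome.TRIGGERED
                                : Outcome.DECLINED)
                .join();
    }

    /** Simulates a trigger against a running task or an already finished one. */
    static Outcome demo(boolean taskStillRunning) {
        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
        if (taskStillRunning) {
            triggerFuture.complete(true);
        } else {
            // Stand-in for the mailbox rejecting the mail after the task finished.
            triggerFuture.completeExceptionally(
                    new IllegalStateException("Mailbox is closed"));
        }
        return toOutcome(triggerFuture);
    }

    public static void main(String[] args) {
        System.out.println("running task:  " + demo(true));
        System.out.println("finished task: " + demo(false));
    }
}
```

The point of the design is that an expected failure (task already finished, canceled or failed) is reported rather than escalated, leaving the decision about failover to the JM side.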
## Brief change log
- 5e7725f41ca06cbd05f9d7e7a3978937a02639aa fixes the issue.
## Verifying this change
It seems hard to add a dedicated test case for this issue, so we only add a
generalized UT for triggering stop-with-savepoint --drain.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**