sunhaibotb opened a new pull request #10151: [FLINK-14231] Handle the processing-time timers before closing operator to properly support endInput
URL: https://github.com/apache/flink/pull/10151

## What is the purpose of the change

This pull request handles processing-time timers properly before an operator is closed, so that `endInput` is properly supported.

For a non-tail operator in an operator chain, closing it means that the input of its downstream operator has reached its end. So for an operator chain `OP1 -> OP2 -> ...`, after the (source/network) inputs of `OP1` are finished, the task should close its operators as follows:

1. Quiesce `ProcessingTimeService` to disallow `OP1` from registering new timers.
2. Wait for the pending (registered) timers of `OP1` to finish.
3. Call `OP1#close`.
4. Call `OP2#endInput`.
5. Quiesce `ProcessingTimeService` to disallow `OP2` from registering new timers.
6. Wait for the pending (registered) timers of `OP2` to finish.
7. Call `OP2#close`.

...

## Brief change log

- Modifies `StreamTask` to bind a separate `ProcessingTimeServiceImpl` instance to each operator.
- Changes `ProcessingTimeServiceImpl` to support quiescing and to provide a `CompletableFuture` that completes once all pending timers have finished after quiescing.
- Changes `StreamTask` to close the operators in the operator chain one by one through the mailbox thread, handling processing-time timers properly before each operator is closed.

## Verifying this change

This change added tests and can be verified as follows:

- Added a test for `ProcessingTimeServiceImpl` that validates the quiesced state and the correctness of the `CompletableFuture` that marks the completion of all pending timers after quiescing.
- Added tests for `ProcessingTimeServiceImpl` that validate the correctness of the map storing the pending timers when timers are registered, when timers are cancelled, and when a registered timer throws an exception during execution.
- Extended the test for `SourceStreamTask` to verify that timers registered in `endInput` complete normally, while timers registered in `close` are not triggered.
- Extended the test for `OneInputStreamTask` to verify that timers registered in `endInput` complete normally, while timers registered in `close` are not triggered.
- Extended the test for `TwoInputStreamTask` to verify that timers registered in `endInput` complete normally, while timers registered in `close` are not triggered.
- Added tests for `StreamTask` to verify that the task is marked as failed when a non-head operator reports an error in `endInput` or any operator reports an error in `close`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
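
The closing sequence described under "What is the purpose of the change" can be sketched in plain Java roughly as follows. This is a minimal, hypothetical illustration, not the code in this pull request: `ChainedOperator`, `QuiescableTimerService`, `quiesce()`, `closeChain` and `runDemo` are made-up names standing in for Flink's operator, `ProcessingTimeServiceImpl` and `StreamTask` closing logic. It also assumes timer callbacks run directly on a `ScheduledExecutorService` thread (rather than through the mailbox) and that the closing loop simply blocks on the future instead of yielding to the mailbox.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ChainClosingSketch {
    /** Hypothetical stand-in for a chained operator (not Flink's operator API). */
    interface ChainedOperator {
        void endInput() throws Exception;
        void close() throws Exception;
    }
    /** Hypothetical per-operator timer service that can be quiesced (not Flink's class). */
    static final class QuiescableTimerService {
        private final ScheduledExecutorService executor;
        // Completion futures of timers that are registered but not yet finished.
        private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();
        private final AtomicLong nextId = new AtomicLong();
        private volatile boolean quiesced = false;
        QuiescableTimerService(ScheduledExecutorService executor) { this.executor = executor; }
        /** Schedules a timer; once quiesced, new timers are dropped and false is returned. */
        boolean registerTimer(long delayMs, Runnable callback) {
            if (quiesced) {
                return false;
            }
            long id = nextId.getAndIncrement();
            CompletableFuture<Void> done = new CompletableFuture<>();
            pending.put(id, done); // tracked before scheduling, so removal always follows
            executor.schedule(() -> {
                try {
                    callback.run();
                    done.complete(null);
                } catch (Throwable t) {
                    done.completeExceptionally(t); // surfaces as a failure when awaited
                } finally {
                    pending.remove(id);
                }
            }, delayMs, TimeUnit.MILLISECONDS);
            return true;
        }
        /** Disallows new timers; the future completes when all pending timers have finished. */
        CompletableFuture<Void> quiesce() {
            quiesced = true;
            return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]));
        }
    }

    /** Closes the chain head to tail; the head's endInput() must already have been called. */
    static void closeChain(List<ChainedOperator> chain, List<QuiescableTimerService> timers)
            throws Exception {
        for (int i = 0; i < chain.size(); i++) {
            // Quiesce this operator's timers and block until the pending ones finish.
            timers.get(i).quiesce().get();
            chain.get(i).close();
            // Closing an upstream operator ends the input of its downstream operator.
            if (i + 1 < chain.size()) {
                chain.get(i + 1).endInput();
            }
        }
    }
    /** Runs the chain OP1 -> OP2 and returns the order in which events happened. */
    static List<String> runDemo() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        QuiescableTimerService timers1 = new QuiescableTimerService(executor);
        QuiescableTimerService timers2 = new QuiescableTimerService(executor);
        ChainedOperator op1 = new ChainedOperator() {
            public void endInput() {
                log.add("op1-endInput");
                timers1.registerTimer(50, () -> log.add("op1-timer"));
            }
            public void close() { log.add("op1-close"); }
        };
        ChainedOperator op2 = new ChainedOperator() {
            public void endInput() {
                log.add("op2-endInput");
                timers2.registerTimer(10, () -> log.add("op2-timer")); // still fires
            }
            public void close() {
                log.add("op2-close");
                timers2.registerTimer(0, () -> log.add("dropped")); // quiesced: never runs
            }
        };
        op1.endInput(); // the (source/network) inputs of OP1 are finished
        closeChain(List.of(op1, op2), List.of(timers1, timers2));
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        return new ArrayList<>(log);
    }
    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

Running `main` prints `[op1-endInput, op1-timer, op1-close, op2-endInput, op2-timer, op2-close]`: the timer `OP2` registers in `endInput` still fires, while the one it registers in `close` is dropped because its timer service has already been quiesced. Giving each operator its own timer service is what lets the sketch quiesce `OP1` without blocking timers that `OP2` registers afterwards.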
