zhuzhurk commented on code in PR #545: URL: https://github.com/apache/flink-web/pull/545#discussion_r898738184
########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -9,44 +9,44 @@ authors: name: "Dawid Wysakowicz" - Daisy Tsang: name: "Daisy Tsang" -excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finished. +excerpt: This post briefly describes the motivation and changes made by the final checkpoint mechanism, including the changes to the checkpoint procedure and how tasks finish. --- # Motivation Flink is a distributed processing engine for both unbounded and bounded streams of data. In recent versions, Flink has unified the DataStream API and the Table / SQL API to support both streaming and batch cases. -Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing +Since most users require both types of data processing pipelines, the unification helps reduce the complexity of developing, operating, and maintaining consistency between streaming and batch backfilling jobs, like [the case for Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020). +Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode. +The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources. +The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the +tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the +preferred mode to process bounded jobs, streaming mode is also required for various reasons. 
For example, users may want to deal +with records containing retraction or exploit the property that data is roughly sorted by event times in streaming mode +(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). Moreover, +users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require streaming execution mode. + <center> <img vspace="20" style="width:70%" src="{{site.baseurl}}/img/blog/2022-06-01-final-checkpoint/stream_batch_cmp.png" /> <p style="font-size: 0.6em;text-align:left;margin-top:-1em;margin-bottom: 4em"> - Figure 1. A comparison of the Stream mode and Batch mode for the example count operator. For streaming mode, the arrived + Figure 1. A comparison of the Streaming mode and Batch mode for the example Count operator. For streaming mode, the arrived elements are not sorted, the operator would read / write the state corresponding to the element for computation. For batch mode, the arrived elements are first sorted as a whole and then processed. </p> </center> -Flink provides two execution modes under the unified programming API: the streaming mode and the batch mode. -The streaming mode processes records incrementally based on the states, thus it supports both bounded and unbounded sources. -The batch mode works with bounded sources and usually has a better performance for bounded jobs because it executes all the -tasks in topological order and avoids random state access by pre-sorting the input records. Although batch mode is often the -preferred mode to process bounded jobs, streaming mode is also required for various reasons. For example, users may want to deal -with records containing retraction or exploit the property that data is roughly sorted by event times in stream mode -(like the case in [Kappa+ Architecture](https://www.youtube.com/watch?t=666&v=4qSlsYogALo&feature=youtu.be)). 
Moreover, -users often have mixed jobs involving both unbounded streams and bounded side-inputs, which also require stream execution mode. - In streaming mode, [checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/) -is the vital component in supporting exactly-once guarantees. By periodically snapshotting the +is the vital mechanism in supporting exactly-once guarantees. By periodically snapshotting the aligned states of operators, Flink can recover from the latest checkpoint and continue execution when failover happens. However, -previously Flink could not take checkpoints if any tasks finished. This would cause problems for jobs with both bounded and unbounded +previously Flink could not take checkpoints if any tasks gets finished. This would cause problems for jobs with both bounded and unbounded Review Comment: tasks -> task ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure. Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots, then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might -already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent -tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce -which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. +have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running +precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. 
Finally, if the subtasks of +an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states, +the operator would be marked as fully finished. The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing -finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully +finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully finished operators have running precedents after restarting, which conflicts with the design that tasks finished in topological order. # Revise the Process of Finishing Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit -operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs +operators could not commit all the data when running in streaming mode. As the background, Flink jobs have two ways to finish: -1. All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished. -2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set, -the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all -records processed (like `endInput()`). +1. All sources are bound and they processed all the input records. The job will finish after all the sources are finished. Review Comment: The statement that the job finishes once all the sources have finished is not accurate. The previous wording, `start to finish`, seems better. Alternatively, it could say: "The job will finish after all the input records are processed and all the result records are committed to external sinks."
########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure. Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots, then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might -already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent -tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce -which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. +have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running +precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of +an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states, +the operator would be marked as fully finished. The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing -finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully +finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully finished operators have running precedents after restarting, which conflicts with the design that tasks finished in topological order. # Revise the Process of Finishing Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit -operators could not commit all the data when running in streaming mode. 
As the background, currently Flink jobs +operators could not commit all the data when running in streaming mode. As the background, Flink jobs have two ways to finish: -1. All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished. -2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set, -the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all -records processed (like `endInput()`). +1. All sources are bound and they processed all the input records. The job will finish after all the sources are finished. +2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job +will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might +be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be +recovered from the savepoint. -Ideally we should ensure exactly-once semantics in both cases. +Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once, +two-phase-commit operators only commit data after a checkpoint following this piece of data succeed. +However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish, +and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since +if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication. -To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these -data succeed. 
However, for the bounded source case, currently there is no suitable time point to commit the data -between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished, -then if there are failovers after that (like due to other unfinished tasks get failed), these records would be -re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. +The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalls the execution and +take a savepoint. After the savepoint succeeds, all the source tasks would stop actively. Although the savepoint seems to +provide the opportunity to commit all the data, some processing logic is in fact executed during the job getting stopped, +and the records produced would be deserted by mistake. For example, calling `endInput()` method for operators happens during Review Comment: deserted -> do you mean discarded? ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure. Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots, then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might -already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent -tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce -which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. 
+have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running +precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of +an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states, +the operator would be marked as fully finished. The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing -finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully +finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully finished operators have running precedents after restarting, which conflicts with the design that tasks finished in topological order. # Revise the Process of Finishing Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit -operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs +operators could not commit all the data when running in streaming mode. As the background, Flink jobs have two ways to finish: -1. All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished. -2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set, -the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all -records processed (like `endInput()`). +1. All sources are bound and they processed all the input records. The job will finish after all the sources are finished. +2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. 
With `–-drain`, the job +will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might +be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be +recovered from the savepoint. -Ideally we should ensure exactly-once semantics in both cases. +Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once, +two-phase-commit operators only commit data after a checkpoint following this piece of data succeed. +However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish, +and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since +if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication. Review Comment: replication -> duplication ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -142,26 +144,26 @@ which might prolong the total execution time for a long time. </p> </center> -For the case of `stop-with-savepoint [--drain]`, the intuitive idea is also flawed since different tasks have to +For the case of `stop-with-savepoint [--drain]`, the intuitive idea does not work since different tasks have to wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint. -To further overcome these issues, we’d better decouple "finishing operator logic" and "finishing tasks": if we could first -let all the operators finish their execution logic as a whole, including calling lifecycle methods `endInput()`, then each operator -could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we could also simply make all the operators to -wait for one specific savepoint taken after operators finishing their execution logic. 
Therefore, in this way the two types of -finish process could be unified. +Therefore, we do not take the intuitive option. Instead, we decoupled *"finishing operator logic"* and *"finishing tasks"*: +all the operators would first finish their execution logic as a whole, including calling lifecycle methods like `endInput()`, +then each operator could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we also reverted the current +implementation similarly: all the tasks will first finish executing the operators' logic, then they simply wait for the next savepoint +to happen before finish. Therefore, in this way the finish processes are unified and the data could be fully committed for all the cases. Based on this thought, as shown in the right part of Figure 3, to decoupled the process of "finishing operator logic" -and "finishing tasks", we introduced a new `EndOfDataEvent`. For each task, after executing all the operator logic it would first notify -the descendants with an `EndOfDataEvent` so that the descendants also have chances to finish executing the operator logic. Then all +and "finishing tasks", we introduced a new `EndOfData` event. For each task, after executing all the operator logic it would first notify +the descendants with an `EndOfData` event so that the descendants also have chances to finish executing the operator logic. Then all the tasks could wait for the next checkpoint or the specified savepoint concurrently to commit all the remaining data before getting finished. At last, it is also worthy to mention we have clarified and renamed the `close()` and `dispose()` methods in the operators’ lifecycle. The two methods are in fact different since `close()` is only called when the task finishes normally and dispose() is called in both cases of normal finishing and failover. However, this was not clear from their names. 
Therefore, we rename the two methods to `finish()` and `close()`: - `finish()` marks the termination of the operator and no more records are allowed after `finish()` is called. It should -- only be called when sources are finished or when the `-–drain` parameter is specified. + only be called when sources are finished or when the `-–drain` parameter is specified. - `close()` is used to do cleanup and release all the holding resources. Review Comment: holding -> held? ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure. Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots, then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might -already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent -tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce -which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. +have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running +precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of +an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states, +the operator would be marked as fully finished. 
The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing -finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully +finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully finished operators have running precedents after restarting, which conflicts with the design that tasks finished in topological order. # Revise the Process of Finishing Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit -operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs +operators could not commit all the data when running in streaming mode. As the background, Flink jobs have two ways to finish: -1. All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished. -2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set, -the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all -records processed (like `endInput()`). +1. All sources are bound and they processed all the input records. The job will finish after all the sources are finished. +2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job +will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might +be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be +recovered from the savepoint. -Ideally we should ensure exactly-once semantics in both cases. +Let's first have a look at the case of bounded sources. 
To achieve end-to-end exactly-once, +two-phase-commit operators only commit data after a checkpoint following this piece of data succeed. +However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish, Review Comment: finish -> finishment or finishing? ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -142,26 +144,26 @@ which might prolong the total execution time for a long time. </p> </center> -For the case of `stop-with-savepoint [--drain]`, the intuitive idea is also flawed since different tasks have to +For the case of `stop-with-savepoint [--drain]`, the intuitive idea does not work since different tasks have to wait for different checkpoints / savepoints, thus we could not finish the job with a specific savepoint. -To further overcome these issues, we’d better decouple "finishing operator logic" and "finishing tasks": if we could first -let all the operators finish their execution logic as a whole, including calling lifecycle methods `endInput()`, then each operator -could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we could also simply make all the operators to -wait for one specific savepoint taken after operators finishing their execution logic. Therefore, in this way the two types of -finish process could be unified. +Therefore, we do not take the intuitive option. Instead, we decoupled *"finishing operator logic"* and *"finishing tasks"*: +all the operators would first finish their execution logic as a whole, including calling lifecycle methods like `endInput()`, +then each operator could wait for the next checkpoint concurrently. Besides, for stop-with-savepoint we also reverted the current +implementation similarly: all the tasks will first finish executing the operators' logic, then they simply wait for the next savepoint +to happen before finish. 
Therefore, in this way the finish processes are unified and the data could be fully committed for all the cases. Review Comment: finish -> finishing ########## _posts/2022-06-01-final-checkpoint-part1.md: ########## @@ -81,59 +81,61 @@ running subtasks. The states could be repartitioned on restarting and all the ne To support creating such a checkpoint for jobs with finished tasks, we extended the checkpoint procedure. Previously the checkpoint coordinator inside the JobManager first notifies all the sources to report snapshots, then all the sources further notify their descendants via broadcasting barrier events. Since now the sources might -already finish, the checkpoint coordinator would instead treat the running tasks who do not have running precedent -tasks as “new sources”, and notifies these tasks to initiate the checkpoints. The checkpoint could then deduce -which operator is fully finished based on the task states when triggering checkpoint and the received snapshots. +have already finished, the checkpoint coordinator would instead treat the running tasks who also do not have running +precedent tasks as "new sources", and it notifies these tasks to initiate the checkpoints. Finally, if the subtasks of +an operator are either finished on triggering checkpoint or have finished processing all the data on snapshotting states, +the operator would be marked as fully finished. The changes of the checkpoint procedure are transparent to users except that for checkpoints indeed containing -finished tasks, we disallowed adding new operators before the fully finished ones, since it would make the fully +finished tasks, we disallowed adding new operators as precedents of the fully finished ones, since it would make the fully finished operators have running precedents after restarting, which conflicts with the design that tasks finished in topological order. 
# Revise the Process of Finishing Based on the ability to take checkpoints with finished tasks, we could then solve the issue that two-phase-commit -operators could not commit all the data when running in streaming mode. As the background, currently Flink jobs +operators could not commit all the data when running in streaming mode. As the background, Flink jobs have two ways to finish: -1. All sources are bound and they processed all the input records. The job will start to finish after all the sources are finished. -2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. If the `–-drain` parameter is not set, -the savepoint might be used to start new jobs and the operators will not flush all the event times or call methods marking all -records processed (like `endInput()`). +1. All sources are bound and they processed all the input records. The job will finish after all the sources are finished. +2. Users execute `stop-with-savepoint [--drain]`. The job will take a savepoint and then finish. With `–-drain`, the job +will be stopped permanently and is also required to commit all the data. However, without `--drain` the job might +be resumed from the savepoint later, thus not all data are required to be committed, as long as the state of the data could be +recovered from the savepoint. -Ideally we should ensure exactly-once semantics in both cases. +Let's first have a look at the case of bounded sources. To achieve end-to-end exactly-once, +two-phase-commit operators only commit data after a checkpoint following this piece of data succeed. +However, previously there is no such an opportunity for the data between the last periodic checkpoint and job finish, +and the data finally gets lost. Note that it is also not correct if we directly commit the data on job finished, since +if there are failovers after that (like due to other unfinished tasks get failed), the data will be replayed and cause replication. 
-To achieve exactly-once, currently two-phase-commit operators only commit data when a checkpoint after all these -data succeed. However, for the bounded source case, currently there is no suitable time point to commit the data -between the last periodic checkpoint and the task get finished: if we commit them directly on task get finished, -then if there are failovers after that (like due to other unfinished tasks get failed), these records would be -re-emitted and cause data duplication; if we discard these records as now, these records would be lost and cause data loss. +The case of `stop-with-savepoint --drain` also has problems. The previous implementation first stalls the execution and +take a savepoint. After the savepoint succeeds, all the source tasks would stop actively. Although the savepoint seems to Review Comment: take -> takes
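For reference, the triggering rule described in the quoted `@@ -81,59 +81,61 @@` hunk can be sketched in a simplified model. Running tasks that have no running precedent tasks act as "new sources". An operator counts as fully finished when each of its subtasks was either already finished when the checkpoint was triggered, or had finished processing all its data when snapshotting its state. This is a hypothetical illustration, not Flink's actual checkpoint coordinator code. The `Task` class, the `TaskState` enum, and both function names are assumptions, and the model ignores every task state other than RUNNING and FINISHED.

```python
from dataclasses import dataclass, field
from enum import Enum


class TaskState(Enum):
    # Hypothetical, simplified task states; real Flink tasks have more.
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


@dataclass
class Task:
    name: str
    state: TaskState
    upstream: list = field(default_factory=list)  # names of precedent tasks


def tasks_to_trigger(tasks):
    """Return the running tasks that have no running precedent tasks.

    These act as the "new sources" the coordinator notifies to start a checkpoint.
    """
    by_name = {t.name: t for t in tasks}
    return sorted(
        t.name
        for t in tasks
        if t.state is TaskState.RUNNING
        and not any(by_name[u].state is TaskState.RUNNING for u in t.upstream)
    )


def is_fully_finished(subtasks):
    """Check whether an operator is fully finished.

    Each entry is (finished_on_trigger, processed_all_data_on_snapshot).
    Every subtask must satisfy at least one of the two conditions.
    """
    return all(on_trigger or all_data for on_trigger, all_data in subtasks)


if __name__ == "__main__":
    job = [
        Task("source", TaskState.FINISHED),
        Task("map", TaskState.RUNNING, ["source"]),
        Task("sink", TaskState.RUNNING, ["map"]),
    ]
    print(tasks_to_trigger(job))  # ['map']
    print(is_fully_finished([(True, False), (False, True)]))  # True
```

When every source is still running, this rule reduces to the previous behavior: only the sources are triggered, because each downstream task still has a running precedent.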
