[ 
https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siying Dong updated SPARK-49374:
--------------------------------
    Attachment: image-2024-08-23-14-30-22-474.png

> RocksDB State Store Checkpoint Structure V2
> -------------------------------------------
>
>                 Key: SPARK-49374
>                 URL: https://issues.apache.org/jira/browse/SPARK-49374
>             Project: Spark
>          Issue Type: Task
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Siying Dong
>            Priority: Major
>         Attachments: image-2024-08-23-14-28-56-418.png, 
> image-2024-08-23-14-29-19-443.png, image-2024-08-23-14-29-59-165.png, 
> image-2024-08-23-14-30-22-474.png
>
>
> h2. Motivation
> We expect the new checkpoint structure to benefit micro-batch mode by 
> establishing a linear dependency between batch versions. Right now, tasks 
> can be executed multiple times for the same batchID (due to speculative 
> execution or reruns in ForEachBatch), so there can be multiple parallel 
> lineages of state store versions. For example, one issue with ForEachBatch 
> showed a lineage that triggered a RocksDB file uploading bug:
>  
>  
> Although we fixed all the known bugs, this complexity makes the system prone 
> to further bugs. The non-linear lineage also presents a correctness risk 
> when some of the versions are changelog checkpoints. 
> In the same example, suppose Version 17 is a snapshot checkpoint and Version 
> 18 is a changelog checkpoint. When we need to recover from a checkpoint, we 
> need to apply Version 17 and Version 18 together. However, Version 18 may 
> not have been generated on top of Version 17. This can happen either because 
> Version 18 was generated by a different worker than Version 17, or because 
> the same worker abandoned Version 17, replayed the batch, and generated 
> Version 17’. In most cases the result is still accurate, but we have 
> identified edge cases where users might be surprised by the results, and 
> there may already be correctness issues. See an example in the 
> [Appendix|https://docs.google.com/document/d/1pT5bwW325VndVH09aZd7TykKXbBP5uCrNIQOC0uDoLc/edit#heading=h.ehd1najwmeos].
>  These correctness issues will become more prominent with transformWithState 
> due to the occurrence of partial updates. Note that fixing the state store 
> lineage only ensures the state store itself is consistent; it does not 
> ensure the state store is consistent with the outputs written to the sink.
>  
> Furthermore, the complex checkpoint version lineage makes it hard to reduce 
> the overhead of old-version cleanup. Even a long-running state store has to 
> read metadata for all previous versions and list all files to do any cleanup 
> safely, which is expensive. This is necessary because any version can be 
> written by a different executor and can reference RocksDB files that the 
> current executor isn’t aware of. In extreme cases, we can even corrupt the 
> state store. The chance of that happening is very low, but it’s not a good 
> idea to leave an unknown correctness risk unfixed.
> h2. Proposal sketch
> The proposed checkpoint structure will ensure a linear dependency:
> This stronger guarantee will be a good foundation for the problems above.
>  
> The proposal’s basic idea is to guarantee linear lineage by not allowing 
> checkpoint overwriting. Every checkpoint is written to a new file name 
> containing a uniqueID. When starting any batch, a task uses the uniqueID to 
> precisely identify which checkpoint to load.
> When a new state store checkpoint is generated, the checkpoint path name 
> includes a globally unique ID, so that it can never be updated. Here is an 
> example:
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> The name is stored in the commit log too. When the next batch is executed, 
> those unique IDs are passed to the executors, which make sure they start 
> executing from that checkpoint. If the local state store isn’t in this 
> state, it will download the checkpoint from the cloud.
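> As a minimal sketch of this naming scheme (the helper names below are 
> hypothetical, not Spark’s actual code; the examples in this document show 
> IDs shortened to 8 hex characters, while the first version uses a full 
> UUID):

```scala
import java.util.UUID

// Sketch: build a checkpoint file name of the form <batchID>_<uniqueID>.<suffix>.
// CheckpointNames is an illustrative helper, not Spark's implementation.
object CheckpointNames {
  def make(batchId: Long, uniqueId: String, isSnapshot: Boolean): String = {
    val suffix = if (isSnapshot) "zip" else "delta"
    s"${batchId}_${uniqueId}.$suffix"
  }

  // A globally unique ID generated by the executor itself; a plain UUID in
  // the first version, possibly shortened later.
  def newUniqueId(): String = UUID.randomUUID().toString.replace("-", "")
}
```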
> h1. Part II: Detailed design
> h2. Architecture
> Previously, a state store checkpoint was stored in a path determined only by 
> the checkpoint root path, operatorID, partitionID and batchID. When a 
> stateful operator is executed, it can always construct the path from that 
> information: it downloads the checkpoint from the path of the previous 
> batchID and checkpoints to the path for the current batchID. As mentioned 
> earlier, this flexibility comes with a cost: there is no way to distinguish 
> among checkpoints generated by different task reruns for the same batchID.
>  
> In this new design, every checkpoint goes to a globally unique file. The 
> globally unique ID needs to be communicated between the driver and the 
> executors, and stored in commit logs. The basic workflow is as follows:
> h3. File Structure Under Checkpoint Path
> Currently, a checkpoint is stored in path 
> _<checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog,
>  zip]._ The path structure looks like the following:
> 0 (operatorID)
> ├── 0 (partitionID)
> │   └── ……
> ├── 1 (partitionID)
> │   └── default (storeName)
> │       ├── 20.zip
> │       ├── 21.delta
> │       ├── 22.delta
> │       └── 23.delta
> └── 2 (partitionID)
>     └── ……
>  
> The general structure stays intact; we only change how the files under the 
> store path are named. In the example above, only the checkpoint file names 
> change. Instead of naming the files _<batchID>.zip_ or _<batchID>.delta_, we 
> name them <batchID>_<uniqueID>.zip or <batchID>_<uniqueID>.delta. This also 
> means that for the same batchID, there can be multiple files. Here is an 
> example:
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> 23_689aa6bd.delta
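> A name like the ones above can be split back into its parts. As a minimal 
> sketch (the parser and regex below are illustrative, not Spark’s code); note 
> that a V1 name like _21.delta_ deliberately fails to parse:

```scala
// Hypothetical parser for V2 checkpoint file names: <batchID>_<uniqueID>.<suffix>.
object CheckpointNameParser {
  private val Pattern = raw"(\d+)_([0-9a-f]+)\.(zip|delta)".r

  // Returns (batchID, uniqueID, suffix) when the name matches the V2 scheme.
  def parse(fileName: String): Option[(Long, String, String)] = fileName match {
    case Pattern(batch, id, suffix) => Some((batch.toLong, id, suffix))
    case _                          => None
  }
}
```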
> The randomID will be generated by the executor itself. In the first version, 
> it is just a UUID, which can be shortened later if needed. In the Appendix, 
> we discuss a design decision on how to generate these random IDs. These 
> randomIDs will be persisted in commit logs. When a new batch starts, the 
> randomIDs are passed to the executors as part of the operator state 
> information. At the end of the batch, the IDs for the new checkpoints are 
> passed back to the driver through an accumulator, and the driver persists 
> them to the commit log.
>  
> The lineage information is always managed in the commit logs, but the 
> lineage is often needed at the state store level, when downloading a 
> checkpoint or doing cleanup. To make a state store self-contained, some 
> lineage information is also stored in the .zip and .delta files themselves, 
> so that at the level of a single state store we can always finish all 
> operations without relying on outside information. This decision is 
> discussed in the appendix. Each delta file will contain the unique 
> checkpoint IDs since the last (presumed) snapshot checkpoint, covering all 
> changelog checkpoints following it. We will discuss this in more detail 
> below.
>  
> There are still extra complexities to this problem, and we will discuss them 
> in the following sections:
>  # Recovery case: the driver tells executors which version to use, but how 
> do executors find the old snapshots and deltas needed before this version?
>  # Cleanup of old versions.
> h3. Commit Log Format Change
> We will add a new field to the commit message {_}CommitMetadata{_}: a nested 
> map representing operatorID→storeName→partitionID→checkpointUniqueID (for 
> example, a {_}Map[Long, Map[String, Seq[String]]]{_} where the sequence is 
> indexed by partitionID). 
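> The shape of that field can be sketched as follows (the type alias and 
> lookup helper are illustrative only, not the actual commit-log schema):

```scala
// Hypothetical shape of the new CommitMetadata field: a nested map from
// operatorID to storeName to a sequence of checkpointUniqueIDs indexed by
// partitionID.
object CommitLogIds {
  type StateUniqueIds = Map[Long, Map[String, Seq[String]]]

  // Resolve the uniqueID committed for one (operator, store, partition).
  def lookup(ids: StateUniqueIds, operatorId: Long, storeName: String,
             partitionId: Int): Option[String] =
    ids.get(operatorId).flatMap(_.get(storeName)).flatMap(_.lift(partitionId))
}
```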
> h3. Checkpoint File Format Change
> In both the changelog file and the zip file, we will write extra 
> information: the uniqueIDs since the presumed last snapshot. It is a 
> presumed snapshot because snapshotting can sometimes fail or be delayed, so 
> what sits in place of the presumed last snapshot may be only a changelog. In 
> those cases, we can still reconstruct further lineage by reading the 
> changelog file at the presumed snapshot version. By repeatedly reading 
> lineage from previous presumed snapshots, the whole lineage can always be 
> reconstructed.
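> This chaining step can be sketched as follows (a minimal illustration under 
> assumed names; lineage lists are newest first, as in the examples later in 
> this document):

```scala
// Sketch of lineage reconstruction when a presumed snapshot turned out to be
// changelog-only: keep reading the lineage recorded in the delta file at the
// presumed snapshot until a version with a real snapshot is reached.
object LineageChain {
  def resolve(start: String,
              lineageIn: Map[String, List[String]], // IDs recorded in each file
              hasSnapshot: Set[String]              // IDs whose .zip exists
             ): List[String] = {
    val recorded = lineageIn.getOrElse(start, Nil)
    val extension = recorded.lastOption match {
      // Oldest recorded ID has no snapshot: trace further from its own lineage.
      case Some(oldest) if !hasSnapshot(oldest) =>
        resolve(oldest, lineageIn, hasSnapshot).drop(1) // drop duplicate head
      case _ => Nil
    }
    start :: (recorded ++ extension)
  }
}
```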
>  
> When we need to download a checkpoint, we need to find the latest snapshot 
> checkpoint and all subsequent changelog files. Usually, this can be done by 
> reading only the delta file corresponding to the target unique checkpoint 
> ID. If a snapshot upload failed, we will need to keep reading more than one 
> delta file.
>  
> In the delta files, we will create a new entry type {_}LINEAGE_RECORD{_}, 
> which contains a list of uniqueIDs. These are the unique IDs for version-1, 
> version-2, etc.
>  
> In the zip files, this list will be added to class 
> {_}RocksDBCheckpointMetadata{_}.
>  
> The information should always be available, because:
>  # When initially loading a state store, we always have lineage information 
> loaded since the last snapshot.
>  # When we add a new changelog checkpoint, we add one checkpoint ID to the 
> list and it is still the full list.
>  # We can prune the list saved in memory based on presumed snapshot versions.
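> The three points above can be sketched as a small in-memory structure 
> (illustrative names only; IDs are kept newest first):

```scala
// Sketch of maintaining the in-memory lineage list: append one ID on each
// changelog checkpoint; prune once a snapshot for a version in the list is
// confirmed, keeping that ID and everything newer.
final case class Lineage(ids: List[String]) {
  def addChangelog(newId: String): Lineage = Lineage(newId :: ids)

  def pruneAtSnapshot(snapshotId: String): Lineage =
    if (ids.contains(snapshotId))
      Lineage(ids.takeWhile(_ != snapshotId) :+ snapshotId)
    else this // unknown snapshot ID: leave the list unchanged
}
```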
>  
> An alternative to changing the checkpoint file format is for the state store 
> to read commit log files to get the lineage, but that has shortcomings: 1. 
> it has to read multiple commit logs, which can be slow; 2. the state store 
> checkpoint directory wouldn’t be self-contained, and the state store would 
> have to understand the commit log format. 
> h3. Passing UniqueID from Driver to Executors
> We will add a field to class _StatefulOperatorStateInfo_ to indicate the 
> uniqueIDs to use. The field will be an array of strings, where each string 
> corresponds to the uniqueID for one partition. This struct is already 
> serialized to the executors.
> h3. Passing UniqueID to Driver
> The uniqueID will be passed back in a similar way to 
> {_}EventTimeWatermarkExec{_}, as well as Marlin’s end offset.  
> _StatefulOperator_ will have an optional accumulator through which we 
> receive all the uniqueIDs from all tasks. In some cases, we may receive more 
> than one uniqueID for a partition; the driver can simply pick any of them as 
> the one to commit. Note that this never happens when executors pass Marlin’s 
> end offset back, as only one attempt is made there. In micro-batch mode, 
> some of the IDs might come from failed tasks, and we might pick one of 
> those. That should still be correct: even if the task fails, the checkpoint 
> it wrote remains valid to use. One potential problem is that the checkpoint 
> we pick may not match the output written to the sink. This is not a new 
> problem, as it can happen today as well. We can revisit the decision if we 
> see such a problem in the future.
>  
> We will pass back not just the uniqueID for the new checkpoint based on 
> version V, but also the uniqueID used for V-1. This helps the driver 
> validate that the lineage is as expected. This step is necessary for 
> Micro-Batch Pipelining, and will also help with async state checkpointing. 
> In both cases, the task may start without knowing the uniqueID it is based 
> on, so it needs to make a guess, and the driver needs to do an extra 
> validation. 
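> The driver-side validation can be sketched as follows (the report type and 
> method names are illustrative, not Spark’s API; treating version 0 as having 
> no base is an assumption of this sketch):

```scala
// Sketch: each task reports the new checkpoint ID for version V plus the
// base ID it built on for V-1; the driver compares the base against what it
// actually committed for V-1.
object LineageValidator {
  final case class TaskReport(version: Long, newId: String, baseId: Option[String])

  def validate(committed: Map[Long, String], report: TaskReport): Boolean =
    report.baseId match {
      case Some(base) => committed.get(report.version - 1).contains(base)
      case None       => report.version == 0 // first batch has no base (assumption)
    }
}
```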
> h3. State Store Checkpointing
> When state store checkpointing starts, a unique ID (UUID) is generated and 
> stored in the state store provider. This ID is used to construct the delta 
> and zip file names for the checkpoint. The checkpoint procedure is mostly 
> the same as today; the only difference is that we need to preserve the 
> unique ID and the lineage information and make sure we use the correct ones. 
> Since the snapshot checkpoint is done asynchronously, we need to make an 
> appropriate copy of them to make sure they are consistent with the snapshot.
>  
> Checkpointing would look like the following:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)
> 23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
> Assuming a snapshot checkpoint is also scheduled, the snapshotting will 
> later succeed and the snapshot file will show up:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)
> 23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
> 23_689aa6bd.zip (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
> It is possible that 23_689aa6bd.zip fails to be uploaded. In this case, we 
> will stay with a delta file only and continue from there.
> The following delta files may only contain lineage up to version 23, but we 
> can trace further back to version 20 by reading 23_689aa6bd.delta:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> It is possible that two executors each do a full snapshot, where the one 
> picked up in the lineage fails but the other succeeds. For example, suppose 
> (Version=23, ID=8205c96f) got a full snapshot checkpoint. It generates a 
> result like this:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> But since 23_8205c96f is never referenced by the lineage in either the 
> commit log or the checkpoint files (such as 24_32e3cc2a), it will be 
> ignored, and for version 23, 23_689aa6bd.delta is still used.
>  
> Note that there is a chance that the same executor executes the same 
> partition of the same batchID twice and both attempts succeed. This is 
> common in the case of ForEachBatch. We need to make sure we name the 
> snapshots correctly. Here is an example:
>  # Executed version 23, picked uniqueID 689aa6bd and successfully uploaded 
> file 23_689aa6bd.delta. 
>  # Make RocksDB checkpoint Ckp1.
>  # Return to the driver, and 23_689aa6bd is chosen to be committed.
>  # Executed version 23, picked uniqueID 8205c96f and successfully uploaded 
> file 23_8205c96f.delta. 
>  # Make RocksDB checkpoint Ckp2.
>  # Maintenance thread wakes up for snapshot checkpoints.
> The maintenance thread needs to upload two snapshots: Ckp1 → 
> 23_689aa6bd.zip, as well as Ckp2 → 23_8205c96f.zip. Both need to be 
> uploaded, as the thread doesn’t know which one will be committed to the 
> commit log. If it only ever uploads one, there is a chance that it always 
> picks the wrong one, and we won’t have a snapshot checkpoint for a long 
> time, or ever.
> One potential optimization is for the snapshotting to wait a little for the 
> next batch to start, so that it knows 23_689aa6bd is the one picked up; then 
> it can upload only 23_689aa6bd.zip and skip 23_8205c96f.zip. If the next 
> batch doesn’t start in time, it will upload both.
> h3. State Store Load
> In the normal case, the executor already has the state store open for the 
> store it needs to load. The executor just needs to validate that the open 
> state store is at the checkpoint ID sent from the driver. If the ID matches, 
> the state store is already loaded. If not, we need to reload it from cloud 
> storage.
>  
> When we load a checkpoint for the state store, we first construct the 
> checkpoint names <batchID>_<uniqueID>.zip and <batchID>_<uniqueID>.delta. If 
> the zip file exists, we just load it. Otherwise, we open the delta file and 
> read its first record, which is supposed to be the lineage metadata. Using 
> that metadata, we can download the last snapshot version and apply the 
> subsequent delta files.
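> The resolution logic above can be sketched as follows (a minimal 
> illustration with assumed names; files are modeled as a plain set of names, 
> and the lineage is given newest first, as read from the target delta file):

```scala
// Sketch of the load path: prefer <batchID>_<uniqueID>.zip when it exists;
// otherwise walk the lineage to the most recent version whose snapshot
// exists, and return the files in apply order (snapshot first, then deltas).
object StoreLoader {
  def filesToApply(batchId: Long,
                   uniqueId: String,
                   existing: Set[String],
                   lineage: List[(Long, String)] // ancestors, newest first
                  ): List[String] = {
    val zipName = s"${batchId}_${uniqueId}.zip"
    if (existing(zipName)) List(zipName)
    else {
      val chain = (batchId, uniqueId) :: lineage
      val snapIdx = chain.indexWhere { case (v, id) => existing(s"${v}_${id}.zip") }
      require(snapIdx >= 0, "no snapshot found anywhere in the lineage")
      val (sv, sid) = chain(snapIdx)
      val deltas = chain.take(snapIdx).reverse.map { case (v, id) => s"${v}_${id}.delta" }
      s"${sv}_${sid}.zip" :: deltas
    }
  }
}
```

> With the files from Example 2 below, loading (version=23, uniqueID=689aa6bd) 
> resolves to 20_d8e2ca47.zip followed by the three deltas on its lineage.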
>  
> {*}Example 1{*}. The uniqueID has a snapshot.
> In this example, we are trying to load (version=23, uniqueID=689aa6bd). 
> 20_d8e2ca47.delta
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> 23_689aa6bd.delta
> 23_689aa6bd.zip
> We see there is a file {_}23_689aa6bd.zip{_}, so we just use the file.
>  
> {*}Example 2{*}. The uniqueID only has a delta log file.
> If we still load (version=23, uniqueID=689aa6bd), but there is only a delta 
> file:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> In this case, we will read the lineage from 23_689aa6bd.delta and apply 
> these files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 
> 23_689aa6bd.delta. The outcome is the same even if there is a snapshot file 
> for the same version but with a different checkpoint ID:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.
>  
> {*}Example 3{*}. Can’t find a snapshot file at the presumed snapshot 
> location.
> If a snapshot upload fails, we will need to continue tracing the lineage. In 
> the following example,
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> When we are loading (version=24, ID=32e3cc2a), we only have lineage up to 
> 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this case, we 
> will open 23_689aa6bd.delta and trace back further. Eventually, we will need 
> to apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 
> 22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.
> Note that even if a snapshot file later shows up for a uniqueID, we will 
> ignore it and use the same set of files.
> h3. Compatibility
> The new format cannot be consumed by previous releases, so the feature will 
> be turned off by default for micro-batch mode. Later, we will switch the 
> default to on.
>  
> Within the same release, we will allow users to switch between V1 and V2. 
> The driver will read the commit log and see whether a uniqueID is available. 
> If it is, it is sent to the executors so that they can load checkpoints 
> accordingly; otherwise, the driver assumes V1. When switching modes, we need 
> to make sure the first checkpoint is a full snapshot checkpoint taken 
> synchronously. That way, loading a checkpoint only ever needs to deal with 
> files generated by either V1 or V2, never a mixture of the two.
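> This compatibility rule can be sketched as follows (CommitEntry and its 
> field name are illustrative stand-ins, not the actual commit-log class):

```scala
// Sketch: a commit-log entry that carries uniqueIDs is treated as V2; one
// without them falls back to the V1 naming scheme <batchID>.<suffix>.
object CheckpointFormat {
  final case class CommitEntry(
      batchId: Long,
      stateUniqueIds: Option[Map[Long, Map[String, Seq[String]]]])

  def isV2(entry: CommitEntry): Boolean = entry.stateUniqueIds.isDefined

  def fileName(entry: CommitEntry, operatorId: Long, storeName: String,
               partitionId: Int, isSnapshot: Boolean): String = {
    val suffix = if (isSnapshot) "zip" else "delta"
    entry.stateUniqueIds
      .flatMap(_.get(operatorId))
      .flatMap(_.get(storeName))
      .flatMap(_.lift(partitionId))
      .map(id => s"${entry.batchId}_$id.$suffix")
      .getOrElse(s"${entry.batchId}.$suffix") // V1 naming
  }
}
```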



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
