[
https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Siying Dong updated SPARK-49374:
--------------------------------
Attachment: image-2024-08-23-14-29-59-165.png
> RocksDB State Store Checkpoint Structure V2
> -------------------------------------------
>
> Key: SPARK-49374
> URL: https://issues.apache.org/jira/browse/SPARK-49374
> Project: Spark
> Issue Type: Task
> Components: Structured Streaming
> Affects Versions: 4.0.0
> Reporter: Siying Dong
> Priority: Major
> Attachments: image-2024-08-23-14-28-56-418.png,
> image-2024-08-23-14-29-19-443.png, image-2024-08-23-14-29-59-165.png,
> image-2024-08-23-14-30-22-474.png
>
>
> h2. Motivation
> We expect the new checkpoint structure to benefit micro-batch mode by
> establishing a linear dependency between batch versions.
> Right now, tasks can be executed multiple times for the same batchID (for
> speculative execution or rerunning in ForEachBatch), and there can be
> multiple parallel lineages of state stores. For example, one issue with
> ForEachBatch showed a lineage that triggered a RocksDB file uploading bug:
>
>
> Although we fixed all the bugs, this complexity makes the system prone
> to bugs. This non-linear lineage also presents a correctness risk when some
> of the versions are changelog checkpoints.
> In the same example, suppose Version 17 is a snapshot checkpoint and Version
> 18 is a changelog checkpoint. When we need to recover from a checkpoint, we
> need to apply Version 17 and Version 18 together. However, Version 18 isn’t
> necessarily generated on top of Version 17. This can happen either because
> Version 18 was generated by a different worker than Version 17, or because
> the same worker abandoned Version 17, replayed the batch, and generated
> Version 17’. In most cases the result is accurate, but we have identified
> some edge cases where users might be surprised by the results, and there may
> already be correctness issues. See an example in the
> [Appendix|https://docs.google.com/document/d/1pT5bwW325VndVH09aZd7TykKXbBP5uCrNIQOC0uDoLc/edit#heading=h.ehd1najwmeos].
> These correctness issues will become more prominent with transformWithState
> due to the occurrence of partial updates. Note that fixing the state store
> lineage only makes sure the state store is consistent; it doesn’t make sure
> the state store is consistent with outputs to the sink.
>
> Furthermore, the complex checkpoint version lineage makes it hard to reduce
> the overhead of old-version cleanup. Even a long-running state store has to
> read metadata for all previous versions and list all files to do any cleanup
> safely, which is expensive. This is necessary because any version can be
> written by a different executor and reference RocksDB files that the
> current executor isn’t aware of. In extreme cases, we can even corrupt the
> state store. The chance of this happening is very low, but it’s not a good
> idea to leave such a correctness risk unfixed.
> h2. Proposal sketch
> The proposed checkpoint structure will ensure a linear dependency:
> This stronger guarantee will be a good foundation for the problems above.
>
> The proposal’s basic idea is to guarantee linear lineage by not allowing
> checkpoint overwriting. Every checkpoint is written to a new file name
> containing a uniqueID. When starting any batch, a task uses the uniqueID to
> precisely identify which checkpoint to load.
> When a new state store checkpoint is generated, the checkpoint path name
> includes a globally unique ID, so that it can never be updated. Here is an
> example:
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> The name is stored in the commit log too. When the next batch is being
> executed, those unique IDs are passed to the executors, where they make sure
> they start to execute from this checkpoint. If the local state store isn’t in
> this state, it will download the checkpoint from the cloud.
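> The naming scheme can be sketched as follows. This is an illustrative Python
> sketch, not the actual Spark implementation; the helper name is hypothetical,
> and the real design uses a full UUID (the 8-character form below only mirrors
> the examples above):

```python
import uuid

def checkpoint_file_name(batch_id: int, is_snapshot: bool) -> str:
    # Hypothetical helper: builds <batchID>_<uniqueID>.[zip|delta].
    # Because every attempt gets a fresh ID, a rerun of the same batch
    # can never overwrite an existing checkpoint file.
    unique_id = uuid.uuid4().hex[:8]  # shortened for readability
    suffix = "zip" if is_snapshot else "delta"
    return f"{batch_id}_{unique_id}.{suffix}"

name = checkpoint_file_name(21, is_snapshot=False)  # e.g. 21_ef6618c2.delta
```

> Two tasks rerunning batch 21 thus produce two distinct files (such as
> 21_ef6618c2.delta and 21_f4d05ac9.delta above) instead of overwriting one path.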
> h1. Part II: Detailed design
> h2. Architecture
> Previously, a state store checkpoint was stored in a path determined only by
> the checkpoint root path, operatorID, partitionID, and batchID. When
> a stateful operator is executed, it can always construct the path with
> that information: it downloads the checkpoint from the path of the
> previous batchID and checkpoints to the path for this batchID. As mentioned
> earlier, this flexibility comes with a cost: there is no way to distinguish
> among checkpoints generated by different tasks rerunning the same batchID.
>
> In this new design, every checkpoint goes to a globally unique file. The
> globally unique ID needs to be communicated between the driver and the
> executors, and stored in commit logs. The basic workflow is as follows:
> h3. File Structure Under Checkpoint Path
> Currently, a checkpoint is stored in path
> _<checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog,
> zip]_. The path structure looks like the following:
> 0 (operatorID)
> +-- 0 (partitionID)
> |     +-- ……
> +-- 1 (partitionID)
> |     +-- default (storeName)
> |         +-- 20.zip
> |         +-- 21.delta
> |         +-- 22.delta
> |         +-- 23.delta
> +-- 2 (partitionID)
>       +-- ……
>
> The general structure stays intact; we only change how files are named under
> the store path. Instead of naming the files _<batchID>.zip_
> or _<batchID>.delta_, we name them _<batchID>_<uniqueID>.zip_ or
> _<batchID>_<uniqueID>.delta_. This also means that for the same batchID,
> there can be multiple files. Here is an example:
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> 23_689aa6bd.delta
> The randomID is generated by the executor itself. In the first version,
> it is just a UUID, which can be shortened later if needed. In the Appendix,
> we discuss a design decision on how to generate these random IDs. These
> random IDs are persisted in commit logs. When a new batch starts,
> they are passed to the executors as part of the operator execution. By the
> end of the batch, the IDs for the new checkpoints are passed to the driver
> through an accumulator, where the driver persists them to the commit logs.
>
> The lineage information is always managed in commit logs, but the lineage is
> often needed at the state store level, when downloading a checkpoint or doing
> cleanup. To make a state store self-contained, some lineage information is
> also stored in the .zip and .delta files themselves, so that at the level of
> a single state store, we can always finish all operations without relying
> on outside information. This decision will be discussed in the appendix. Each
> delta file will contain the unique checkpoint IDs since the last (presumed)
> snapshot checkpoint, i.e., that snapshot and all changelog checkpoints
> following it. We will discuss this in more detail below.
>
> There are still extra complexities to this problem, and we will discuss these
> issues in the following sections:
> # Recovery: the driver will tell executors which version to use, but how do
> executors find the old snapshots and deltas to apply before this version?
> # Cleanup of old versions.
> h3. Commit Log Format Change
> We will add a new field to the commit message {_}CommitMetadata{_}: a nested
> map of {_}Seq[String]{_} values that represents
> operatorID→storeName→partitionID→checkpointUniqueID, with the partitionID as
> the index into the sequence.
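> As an illustration of this nested structure (a sketch only; the variable and
> function names below are hypothetical, and the real field lives in the Scala
> class _CommitMetadata_), the partitionID serves as an index into the
> per-store sequence:

```python
# Sketch of the new CommitMetadata field:
# operatorID -> storeName -> per-partition list of checkpoint uniqueIDs,
# where the list index is the partitionID.
checkpoint_unique_ids = {
    0: {                      # operatorID 0
        "default": [          # storeName "default"
            "689aa6bd",       # partitionID 0
            "8205c96f",       # partitionID 1
        ],
    },
}

def unique_id_for(op_id: int, store: str, partition_id: int) -> str:
    # Look up the committed checkpoint uniqueID for one store partition.
    return checkpoint_unique_ids[op_id][store][partition_id]
```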
> h3. Checkpoint File Format Change
> In both the changelog file and the zip file, extra information on the
> uniqueIDs since the presumed last snapshot will be written. It is a
> *presumed* snapshot because snapshotting can sometimes fail or be delayed, so
> what exists in place of the presumed last snapshot may be only a changelog
> file. In those cases, we can still reconstruct further lineage by reading the
> lineage from that changelog file, and so on from earlier presumed snapshots.
> By repeatedly reading lineage from previous files, the whole lineage can
> always be reconstructed.
>
> When we need to download a checkpoint, we need to find the latest snapshot
> checkpoint and all subsequent changelog files. Usually, this can be done by
> reading only the delta file corresponding to the target unique checkpoint ID.
> In the case of a snapshot uploading failure, we will need to keep reading
> more than one delta file.
>
> In the delta files, we will create a new entry type {_}LINEAGE_RECORD{_},
> which contains a list of unique IDs. These IDs represent the unique IDs for
> version-1, version-2, etc.
>
> In the zip files, this list will be added to class
> {_}RocksDBCheckpointMetadata{_}.
>
> The information should always be available, as:
> # When initially loading a state store, we always load the lineage
> information since the last snapshot.
> # When we add a new changelog checkpoint, we add one checkpoint ID to the
> list, so it is still the full list.
> # We can prune the list kept in memory based on presumed snapshot versions.
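> The three points above can be sketched as a small in-memory tracker
> (illustrative Python with hypothetical names; the real implementation keeps
> this state in the RocksDB state store provider):

```python
class LineageTracker:
    """Tracks the uniqueIDs accumulated since the last presumed snapshot."""

    def __init__(self, ids_since_last_snapshot):
        # Loaded from the lineage record of the checkpoint we start from.
        self.lineage = list(ids_since_last_snapshot)

    def on_changelog_checkpoint(self, unique_id):
        # Each changelog checkpoint appends one ID, keeping the list full.
        self.lineage.append(unique_id)

    def on_presumed_snapshot(self, unique_id):
        # A snapshot version lets us prune everything before it.
        self.lineage = [unique_id]

t = LineageTracker(["d8e2ca47"])
t.on_changelog_checkpoint("f4d05ac9")
t.on_changelog_checkpoint("4489578d")
```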
>
> An alternative to changing the checkpoint file format is for the state store
> to read commit log files to get the lineage, which has shortcomings: 1. it
> has to read multiple commit logs, which can be slow; 2. the state store
> checkpoint directory won’t be self-contained, and the state store has to
> understand the commit log format.
> h3. Passing UniqueID from Driver to Executors
> We will add a field in class _StatefulOperatorStateInfo_ to indicate the
> uniqueID to use. The field will be an array of strings, each corresponding
> to the uniqueID for one partition. This struct is already serialized to the
> executors.
> h3. Passing UniqueID to Driver
> The uniqueID will be passed back in a similar way to
> {_}EventTimeWatermarkExec{_}, as well as Marlin’s end offset.
> _StatefulOperator_ will have an optional accumulator, with which we receive
> the uniqueIDs from all tasks. In some cases, we may receive more than one
> uniqueID for one partition. The driver can simply pick any of them as the one
> to commit. Note that this never happens when the executors pass Marlin’s end
> offset back, as only one attempt is made there. In micro-batch mode, some of
> the IDs might come from failed tasks, and we might pick one of those. It
> should still be correct: even if the task fails, the checkpoint is still
> valid to use. One potential problem is that the checkpoint we use may not
> match the output sent to the sink. This is not a problem for now, as it can
> already happen today. We can revisit the decision if we see such a problem in
> the future.
>
> We will pass back not just the uniqueID for the new checkpoint based on
> version V, but also the uniqueID used for V-1. This will help the driver
> validate that the lineage is as expected. This step is necessary for
> Micro-Batch Pipelining, and will also help with async state checkpointing. In
> both cases, the task may start without knowing the uniqueID it is based on,
> so it needs to make a guess, and the driver needs to do an extra validation.
> h3. State Store Checkpointing
> When state store checkpointing starts, a unique ID (UUID) is generated and
> stored in the state store provider. This ID is used to construct the delta
> and zip file names for the checkpoint. The checkpoint procedure is mostly the
> same as today; the only difference is that we need to preserve the unique ID
> and the lineage information and make sure we use the correct ones. Since the
> snapshot checkpoint is done asynchronously, we need to make an appropriate
> copy of them to make sure they stay consistent with the snapshot.
>
> Checkpointing would look like the following:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> Assuming a snapshot checkpoint is also scheduled, snapshotting will later
> succeed and the snapshot file will show up:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_689aa6bd.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> It is possible that _23_689aa6bd.zip_ fails to be uploaded. In this case, we
> stay with a delta file only and continue from there.
> The following delta files may only contain lineage up to version 23, but we
> can trace further back to version 20 by reading _23_689aa6bd.delta_:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> It is possible that we have two executors doing a full snapshot, where the
> one picked up for lineage failed but the other one succeeded. For example,
> suppose (version=23, ID=8205c96f) got a full snapshot checkpoint. It will
> generate a result like this:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> But since 23_8205c96f is never referenced by lineage in either the commit log
> or the checkpoint files (such as 24_32e3cc2a), it will be ignored, and for
> version 23, 23_689aa6bd.delta is still used.
>
> Note that there is a chance that the same executor executes the same
> partition of the same batchID twice and both attempts succeed. This is common
> in the ForEachBatch case. We need to make sure we name the snapshots
> correctly. Here is an example:
> # Executed version 23, picked uniqueID 689aa6bd and successfully uploaded
> file 23_689aa6bd.delta.
> # Make RocksDB checkpoint Ckp1.
> # Return to the driver, and 23_689aa6bd is chosen to be committed.
> # Executed version 23, picked uniqueID 8205c96f and successfully uploaded
> file 23_8205c96f.delta.
> # Make RocksDB checkpoint Ckp2.
> # Maintenance thread wakes up for snapshot checkpoints.
> The maintenance thread needs to upload two snapshots: Ckp1 →
> 23_689aa6bd.zip, as well as Ckp2 → 23_8205c96f.zip. Both need to be uploaded
> because it doesn’t know which one will be committed to the commit log. If it
> always uploads only one, there is a chance that it always picks the wrong one
> and we don’t get snapshot checkpointing for a long time, or ever.
> One potential optimization is for the snapshotting to wait a little for the
> next batch to start: once it learns that 23_689aa6bd is the one picked up, it
> can upload only 23_689aa6bd.zip and skip 23_8205c96f.zip. If the next batch
> doesn’t start in time, or ever, it uploads both.
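> The upload decision can be summarized in a short sketch (a hypothetical
> function for illustration; the actual maintenance-thread code differs):

```python
def snapshots_to_upload(pending_ids, committed_id=None):
    # pending_ids:  uniqueIDs of local RocksDB checkpoints awaiting upload
    # committed_id: the ID the driver committed, if the next batch already
    #               started and told us; None if we timed out waiting.
    if committed_id is not None and committed_id in pending_ids:
        return [committed_id]       # upload only the committed snapshot
    return list(pending_ids)        # winner unknown: upload all of them

# Before the next batch starts, both Ckp1 and Ckp2 must be uploaded:
both = snapshots_to_upload(["689aa6bd", "8205c96f"])
# Once the driver's choice is known, only one upload is needed:
just_one = snapshots_to_upload(["689aa6bd", "8205c96f"], "689aa6bd")
```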
> h3. State Store Load
> In the normal case, executors already have the state store open for the one
> to load. The executor just needs to validate that the open state store is at
> the checkpoint ID that matches the one sent from the driver. If the ID
> matches, we already have the state store loaded. If not, we need to reload it
> from cloud storage.
>
> When we load a checkpoint into the state store, we first construct the
> checkpoint names _<batchID>_<uniqueID>.zip_ and _<batchID>_<uniqueID>.delta_.
> If the zip file exists, we just load it. Otherwise, we open the delta file
> and read the first record, which is supposed to be the lineage metadata.
> Using the metadata, we can download the last snapshot version and apply the
> subsequent delta files.
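> This loading procedure, including tracing through a missing presumed
> snapshot, can be sketched as follows. This is an illustrative Python sketch
> under assumed inputs: `existing` stands for a listing of the checkpoint
> directory, and `lineage_of` stands for reading the LINEAGE_RECORD of a delta
> file (uniqueIDs for version-1, version-2, ...); the function name is
> hypothetical.

```python
def files_to_apply(version, unique_id, existing, lineage_of):
    """Return the snapshot plus delta files to apply, oldest first.

    existing:   set of file names present in the checkpoint directory
    lineage_of: delta file name -> list of uniqueIDs for version-1,
                version-2, ... (as read from its LINEAGE_RECORD)
    """
    zip_name = f"{version}_{unique_id}.zip"
    if zip_name in existing:
        return [zip_name]                      # snapshot alone is enough
    delta = f"{version}_{unique_id}.delta"
    deltas = [delta]
    v = version
    ids = list(lineage_of[delta])              # IDs for v-1, v-2, ...
    while ids:
        v -= 1
        uid = ids.pop(0)
        snap = f"{v}_{uid}.zip"
        if snap in existing:                   # found a usable snapshot
            return [snap] + list(reversed(deltas))
        d = f"{v}_{uid}.delta"
        deltas.append(d)
        if not ids:                            # presumed snapshot failed:
            ids = list(lineage_of[d])          # keep tracing its lineage
    raise FileNotFoundError("no snapshot found in lineage")

# Sample directory matching the examples in this document:
existing = {
    "20_d8e2ca47.delta", "20_d8e2ca47.zip",
    "21_ef6618c2.delta", "21_f4d05ac9.delta",
    "22_4489578d.delta", "23_689aa6bd.delta",
    "24_32e3cc2a.delta",
}
lineage_of = {
    "24_32e3cc2a.delta": ["689aa6bd"],
    "23_689aa6bd.delta": ["4489578d", "f4d05ac9", "d8e2ca47"],
}
# Tracing (version=24, ID=32e3cc2a) falls through the missing
# 23_689aa6bd.zip down to the snapshot at version 20.
plan = files_to_apply(24, "32e3cc2a", existing, lineage_of)
```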
>
> {*}Example 1{*}. The uniqueID has a snapshot.
> In this example, we are trying to load (version=23, uniqueID=689aa6bd).
> 20_d8e2ca47.delta
> 20_d8e2ca47.zip
> 21_ef6618c2.delta
> 21_f4d05ac9.delta
> 22_4489578d.delta
> 23_689aa6bd.delta
> 23_689aa6bd.zip
> We see there is a file {_}23_689aa6bd.zip{_}, so we just use the file.
>
> {*}Example 2{*}. The uniqueID only has a delta log file.
> Suppose we still load (version=23, uniqueID=689aa6bd), but there is only a
> delta file:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> In this case, we will read the lineage from 23_689aa6bd.delta and apply these
> files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta,
> 23_689aa6bd.delta. The outcome is the same even if there is a snapshot file
> for the same version but with a different checkpoint ID:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.
>
> {*}Example 3{*}. Can’t find the snapshot file in the presumed snapshot
> location.
> In the case where snapshot uploading fails, we will need to continue tracing
> the lineage. In the following example:
> 20_d8e2ca47.delta (lineage in file: …)
> 20_d8e2ca47.zip (lineage in file: …)
> 21_ef6618c2.delta (lineage in file: d8e2ca47)
> 21_f4d05ac9.delta (lineage in file: d8e2ca47)
> 22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
> 23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
> 24_32e3cc2a.delta (lineage in file: 689aa6bd)
> When we are loading (version=24, ID=32e3cc2a), we only have lineage up to
> 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this case, we will
> open 23_689aa6bd.delta and trace back the files. Eventually, we will need to
> apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta,
> 22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.
> Note that even if a snapshot file later shows up for that uniqueID, we will
> ignore it and use the same set of files.
> h3. Compatibility
> The new format cannot be consumed by previous releases, so the feature will
> be turned off by default for micro-batch mode. Later we will switch the
> default to on.
>
> Within the same release, we will allow users to switch between V1 and V2.
> The driver will read the commit log and see whether a uniqueID is available.
> If it is, it is sent to the executors so that they can load checkpoints
> accordingly. Otherwise, the driver assumes it is V1. When switching modes, we
> will need to make sure the first checkpoint is a full snapshot checkpoint
> written synchronously. That way, loading a checkpoint only needs to deal with
> files generated by either V1 or V2, not a mixture of the two.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)