[ https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Siying Dong updated SPARK-49374:
--------------------------------
Description:
h2. Motivation
We expect the new checkpoint structure to benefit micro-batch mode by
establishing a linear dependency between batch versions. Right now, tasks can be
executed multiple times for the same batchID (for speculative execution or
rerunning in ForEachBatch), and there can be multiple parallel lineages of state
stores going on. For example, one issue with ForEachBatch showed a lineage of
this kind, which triggered a RocksDB file uploading bug.
Although we fixed all of those bugs, this complexity keeps the system prone to
new ones. The non-linear lineage also presents a correctness risk when some of
the versions are changelog checkpoints.
In the same example, suppose Version 17 is a snapshot checkpoint and Version 18
is a changelog checkpoint. To recover from the checkpoint, we need to apply
Version 17 and Version 18 together. However, Version 18 isn’t generated on top
of Version 17. This can happen either because Version 18 was generated by a
different worker than Version 17, or because the same worker abandoned Version
17, replayed the batch and generated Version 17’. In most cases the result is
still accurate, but we have identified some edge cases where users might be
surprised by the results, and there may already be correctness issues. See an
example in the [Appendix|https://docs.google.com/document/d/1pT5bwW325VndVH09aZd7TykKXbBP5uCrNIQOC0uDoLc/edit#heading=h.ehd1najwmeos].
These correctness issues will become more prominent with transformWithState
because of partial updates. Note that fixing the state store lineage only makes
sure the state store itself is consistent; it doesn’t make sure the state store
is consistent with the outputs to the sink. The inconsistency between
Furthermore, the complex checkpoint version lineage makes it hard to reduce the
overhead of cleaning up old versions. Even a long-running state store has to
read metadata for all previous versions and list all files to do any cleanup
safely, which is expensive. This is necessary because any version may have been
written by a different executor and may reference RocksDB files that the
current executor isn’t aware of. In extreme cases, the state store can even be
corrupted. The chance of this is very low, but it’s not a good idea to leave
such a correctness risk unfixed.
h2. Proposal sketch
The proposed checkpoint structure will ensure a linear dependency between
versions. This stronger guarantee will be a good foundation for addressing the
problems above.
The basic idea is to guarantee a linear lineage by never allowing a checkpoint
to be overwritten. Every checkpoint is written to a new file whose name contains
a uniqueID. When starting any batch, a task uses the uniqueID to identify
precisely which checkpoint to load.
When a new state store checkpoint is generated, the checkpoint path name
includes a globally unique ID, so that it can never be updated. Here is an
example:
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
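As a minimal sketch of this naming scheme (in Python, for illustration only), assuming a file name is exactly {_}<batchID>_<uniqueID>.<zip|delta>{_}, the helpers below build and parse such names. The function and class names are hypothetical, not Spark APIs.

```python
import re
from typing import NamedTuple

# Hypothetical file-name pattern "<batchID>_<uniqueID>.<zip|delta>";
# hyphens are allowed so that full, unshortened UUIDs also parse.
_NAME_RE = re.compile(r"^(\d+)_([0-9a-f-]+)\.(zip|delta)$")


class CheckpointFile(NamedTuple):
    version: int    # batchID
    unique_id: str  # globally unique checkpoint ID
    kind: str       # "zip" (snapshot) or "delta" (changelog)


def checkpoint_file_name(version: int, unique_id: str, kind: str) -> str:
    """Build the immutable file name for one checkpoint."""
    if kind not in ("zip", "delta"):
        raise ValueError(f"unknown checkpoint kind: {kind}")
    return f"{version}_{unique_id}.{kind}"


def parse_checkpoint_file_name(name: str) -> CheckpointFile:
    """Split a checkpoint file name back into its three parts."""
    m = _NAME_RE.match(name)
    if m is None:
        raise ValueError(f"not a checkpoint file name: {name}")
    return CheckpointFile(int(m.group(1)), m.group(2), m.group(3))


# Two attempts at batch 21 produce two distinct files instead of one overwrite.
first = checkpoint_file_name(21, "ef6618c2", "delta")   # "21_ef6618c2.delta"
second = checkpoint_file_name(21, "f4d05ac9", "delta")  # "21_f4d05ac9.delta"
parsed = parse_checkpoint_file_name("20_d8e2ca47.zip")
```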
The unique ID is also stored in the commit log. When the next batch executes,
those unique IDs are passed to the executors, which make sure they start from
exactly this checkpoint. If the local state store isn’t at this checkpoint, the
executor downloads the checkpoint from cloud storage.
h1. Part II: Detailed design
h2. Architecture
Previously, a state store checkpoint was stored in a path determined only by
the checkpoint root path, operatorID, partitionID, store name and batchID. When
a stateful operator executes, it can always construct the path from that
information: it downloads the checkpoint from the path of the previous batchID
and writes the new checkpoint to the path for this batchID. As mentioned
earlier, this flexibility comes at a cost: there is no way to distinguish among
checkpoints generated by different tasks rerunning the same batchID.
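To make the limitation concrete, here is a small sketch (not Spark code) of the old path construction. Because the path is a pure function of the root, operatorID, partitionID, store name and batchID, two task attempts for the same batch compute exactly the same path. The root "/ckpt/state" and the function name are hypothetical.

```python
from pathlib import PurePosixPath


def old_checkpoint_path(root: str, operator_id: int, partition_id: int,
                        store_name: str, batch_id: int, ext: str) -> str:
    """Old layout: the path depends only on these inputs, never on the attempt."""
    return str(PurePosixPath(root, str(operator_id), str(partition_id),
                             store_name, f"{batch_id}.{ext}"))


# An original task and a speculative (or rerun) task for batch 21 share every
# input, so nothing in the path tells their checkpoints apart.
attempt_1 = old_checkpoint_path("/ckpt/state", 0, 1, "default", 21, "delta")
attempt_2 = old_checkpoint_path("/ckpt/state", 0, 1, "default", 21, "delta")
print(attempt_1)               # /ckpt/state/0/1/default/21.delta
print(attempt_1 == attempt_2)  # True
```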
In this new design, every checkpoint goes to a globally unique file. The
globally unique ID needs to be communicated between the driver and the
executors, and stored in commit logs. The basic workflow is described in the
following sections.
h3. File Structure Under Checkpoint Path
Currently, a checkpoint is stored in the path
_<checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog,zip]_.
The path structure looks like the following:
0 (operatorID)
+-- 0 (partitionID)
|   +-- ……
+-- 1 (partitionID)
|   +-- default (storeName)
|       +-- 20.zip
|       +-- 21.delta
|       +-- 22.delta
|       +-- 23.delta
+-- 2 (partitionID)
    +-- ……
The general structure stays intact; we only change how the files inside each
store directory are named. In the example above, only the batch files (20.zip,
21.delta, and so on) are renamed. Instead of naming the files _<batchID>.zip_ or
{_}<batchID>.delta{_}, we will name them {_}<batchID>_<uniqueID>.zip{_} or
{_}<batchID>_<uniqueID>.delta{_}. This also means that there can be multiple
files for the same batchID. Here is an example:
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
23_689aa6bd.delta
The uniqueID will be generated by the executor itself. In the first version, it
is just a UUID, which can be shortened later if needed. The Appendix discusses
the design decision on how to generate these IDs. The IDs are persisted in
commit logs. When a new batch starts, they are passed to the executors as part
of the stateful operator’s state information. At the end of the batch, the IDs
of the new checkpoints are passed back to the driver through an accumulator,
and the driver persists them to the commit log.
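A minimal sketch of executor-side ID generation, using Python’s {_}uuid.uuid4(){_} as a stand-in for the executor’s UUID generator. The optional 8-character shortening only mirrors the short IDs used in the examples; the actual shortened format, if any, is not decided here, and the function name is hypothetical.

```python
import uuid
from typing import Optional


def new_checkpoint_unique_id(short_len: Optional[int] = None) -> str:
    """Generate the uniqueID for a new checkpoint on the executor.

    short_len=None returns the full UUID string (the first version);
    a positive short_len keeps only that many leading hex digits, which is a
    hypothetical shortening that trades collision resistance for length.
    """
    full = str(uuid.uuid4())  # 36 chars: 32 hex digits plus 4 hyphens
    if short_len is None:
        return full
    return full.replace("-", "")[:short_len]


full_id = new_checkpoint_unique_id()
short_id = new_checkpoint_unique_id(short_len=8)
print(len(full_id), len(short_id))  # 36 8
```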
The authoritative lineage is always managed in commit logs, but lineage is
often needed at the state store level when downloading a checkpoint or doing
cleanup. To make a state store self-contained, some lineage information is also
stored in the .zip and .delta files themselves, so that a single state store
can always complete these operations without relying on outside information.
This decision is discussed in the Appendix. Each delta file will contain the
unique checkpoint IDs of the last (presumed) snapshot checkpoint and of all
changelog checkpoints following it. This is discussed in more detail below.
There are still extra complexities to this problem, which we discuss in the
following sections:
# Recovery. The driver tells executors which version to use, but how do
executors find the older snapshots and deltas needed before this version?
# Cleanup of old versions.
h3. Commit Log Format Change
We will add a new field to the commit message {_}CommitMetadata{_}. The field is
a nested map, written schematically as {_}Map[Map[Seq[String]]]{_} (key types
omitted), which represents operatorID→storeName→partitionID→checkpointUniqueID;
the innermost sequence is indexed by partitionID.
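The nested map can be pictured as follows. This sketch assumes a JSON encoding of the commit log entry; the field name {_}state_unique_ids{_}, the helper, and the placeholder IDs "id-p0"/"id-p2" are hypothetical.

```python
import json
from typing import Dict, List

# Schematic shape: operatorID -> storeName -> [uniqueID per partitionID].
# JSON object keys are strings, so operatorID is stored as a string here.
StateUniqueIds = Dict[str, Dict[str, List[str]]]


def unique_id_for(ids: StateUniqueIds, operator_id: int,
                  store_name: str, partition_id: int) -> str:
    """Look up the committed checkpoint ID of one state store instance."""
    return ids[str(operator_id)][store_name][partition_id]


# Illustrative entry; "id-p0" and "id-p2" are placeholders.
commit_entry = {
    "state_unique_ids": {"0": {"default": ["id-p0", "689aa6bd", "id-p2"]}},
}
encoded = json.dumps(commit_entry)  # what the driver would persist
decoded = json.loads(encoded)["state_unique_ids"]
chosen = unique_id_for(decoded, 0, "default", 1)  # "689aa6bd"
```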
h3. Checkpoint File Format Change
Both changelog and zip files will additionally record the uniqueIDs of all
versions since the presumed last snapshot. We call it a presumed snapshot
because snapshotting can fail or be delayed, so at the presumed snapshot version
there may be only a changelog. In that case, we can still reconstruct the
earlier lineage by reading the lineage stored in that changelog file. By
repeatedly reading lineage backward in this way, the whole lineage can always be
reconstructed.
When we need to download a checkpoint, we need to find the latest snapshot
checkpoint and all subsequent changelog files. Usually, this can be done by
reading only the delta file corresponding to the target unique checkpoint ID.
If a snapshot upload failed, we will need to read more than one delta file.
In the delta files, we will create a new entry type {_}LINEAGE_RECORD{_}, which
contains a list of uniqueIDs: the IDs for version-1, version-2, and so on, back
to the presumed last snapshot.
In the zip files, this list will be added to class
{_}RocksDBCheckpointMetadata{_}.
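As an illustration only, the sketch below models a delta file as a list of entries whose first entry is a {_}LINEAGE_RECORD{_} carrying the IDs for version-1, version-2, and so on. The real changelog format is binary; the tuple-based entries, the "PUT" entry type and the helper names are assumptions.

```python
from typing import Any, List, Tuple

Entry = Tuple[str, Any]  # (entry type, payload): a simplified file model


def write_delta(lineage: List[str], updates: List[Tuple[str, str]]) -> List[Entry]:
    """Model a delta file: the lineage record first, then the key/value updates.

    lineage[0] is the ID of version-1, lineage[1] of version-2, and so on,
    back to the presumed last snapshot.
    """
    entries: List[Entry] = [("LINEAGE_RECORD", list(lineage))]
    entries.extend(("PUT", kv) for kv in updates)
    return entries


def read_lineage(entries: List[Entry]) -> List[str]:
    """Read only the first entry, which is expected to be the lineage record."""
    kind, payload = entries[0]
    if kind != "LINEAGE_RECORD":
        raise ValueError("delta file does not start with a lineage record")
    return list(payload)


# 23_689aa6bd.delta from the examples, with one made-up key/value update.
delta_23 = write_delta(["4489578d", "f4d05ac9", "d8e2ca47"], [("k1", "v1")])
lineage_23 = read_lineage(delta_23)
```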
The information should always be available, because:
# When initially loading a state store, we always have lineage information
loaded since the last snapshot.
# When we add a new changelog checkpoint, we add one checkpoint ID to the list
and it is still the full list.
# We can prune the list saved in memory based on presumed snapshot versions.
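The three points above amount to a small in-memory structure per state store. A minimal sketch, assuming the list is kept newest-first (version-1 first) as in the delta files; the class and method names are hypothetical. The usage reproduces the lineage written into 23_689aa6bd.delta and 24_32e3cc2a.delta in the examples below.

```python
from typing import List, Tuple


class InMemoryLineage:
    """(version, uniqueID) pairs since the presumed last snapshot, newest first."""

    def __init__(self, loaded: List[Tuple[int, str]]):
        # Point 1: loading a store always yields the lineage since the last snapshot.
        self.entries = list(loaded)

    def lineage_for_next_delta(self) -> List[str]:
        """IDs to write into the next delta file: version-1, version-2, ..."""
        return [uid for _, uid in self.entries]

    def on_commit(self, version: int, unique_id: str) -> None:
        # Point 2: each new changelog checkpoint adds exactly one ID.
        self.entries.insert(0, (version, unique_id))

    def prune(self, presumed_snapshot_version: int) -> None:
        # Point 3: drop entries older than the presumed snapshot version.
        self.entries = [(v, u) for v, u in self.entries
                        if v >= presumed_snapshot_version]


lin = InMemoryLineage([(20, "d8e2ca47")])    # just loaded 20_d8e2ca47.zip
lin.on_commit(21, "f4d05ac9")
lin.on_commit(22, "4489578d")
for_delta_23 = lin.lineage_for_next_delta()  # lineage in 23_689aa6bd.delta
lin.on_commit(23, "689aa6bd")
lin.prune(23)                                # version 23 is the presumed snapshot
for_delta_24 = lin.lineage_for_next_delta()  # lineage in 24_32e3cc2a.delta
```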
An alternative to changing the checkpoint file format is for the state store to
read commit log files to get the lineage, which has shortcomings: 1. it has to
read multiple commit logs, which can be slow; 2. the state store checkpoint
directory won’t be self-contained, and the state store has to understand the
commit log.
h3. Passing UniqueID from Driver to Executors
We will add a field to class _StatefulOperatorStateInfo_ to indicate the
uniqueID to use. The field will be an array of strings, each holding the
uniqueID for one partition. This struct is already serialized to the
executors.
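A sketch of the added field, modeled as a Python dataclass. The real class is Scala; this stand-in shows only a simplified, hypothetical subset of fields plus the new per-partition ID array (named {_}state_unique_ids{_} here as an assumption), with {_}None{_} standing for V1 mode.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class OperatorStateInfoSketch:
    """Simplified stand-in for StatefulOperatorStateInfo (not the real class)."""
    operator_id: int
    batch_id: int
    num_partitions: int
    # New field: uniqueID to load, one entry per partition; None in V1 mode.
    state_unique_ids: Optional[List[str]] = None

    def unique_id_to_load(self, partition_id: int) -> Optional[str]:
        if self.state_unique_ids is None:
            return None  # V1: fall back to the batchID-only file name
        return self.state_unique_ids[partition_id]


info = OperatorStateInfoSketch(0, 24, 3, ["id-p0", "689aa6bd", "id-p2"])
for_partition_1 = info.unique_id_to_load(1)  # "689aa6bd"
```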
h3. Passing UniqueID to Driver
The uniqueID will be passed back in a similar way to
{_}EventTimeWatermarkExec{_} and Marlin’s end offset.
_StatefulOperator_ will have an optional accumulator that receives the
uniqueIDs from all tasks. In some cases, we may receive more than one uniqueID
for one partition; the driver can simply pick any of them as the one to commit.
Note that this never happens when executors pass Marlin’s end offset back, since
only one attempt is made. In micro-batch mode, some of the IDs might come from
failed tasks, and we might pick one of them. This should still be correct: even
if the task fails, the checkpoint should still be valid to use.
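A minimal sketch of the driver side, assuming the accumulator delivers (partitionID, uniqueID) pairs from every task attempt, including duplicates from retries or speculative attempts. Keeping the first reported ID is one arbitrary but valid choice, since any reported ID may be committed; the function name and the placeholder IDs are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple


def choose_ids_to_commit(reported: Iterable[Tuple[int, str]],
                         num_partitions: int) -> List[str]:
    """Pick one uniqueID per partition from all task-attempt reports."""
    chosen: Dict[int, str] = {}
    for partition_id, unique_id in reported:
        # Any reported ID is acceptable; keep the first one seen.
        chosen.setdefault(partition_id, unique_id)
    missing = [p for p in range(num_partitions) if p not in chosen]
    if missing:
        raise RuntimeError(f"no checkpoint ID reported for partitions {missing}")
    return [chosen[p] for p in range(num_partitions)]


# Partition 1 ran twice (e.g. a speculative attempt), reporting two IDs.
reports = [(0, "id-p0"), (1, "689aa6bd"), (1, "8205c96f"), (2, "id-p2")]
to_commit = choose_ids_to_commit(reports, 3)  # ['id-p0', '689aa6bd', 'id-p2']
```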
One potential problem is that the checkpoint we use may not match the output
written to the sink. This is acceptable for now, since the same mismatch can
already happen today. We can revisit the decision if it becomes a problem in
the future.
We will pass back not just the uniqueID of the new checkpoint for version V, but
also the uniqueID of the version V-1 checkpoint it was built on. This helps the
driver validate that the lineage is as expected. This step is necessary for
Micro-Batch Pipelining and will also help with async state checkpointing. In
both cases, a task may start without knowing the uniqueID it is based on, so it
has to make a guess, and the driver needs to do an extra validation.
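The extra validation could look like the following sketch, assuming each task reports (partitionID, base uniqueID for V-1, new uniqueID for V) and the driver knows the IDs it committed for V-1. Reports built on an unexpected base are discarded; the function name and this reject-rather-than-fail policy are assumptions.

```python
from typing import Dict, Iterable, List, Tuple

Report = Tuple[int, str, str]  # (partition_id, base_id for V-1, new_id for V)


def validated_ids(reports: Iterable[Report],
                  committed_prev: List[str]) -> Dict[int, str]:
    """Keep only new IDs whose base matches the committed ID of version V-1."""
    result: Dict[int, str] = {}
    for partition_id, base_id, new_id in reports:
        if base_id != committed_prev[partition_id]:
            continue  # built on a guessed lineage that turned out wrong
        result.setdefault(partition_id, new_id)
    return result


committed_22 = ["id-p0", "4489578d"]         # IDs committed for version 22
reports = [(0, "id-p0", "new-p0"),
           (1, "other-id", "bad-guess"),     # wrong base: rejected
           (1, "4489578d", "689aa6bd")]
accepted = validated_ids(reports, committed_22)  # {0: 'new-p0', 1: '689aa6bd'}
```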
h3. State Store Checkpointing
When state store checkpointing starts, a unique ID (UUID) is generated and
stored in the state store provider. This ID is used to construct the delta and
zip file names for the checkpoint. The checkpoint procedure is mostly the same
as today. The only difference is that we need to preserve the unique ID and the
lineage information and make sure the correct ones are used. Since the snapshot
checkpoint is done asynchronously, we need to make an appropriate copy of them
so that they are consistent with the snapshot.
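A sketch of the commit step, assuming a provider object that holds the current version, its uniqueID and its lineage. It shows the new lineage being formed by prepending the previous ID, and the snapshot task taking its own copy of (version, ID, lineage) at commit time, so the asynchronous snapshot is labeled consistently with the state it captures. Pruning after a snapshot is omitted, and all names are hypothetical.

```python
import uuid
from typing import List, Optional, Tuple

SnapshotTask = Tuple[int, str, List[str]]  # (version, unique_id, lineage)


class ProviderSketch:
    """Hypothetical stand-in for a state store provider's checkpoint bookkeeping."""

    def __init__(self, version: int, unique_id: str, lineage: List[str]):
        self.version = version      # version currently loaded
        self.unique_id = unique_id  # its uniqueID
        self.lineage = lineage      # IDs for version-1, version-2, ...
        self.pending_snapshot: Optional[SnapshotTask] = None

    def commit(self, schedule_snapshot: bool = False) -> Tuple[str, List[str]]:
        """Checkpoint version+1 under a fresh ID; return (delta name, lineage in file)."""
        new_lineage = [self.unique_id] + self.lineage
        self.version += 1
        self.unique_id = uuid.uuid4().hex[:8]  # shortened only to match the examples
        delta_name = f"{self.version}_{self.unique_id}.delta"
        if schedule_snapshot:
            # Capture version, ID and lineage now, before later commits move on.
            self.pending_snapshot = (self.version, self.unique_id, list(new_lineage))
        self.lineage = new_lineage
        return delta_name, new_lineage


p = ProviderSketch(22, "4489578d", ["f4d05ac9", "d8e2ca47"])
name, lineage_in_file = p.commit(schedule_snapshot=True)
```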
Checkpointing would look like the following:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
Assuming a snapshot checkpoint is also scheduled and later succeeds, the
snapshot file shows up:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_689aa6bd.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
It is possible that {_}23_689aa6bd.zip{_} fails to be uploaded. In this case, we
stay with the delta file only and continue from there.
Subsequent delta files may then contain lineage only up to version 23, but we
can trace further back to version 20 by reading {_}23_689aa6bd.delta{_}:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
It is also possible that two executors both take a full snapshot for the same
version, and the one picked for the lineage fails while the other succeeds. For
example, (version=23, ID=8205c96f) got a full snapshot checkpoint. The result
looks like this:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
Because 23_8205c96f is never referenced in the lineage, neither in the commit
log nor in checkpoint files such as 24_32e3cc2a.delta, its files are ignored,
and 23_689aa6bd.delta is still used for version 23.
Note that the same executor may execute the same partition of the same batchID
twice, with both attempts succeeding. This is common with ForEachBatch. We need
to make sure the snapshots are named correctly. Here is an example:
# Executed version 23, picked uniqueID 689aa6bd and successfully uploaded file
23_689aa6bd.delta.
# Make RocksDB checkpoint Ckp1.
# Return to the driver, and 23_689aa6bd is chosen to be committed.
# Executed version 23, picked uniqueID 8205c96f and successfully uploaded file
23_8205c96f.delta.
# Make RocksDB checkpoint Ckp2.
# Maintenance thread wakes up for snapshot checkpoints.
The maintenance thread needs to upload two snapshots: Ckp1 → 23_689aa6bd.zip
and Ckp2 → 23_8205c96f.zip. Both need to be uploaded because the thread doesn’t
know which one is committed to the commit log. If it uploaded only one, it
might keep picking the wrong one, and we would get no usable snapshot checkpoint
for a long time, or ever.
One potential optimization is for the snapshot task to wait a little for the
next batch to start, so that it learns that 23_689aa6bd is the one picked and
can upload only 23_689aa6bd.zip, skipping 23_8205c96f.zip. If the next batch
doesn’t start in time, or at all, it uploads both.
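A sketch of the maintenance-thread decision, assuming it sees the queued local checkpoints for a version and, optionally, the uniqueID the next batch was told to load. Without that knowledge it uploads all of them; with it (the optimization above), it uploads only the committed one. Names are hypothetical.

```python
from typing import List, Optional, Tuple

LocalCheckpoint = Tuple[str, str]  # (local RocksDB checkpoint, uniqueID)


def snapshots_to_upload(version: int, queued: List[LocalCheckpoint],
                        committed_id: Optional[str]) -> List[Tuple[str, str]]:
    """Return (local checkpoint, target zip name) pairs to upload.

    committed_id is None when the next batch has not started in time, so the
    committed choice is unknown and every candidate must be uploaded.
    """
    if committed_id is not None:
        queued = [c for c in queued if c[1] == committed_id]
    return [(ckp, f"{version}_{uid}.zip") for ckp, uid in queued]


queued_23 = [("Ckp1", "689aa6bd"), ("Ckp2", "8205c96f")]
upload_all = snapshots_to_upload(23, queued_23, None)
upload_one = snapshots_to_upload(23, queued_23, "689aa6bd")
# upload_one == [('Ckp1', '23_689aa6bd.zip')]
```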
h3. State Store Load
In the normal case, the executor already has the state store to be loaded open.
It just needs to validate that the open state store is at the checkpoint ID sent
from the driver. If the ID matches, the state store is already loaded. If not,
we need to reload it from cloud storage.
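The fast path reduces to a single comparison. A sketch with a hypothetical function name, assuming the executor remembers the (version, uniqueID) of its open store:

```python
from typing import Optional, Tuple


def needs_reload(open_store: Optional[Tuple[int, str]],
                 target_version: int, target_id: str) -> bool:
    """True unless the open store is exactly the checkpoint the driver asked for."""
    return open_store != (target_version, target_id)


reuse = needs_reload((23, "689aa6bd"), 23, "689aa6bd")     # False: reuse store
other = needs_reload((23, "8205c96f"), 23, "689aa6bd")     # True: other lineage
nothing_open = needs_reload(None, 23, "689aa6bd")          # True: nothing open
```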
When a state store loads a checkpoint, it first constructs the checkpoint names
{_}<batchID>_<uniqueID>.zip{_} and {_}<batchID>_<uniqueID>.delta{_}. If the
former file exists, we just load it. Otherwise, we open the latter and read its
first record, which is expected to be the lineage metadata. Using the metadata,
we can download the last snapshot version and apply the subsequent delta files.
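The load procedure can be sketched as follows, assuming a directory listing modeled as a map from file name to the lineage stored in that file (the lineage of version 20 is elided in the examples, so it is modeled as empty). When the presumed snapshot exists, only the target delta’s lineage is read; when it is missing, the walk continues from the presumed snapshot’s delta. Files never referenced by the lineage, such as 23_8205c96f.zip, are never looked at. The function name is hypothetical.

```python
from typing import Dict, List

# name -> lineage stored in that file (IDs for version-1, version-2, ...).
Listing = Dict[str, List[str]]


def files_to_apply(files: Listing, version: int, unique_id: str) -> List[str]:
    """Return the snapshot followed by the deltas needed to load (version, unique_id)."""
    plan: List[str] = []  # collected newest first
    v, uid = version, unique_id
    while True:
        zip_name = f"{v}_{uid}.zip"
        if zip_name in files:        # a snapshot ends the walk
            plan.append(zip_name)
            return list(reversed(plan))
        delta_name = f"{v}_{uid}.delta"
        if delta_name not in files:
            raise FileNotFoundError(delta_name)
        plan.append(delta_name)
        lineage = files[delta_name]  # only the first record of this file is read
        if not lineage:
            raise ValueError(f"{delta_name}: no snapshot and no lineage")
        # Every entry except the oldest is a delta to apply as-is.
        for i, prev_id in enumerate(lineage[:-1]):
            name = f"{v - 1 - i}_{prev_id}.delta"
            if name not in files:
                raise FileNotFoundError(name)
            plan.append(name)
        # The oldest entry is the presumed snapshot; if its .zip is missing,
        # the next iteration continues from its .delta.
        v, uid = v - len(lineage), lineage[-1]


example_3 = {
    "20_d8e2ca47.delta": [], "20_d8e2ca47.zip": [],
    "21_ef6618c2.delta": ["d8e2ca47"],
    "21_f4d05ac9.delta": ["d8e2ca47"],
    "22_4489578d.delta": ["f4d05ac9", "d8e2ca47"],
    "23_689aa6bd.delta": ["4489578d", "f4d05ac9", "d8e2ca47"],
    "24_32e3cc2a.delta": ["689aa6bd"],
}
plan_24 = files_to_apply(example_3, 24, "32e3cc2a")
# ['20_d8e2ca47.zip', '21_f4d05ac9.delta', '22_4489578d.delta',
#  '23_689aa6bd.delta', '24_32e3cc2a.delta']
```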
{*}Example 1{*}. The uniqueID has a snapshot.
In this example, we are trying to load (version=23, uniqueID=689aa6bd).
20_d8e2ca47.delta
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
23_689aa6bd.delta
23_689aa6bd.zip
We see there is a file {_}23_689aa6bd.zip{_}, so we just use the file.
{*}Example 2{*}. The uniqueID only contains a delta log file.
If we still load (version=23, uniqueID=689aa6bd), but there is only a delta file:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
In this case, we read the lineage from 23_689aa6bd.delta and apply these files:
20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta. The
outcome is the same even if there is a snapshot file for the same version but
with a different checkpoint ID:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.
{*}Example 3{*}. Can’t find snapshot file in presumed snapshot location.
If a snapshot upload fails, we need to continue tracing the lineage, as in the
following example:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
When loading (version=24, ID=32e3cc2a), the lineage in 24_32e3cc2a.delta only
goes back to 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this
case, we open 23_689aa6bd.delta and continue tracing back. Eventually, we need
to apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta,
22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.
Note that even if a snapshot file shows up for a uniqueID, we will ignore it,
and use the same
h3. Compatibility
Previous releases cannot read the new format, so the feature will be turned off
by default for micro-batch mode. We will switch the default to on later.
Within the same release, we will allow users to switch between V1 (the current
format) and V2 (the new format). The driver reads the commit log and checks
whether uniqueIDs are available. If they are, they are sent to the executors so
that they can load checkpoints accordingly; otherwise, the driver assumes V1.
When switching modes, we need to make sure that the first checkpoint after the
switch is a full snapshot checkpoint taken synchronously. This way, loading a
checkpoint only needs to deal with files generated by either V1 or V2, never a
mixture of the two.
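A sketch of the driver-side decision, assuming a commit log entry modeled as a dict where the presence of a hypothetical {_}state_unique_ids{_} field marks V2, and a boolean for the requested mode. When the requested mode differs from the format of the last commit, the first checkpoint must be a synchronous full snapshot; the function name is hypothetical.

```python
from typing import Any, Dict, Tuple


def plan_next_batch(last_commit: Dict[str, Any],
                    want_v2: bool) -> Tuple[str, bool]:
    """Return (format to load the last checkpoint with, force sync snapshot)."""
    has_ids = last_commit.get("state_unique_ids") is not None
    read_format = "V2" if has_ids else "V1"  # what the executors must load
    switching = has_ids != want_v2           # mode change requested
    # A synchronous full snapshot right after a switch keeps any single
    # load from mixing V1 and V2 files.
    return read_format, switching


v1_to_v2 = plan_next_batch({}, want_v2=True)                    # ('V1', True)
stay_v2 = plan_next_batch({"state_unique_ids": {}}, want_v2=True)  # ('V2', False)
```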
was:
h2. Motivation
We expect the new checkpoint structure would benefit micro-batch mode by
establishing characteristics of linear dependency between batch versions. Right
now, tasks can be executed multiple times for the same batchID (for speculative
execution or rerunning in ForEachBatch), and there can be multiple parallel
lineages of state stores going on. For example, in one of the issue with
ForEachBatch showed this lineage, which triggered a RocksDB file uploading bug:
Although we fixed all the bugs, this complexity always makes the system prone
to bugs. This non-linear lineage also presents a correctness risk, when some of
the Version is changelogs.
In the same example, suppose Version 17 is a Snapshot checkpoint and Version 18
is a changelog checkpoint. When we need to recover from a checkpoint, we need
to apply Version 17 and Version 18 together. However, Version 18 isn’t
generated on top of Version 17. This can happen either because Version 18 is
generated by a different worker from Version 17, or the same worker abandoned
Version 17, replay this batch and generated Version 17’. In most cases, it is
accurate, but we have identified some edge cases where users might be surprised
by the results, and there may already be correctness issues. See an example in
the
[Appendix|https://docs.google.com/document/d/1pT5bwW325VndVH09aZd7TykKXbBP5uCrNIQOC0uDoLc/edit#heading=h.ehd1najwmeos].
These correctness issues will become more prominent with transformWithState
due to the occurrence of partial updates. Note that the fixing state store
lineage only makes sure the state store is consistent, and doesn’t make sure
the state store is consistent with outputs to the sink. The inconsistency
between
Furthermore, the complex checkpoint version lineage makes it hard to reduce
overhead for old version cleanup. Even a long running state store has to read
metadata for all previous versions and list all files to do any cleanup safely,
which is expensive. It is necessary because any version can be written by a
different executor and references to RocksDB files that the current executor
isn’t aware of. In extreme cases, we can even corrupt the state store. The
chance that it happens is very low, but it’s not a good idea to leave unknown
correctness risk unfixed.
h2. Proposal sketch
The proposed checkpoint structure will ensure a linear dependency:
This stronger guarantee will be a good foundation for the problems above.
The proposal’s basic idea is to guarantee linear lineage by not allowing
checkpoint overwriting. All checkpoints are made with a new file name with a
uniqueID. When starting any batch, a task precisely identifies which checkpoint
to load with the uniqueID.
When a new state store checkpoint is generated, the checkpoint path name
includes a globally unique ID, so that it can never be updated. Here is an
example:
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
The name is stored in the commit log too. When the next batch is being
executed, those unique IDs are passed to the executors, where they make sure
they start to execute from this checkpoint. If the local state store isn’t in
this state, it will download the checkpoint from the cloud.
h1. Part II: Detailed design
h2. Architecture
Previously, a state store checkpoint was stored in a path that was only
determined by checkpoint root path, operatorID, partitionID and batchID. When a
stateful operator is executed, it is always able to construct the path with
that information. It will download the checkpoint path from the path of the
previous batchID and checkpoint to the path for this batchID. As mentioned
earlier, this flexibility comes with a cost: there is no way to distinguish
among checkpoints generated by different tasks rerunning for the same batchID.
In this new design, every checkpoint will go to a globally unique file. The
globally unique ID needs to be communicated between driver and the executors,
and stored in commit logs. The basic workflow is shown as following:
h3. File Structure Under Checkpoint Path
Currently, a checkpoint is stored in path
_<checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog,
zip]._ The path structure will look like following:
__ 0 (operator ID)
+----+
| 0 (partitionID)
+-----+
| ……
| 1 (partitionID)
+-----+
| |- default (storeName)
| +-----+
| | 20.zip
| | 21.delta
| | 22.delta
| + 23.delta
| 2 (partitionID)
+--- ……
The general structure will be intact, and we only change how files in the root
path. For example, in the example above, we will only change file names in the
file names in blue color. Instead of naming the files _<batchID>.zip_ or
{_}<batchID>.delta{_}, we will name it _<batchID>_<uniqueID>.zip_ or
{_}<batchID>_<uniqueID>.delta{_}. It also means that for the same batchID,
there could be multiple files. Here is an example:
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
23_689aa6bd.delta
The randomID will be generated by the executor itself. In the first version, it
is just an UUID, which can be shortened later if needed. In the Appendix, we
will discuss a design decision on how to generate these random IDs. These
random numbers will be persistent in commit logs. When a new batch starts,
these randomIDs will be passed to executors as a part of the operator
executors. By the end of the batch, the IDs for the new checkpoints will be
passed to the driver through an accumulator, where drivers can persist to the
commit logs.
The lineage information is always managed in commit logs, but the lineage is
often needed in state store level when they download checkpoint or do cleanup.
To make a state store self-contained, some lineage information is also stored
in the .zip and .delta files themselves, so that in the level of a single state
store, we can always finish all the operators without relying on outside
information. This decision will be discussed in the appendix. Each delta file
will contain a unique checkpoint ID since all (presumed) snapshot checkpoints,
and all changelog checkpoints following it. Will discuss it in more detail.
There are still extra complexities to this problem and we will discuss those
issues in the following sections.
1. Recovery Case. The driver will tell executors which version to use, but how
do executors find old snapshots and deltas to use before this version?
# Cleanup old versions.
h3. Commit Log Format Change
We will add a new field to commit message {_}CommitMetadata{_}, and add a
field, which is a {_}Map[Map[Seq[String]]]{_}, which represents
operatorID→storeName→partitionID→checkpointUniqueID.
h3. Checkpoint File Format Change
In both changelog file and zip file, extra information on uniqueIDs since the
presumed last snapshot will be written. It is a presumed snapshot because
sometimes snapshotting can fail or be delayed so what in place of the presumed
last snapshot is only a changelog. In those cases, we can still reconstruct
further lineage by reading from that changelog file from the presumed snapshot.
By keeping reading lineage from previous snapshots, the whole lineage can
always be re-constructed.
When we need to download a checkpoint, we need to find the latest snapshot
checkpoint and all subsequence changelog files. Usually, this can be done by
only reading the delta file corresponding to the target unique checkpoint ID.
In the cases of snapshot uploading failure, we will need to keep reading more
than one delta file.
In the delta files, we will create a new entry type {_}LINEAGE_RECORD{_}, which
contains a list of unique_IDs. These IDs represent unique IDs for version-1,
version-2, etc.
In the zip files, this list will be added to class
{_}RocksDBCheckpointMetadata{_}.
The information should always be available, as
# When initially loading a state store, we always have lineage information
loaded since the last snapshot.
# When we add a new changelog checkpoint, we add one checkpoint ID to the list
and it is still the full list.
# We can prune the list saved in memory based on presumed snapshot versions.
An alternative to change checkpoint file format is for the state store to read
commit log files to get the lineage, which has shortcomes: 1. it has to read
multiple commit logs which can be slow; 2. State store checkpoint directory
won’t be self contained and the state store has to understand the commit log.
h3. Passing UniqueID from Driver to Executors
We will add a field in class _StatefulOperatorStateInfo_ to indicate uniqueID
to use. The field will be an array of strings. Each string is corresponding to
a uniqueID for one partition. This struct is already serialized to the
executors.
h3. Passing UniqueID to Driver
The uniqueID will be passed back in a similar approach as
{_}EventTimeWatermarkExec{_}, as well as Marlin’s end offset.
_StatefulOperator_ will have an optional accumulator, with which we receive all
the uniqueIDs from all tasks. In some cases, we may receive more than one
uniqueID for one partition. The driver can simply pick any of them as the one
to commit. Note that it never happens when the executors pass Marlin’s end
offset back, as only one attempt will be made. In microbatch mode, some of the
IDs might be from failed tasks, and we might pick it. It should still be
correct as even if the task fails, the checkpoint should still be valid to use.
One potential problem is that the checkpoint we use may not match the output
used in the sink. It is not a problem for now, as it happens today anyway
without the problem. We can revisit the decision if we see such a problem in
the future.
We will pass back not just a uniqueID for the new checkpoint based on V, but
also the unique ID used for V-1. This will help the driver to validate that
lineage is as expected. This step is necessary for Micro-Batch Pipelining, and
will also help with async state checkpointing. In both cases, the task may
start without knowing the uniqueID it is based on, so it needs to make a guess.
The driver needs to do an extra validation.
h3. State Store Checkpointing
When state store checkpointing starts, a unique ID (UUID) is generated, and
stored in the state store provider. This ID is used to construct delta and zip
file names for the checkpoint. The checkpoint procedure is mostly the same as
today. The only thing is that we need to preserve the unique ID and the lineage
information and make sure we use the correct one. Since the snapshot checkpoint
is done asynchronously, we need to make an appropriate copy to them to make
sure they are consistent with the snapshot.
Checkpointing would look like following:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)
23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
Assuming a snapshot checkpointing is scheduled too, so later snapshotting will
succeed and the snapshot file will show up:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)
23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
23_689aa6bd.zip (lineage in file: d8e2ca47, f4d05ac9, 4489578d)
It is possible that {_}23_{_}{_}689aa6bd{_}{_}.zip{_} fails to be uploaded. In
this case, we will stay with a delta file only and continue from there.
The following delta files may only contain lineage up to Version, but we can
further trace back to version 20 by reading {_}23_{_}{_}689aa6bd{_}{_}.delta{_}:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
It is possible that we have two executors doing a full snapshot. The one picked
up for lineage failed but the other one succeeded. In an example, (Version=23,
ID=8205c96f) got a full snapshot checkpoint. It will generate a result like
this:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
But since 23_8205c96f is never referenced by lineage in either commit log or
checkpoint files, such as 24_32e3cc2a, they will be ignored, and for version
23, 23_689aa6bd.delta still is used.
Note that there is a chance where the same executor executed the same partition
of the same batchID twice and both were successful. This is common in the case
of ForEachBatch. We need to make sure we name the snapshot correctly. Here is
an example:
# Executed version 23, picked uniqueID 689aa6bd and successfully uploaded file
23_689aa6bd.delta.
# Make RocksDB checkpoint Ckp1.
# Return to the driver, and 23_689aa6bd is chosen to be committed.
# Executed version 23, picked uniqueID 8205c96f and successfully uploaded file
23_8205c96f.delta.
# Make RocksDB checkpoint Ckp2.
# Maintenance thread wakes up for snapshot checkpoints.
The maintenance thread needs to upload two snapshots: Ckp1 → 23_689aa6bd.zip
and Ckp2 → 23_8205c96f.zip. Both need to be uploaded because the thread doesn't
know which one will be committed to the commit log. If it always uploaded only
one, it could keep picking the wrong one. We might then have no usable snapshot
checkpoint for a long time, or ever.
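The key point is that each snapshot is named after the attempt that produced it, never after the version alone. The Python sketch below illustrates this under two assumptions. LocalCheckpoint, snapshot_name and upload_pending are hypothetical names. The upload callback stands in for whatever copies a local RocksDB checkpoint to cloud storage.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class LocalCheckpoint:
    """A local RocksDB checkpoint waiting for snapshot upload (hypothetical)."""
    version: int
    unique_id: str
    local_dir: str


def snapshot_name(ckp: LocalCheckpoint) -> str:
    # The zip carries the uniqueID of the attempt that produced the checkpoint.
    return f"{ckp.version}_{ckp.unique_id}.zip"


def upload_pending(pending: List[LocalCheckpoint],
                   upload: Callable[[str, str], None]) -> List[str]:
    """Upload every pending checkpoint, since the thread cannot tell which
    attempt was committed, then clear the pending list in place."""
    uploaded = []
    for ckp in pending:
        name = snapshot_name(ckp)
        upload(ckp.local_dir, name)
        uploaded.append(name)
    pending.clear()
    return uploaded
```

In the scenario above, the pending list holds Ckp1 (23, 689aa6bd) and Ckp2 (23, 8205c96f). The call uploads 23_689aa6bd.zip and 23_8205c96f.zip.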
One potential optimization is for the maintenance thread to wait briefly for
the next batch to start, so that it learns 23_689aa6bd is the one picked up. It
can then upload only 23_689aa6bd.zip and skip 23_8205c96f.zip. If the next
batch doesn't start in time, or never starts, it uploads both.
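The wait-then-choose optimization can be sketched with a bounded wait. This is a minimal Python illustration under two assumptions. First, some hook (here the hypothetical CommittedIdSignal) is notified with the committed uniqueID once the next batch starts. Second, the timeout value is a free parameter the document does not specify.

```python
import threading
from typing import List, Optional


class CommittedIdSignal:
    """Hypothetical hook, set when the next batch reveals the committed uniqueID."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self._committed: Optional[str] = None

    def set(self, unique_id: str) -> None:
        self._committed = unique_id
        self._event.set()

    def wait(self, timeout_s: float) -> Optional[str]:
        # Returns None if the next batch did not start within the timeout.
        return self._committed if self._event.wait(timeout_s) else None


def choose_uploads(candidate_ids: List[str], signal: CommittedIdSignal,
                   timeout_s: float) -> List[str]:
    """Pick which pending snapshots (by uniqueID) to upload."""
    committed = signal.wait(timeout_s)
    if committed is not None and committed in candidate_ids:
        return [committed]  # only the committed attempt needs a snapshot
    return list(candidate_ids)  # fall back to uploading every attempt
```

Bounding the wait keeps the fallback behavior intact. A stalled pipeline still produces snapshots, just without the saving.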
h3. State Store Load
In the normal case, the executor already has the state store to be loaded
open. It only needs to validate that the open state store is at the checkpoint
ID sent from the driver. If the ID matches, the state store is already loaded;
if not, it needs to be reloaded from cloud storage.
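The fast path amounts to an identity check before any download. In this minimal Python sketch, OpenStore, get_store and the reload callback are hypothetical. It compares both the version and the uniqueID, which is a slightly stricter check than the text strictly requires.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class OpenStore:
    """Hypothetical handle to the state store already open on this executor."""
    version: int
    unique_id: str


def get_store(open_store: Optional[OpenStore], version: int, unique_id: str,
              reload: Callable[[int, str], OpenStore]) -> OpenStore:
    """Reuse the open store only if it is exactly the checkpoint the driver sent."""
    if open_store is not None and \
            (open_store.version, open_store.unique_id) == (version, unique_id):
        return open_store  # fast path: already at the requested checkpoint
    return reload(version, unique_id)  # otherwise reload from cloud storage
```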
When the state store loads a checkpoint, it first constructs the checkpoint
file names {_}<batchID>_<uniqueID>.zip{_} and {_}<batchID>_<uniqueID>.delta{_}.
If the former file exists, we just load it. Otherwise, we open the latter file
and read its first record, which is expected to be the lineage metadata. Using
that metadata, we can download the latest snapshot and apply the subsequent
delta files.
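The loading procedure, including the multi-hop case where a presumed snapshot is missing, can be written as a short loop. This is a Python sketch under these assumptions:
* exists and read_lineage are hypothetical hooks standing in for cloud-storage access;
* lineage lists are newest-first (the uniqueIDs of version-1, version-2, ...);
* plan_load is an illustrative name, not a Spark API.

```python
from typing import Callable, List


def plan_load(version: int, unique_id: str,
              exists: Callable[[str], bool],
              read_lineage: Callable[[str], List[str]]) -> List[str]:
    """Return the files to apply, oldest first, to rebuild (version, unique_id)."""
    deltas: List[str] = []  # collected newest first
    v, uid = version, unique_id
    while True:
        zip_name = f"{v}_{uid}.zip"
        if exists(zip_name):
            # Found a real snapshot: apply it, then the deltas in version order.
            return [zip_name] + deltas[::-1]
        delta_name = f"{v}_{uid}.delta"
        deltas.append(delta_name)
        lineage = read_lineage(delta_name)
        if not lineage:
            raise FileNotFoundError(
                f"no snapshot reachable from {version}_{unique_id}")
        # All but the last lineage entry are changelogs between this version
        # and the presumed snapshot.
        for i, parent in enumerate(lineage[:-1], start=1):
            deltas.append(f"{v - i}_{parent}.delta")
        # The last entry is the presumed snapshot; look for its .zip next.
        v, uid = v - len(lineage), lineage[-1]
```

Run against the examples that follow, this returns the file lists given there. Files with an unreferenced uniqueID such as 8205c96f are never consulted.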
{*}Example 1{*}. The uniqueID has a snapshot.
In this example, we are trying to load (version=23, uniqueID=689aa6bd).
20_d8e2ca47.delta
20_d8e2ca47.zip
21_ef6618c2.delta
21_f4d05ac9.delta
22_4489578d.delta
23_689aa6bd.delta
23_689aa6bd.zip
We see there is a file {_}23_689aa6bd.zip{_}, so we just use the file.
{*}Example 2{*}. The uniqueID only contains a delta log file.
If we again load (version=23, uniqueID=689aa6bd), but there is only a delta file:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
In this case, we read the lineage from 23_689aa6bd.delta and apply these files:
20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta. The
outcome is the same even if a snapshot file exists for the same version but
with a different checkpoint ID:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.
{*}Example 3{*}. The snapshot file is missing from the presumed snapshot location.
If snapshot uploading fails, we need to continue tracing the lineage. Consider
the following example:
20_d8e2ca47.delta (lineage in file: …)
20_d8e2ca47.zip (lineage in file: …)
21_ef6618c2.delta (lineage in file: d8e2ca47)
21_f4d05ac9.delta (lineage in file: d8e2ca47)
22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)
23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)
24_32e3cc2a.delta (lineage in file: 689aa6bd)
When loading (version=24, ID=32e3cc2a), the lineage in 24_32e3cc2a.delta only
goes back to 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this
case, we open 23_689aa6bd.delta and continue tracing back. Eventually, we need
to apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta,
22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.
Note that even if a snapshot file shows up for a uniqueID, we will ignore it,
and use the same
h3. Compatibility
Previous releases cannot read the new format, so the feature will be turned off
by default for micro-batch mode. Later, we will switch the default to on.
Within the same release, we will allow users to switch between V1 and V2. The
driver reads the commit log and checks whether uniqueIDs are available. If they
are, the driver sends them to the executors so that they can load checkpoints
accordingly; otherwise, the driver assumes V1. When switching modes, we need to
make sure that the first checkpoint is a full snapshot checkpoint taken
synchronously. This way, loading a checkpoint only needs to deal with files
generated by either V1 or V2, never a mixture of the two.
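The driver-side decision can be sketched as a check on the last commit entry. This is a minimal Python illustration. CommitMetadata here is a hypothetical stand-in whose unique_ids field loosely mirrors the operatorID → storeName → per-partition uniqueID mapping described for the commit log. The function names are illustrative only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class CommitMetadata:
    """Hypothetical stand-in for a commit log entry.

    unique_ids maps operatorID -> storeName -> per-partition uniqueIDs and is
    None for commits written in the V1 format.
    """
    batch_id: int
    unique_ids: Optional[Dict[int, Dict[str, List[str]]]] = None


def checkpoint_format(last_commit: CommitMetadata) -> str:
    # uniqueIDs present in the commit log means V2; otherwise assume V1.
    return "V2" if last_commit.unique_ids is not None else "V1"


def needs_sync_full_snapshot(last_commit: CommitMetadata, configured: str) -> bool:
    """True when the configured format differs from the one on disk, so the
    first checkpoint after the switch must be a synchronous full snapshot."""
    return checkpoint_format(last_commit) != configured
```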
> RocksDB State Store Checkpoint Structure V2
> -------------------------------------------
>
> Key: SPARK-49374
> URL: https://issues.apache.org/jira/browse/SPARK-49374
> Project: Spark
> Issue Type: Task
> Components: Structured Streaming
> Affects Versions: 4.0.0
> Reporter: Siying Dong
> Priority: Major
> Attachments: image-2024-08-23-14-28-56-418.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)