[
https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirill Tkalenko reassigned IGNITE-17990:
----------------------------------------
Assignee: (was: Kirill Tkalenko)
> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
> Key: IGNITE-17990
> URL: https://issues.apache.org/jira/browse/IGNITE-17990
> Project: Ignite
> Issue Type: Improvement
> Reporter: Ivan Bessonov
> Priority: Major
> Labels: ignite-3
> Fix For: 3.0.0-beta2
>
>
> h3. Description
> In the first design, full rebalance is implemented this way:
> * we drop partition data
> * we download partition data from the leader
> * we're done
> There's a problem with this approach - if the download part fails, we lose one
> follower. This is bad, because technically the new leader may have more data in
> the log and could have uploaded it to the follower, but now that's no longer
> possible.
> Not only can it lead to hard-to-catch errors and require introducing custom
> code into JRaft, it's also an unconditional data deletion without either
> explicit user approval or a durably preserved copy of the data.
> Such an implementation is fine for a POC and some tests, but it cannot be
> allowed in the release version of the product.
> h3. New proposed solution
> As trivial as it may seem, the new solution is to _not delete data_ before the
> snapshot is fully downloaded and ready for swap. Why is it trivial? Because
> this is literally what RAFT demands to be done.
> Of course, there's a {*}but{*}. Snapshot application, once the snapshot is
> downloaded, should be {{O(1)}} with respect to the number of rows in the
> partition and the number of transactions in the tx state store. This may not be
> fully achievable, depending on the implementation that we choose; more on that
> later.
> The following sections describe all my concerns and possible implementations.
> Some sections can be skipped while reading, for example, if you're not
> interested in a specific storage engine but want to read everything else.
> h3. TX state storage
> There's one really good thing about the TX state storage: it has no storage
> engine, there's only a single RocksDB-based implementation. This makes the
> following approach possible (see the sketch after this list):
> * when we stream data, we can write it into an SST file, almost like in the
> snapshots of the meta-storage and CMG storages
> * once the snapshot is downloaded, we ingest it into the storage
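> For illustration only, here is a minimal sketch of that SST-based path using
> the RocksJava API. The {{snapshotSstPath}} name, the entry layout and the
> assumption that streamed entries arrive in sorted key order are mine, not part
> of the issue:
> {code:java}
> import java.util.List;
>
> import org.rocksdb.EnvOptions;
> import org.rocksdb.IngestExternalFileOptions;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.SstFileWriter;
>
> class TxStateSnapshotSketch {
>     /** Writes streamed (key, value) pairs into an SST file; keys must be added in sorted order. */
>     static void writeSst(String snapshotSstPath, Iterable<byte[][]> sortedEntries) throws RocksDBException {
>         try (Options options = new Options();
>                 EnvOptions envOptions = new EnvOptions();
>                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>             writer.open(snapshotSstPath);
>             for (byte[][] entry : sortedEntries) {
>                 writer.put(entry[0], entry[1]); // entry[0] is the key, entry[1] is the value.
>             }
>             writer.finish();
>         }
>     }
>
>     /** Ingests the finished SST file into the running TX state storage. */
>     static void ingest(RocksDB db, String snapshotSstPath) throws RocksDBException {
>         try (IngestExternalFileOptions opts = new IngestExternalFileOptions()) {
>             db.ingestExternalFile(List.of(snapshotSstPath), opts);
>         }
>     }
> }{code}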
> What I like about this solution is that it's very simple. But there are
> concerns:
> * ingesting leads to an additional implicit data flush. Maybe it can be
> avoided, more on that later
> * it's not clear whether RocksDB creates a copy of the SST file or not. I would
> assume that it does, because the file might be in another folder or on another
> device, for example. Although copying files is fast, it still takes time. Add
> to this the time required for the flush and we see a problem - the operation
> may become unnecessarily long
> For these reasons, I don't think that such a solution should be implemented.
> The point of this description was to show that I thought about this
> alternative and consciously decided to use another one.
> I believe that the TX state storage should use the same approach as the
> RocksDB-based partition storage. Its description can be found later in this
> issue.
> h3. MV storage - Test engine
> The Test engine uses a concurrent skip-list map for MV data and a bunch of
> other maps for indexes. While the snapshot is being downloaded, we should
> insert all data into new maps that have the same structure. In the end, we
> should have two versions of the partition: old and new.
> {{onSnapshotLoad}} should just swap all objects. After that, the old partition
> data can be cleaned up by the garbage collector.
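> A minimal sketch of that swap; the class and field names are hypothetical and
> the key/value types are placeholders, not the real Test engine classes:
> {code:java}
> import java.util.concurrent.ConcurrentSkipListMap;
>
> class TestMvPartitionSketch<K extends Comparable<K>, V> {
>     /** Current partition data, replaced wholesale when a snapshot is loaded. */
>     private volatile ConcurrentSkipListMap<K, V> mvData = new ConcurrentSkipListMap<>();
>
>     /** Map being filled with data from the incoming snapshot. */
>     private volatile ConcurrentSkipListMap<K, V> snapshotData;
>
>     void startSnapshotDownload() {
>         snapshotData = new ConcurrentSkipListMap<>();
>     }
>
>     void addRowFromSnapshot(K rowId, V versionChain) {
>         snapshotData.put(rowId, versionChain);
>     }
>
>     void onSnapshotLoad() {
>         // Swap the references; the old map becomes unreachable and is collected by the GC.
>         mvData = snapshotData;
>         snapshotData = null;
>     }
> }{code}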
> This is a good place to start the implementation. I assume that some new API
> will be introduced. I have thoughts about it as well; they are described later
> in the *API* section.
> h3. MV storage - RocksDB engine
> The SST-based approach is described in the *TX state storage* section, where I
> explain why I don't think it is a good solution. The same reasoning applies
> here just as effectively. This means that we should write data into the same
> RocksDB instance. This is a little bit tricky.
> The reason is that all stored data is merged together, and Column Families
> are shared between different partitions. This makes it harder to find a place
> to write partition data while the old partition data persists. As a reminder
> and an example, let's take a look at how data is stored in the row storage:
> {code:java}
> +-------------------+-----+-------------------+
> |    Partition 0    | ... |   Partition MAX   |
> +-------------------+-----+-------------------+
> | Row1 | ... | RowN | ... | Row1 | ... | RowN |
> +-------------------+-----+-------------------+{code}
> Logically, the CF is split into different "blocks", and each block represents a
> partition. Currently, each partition block is associated with a 2-byte
> identifier that matches the partition number in Big Endian.
>
> We could add a new CF with a similar structure and write snapshot data into it,
> but then the snapshot load operation would require us to move data from one CF
> to another. The only option that I know of that can do this is SST ingestion,
> and I already explained why I don't like it.
> This leaves us with the necessity to write data into the same column family.
> The naturally occurring solution is to assign a new identifier to the "new"
> version of the partition. This way, replacing the "old" partition with the
> "new" one would be implemented by replacing "oldPartId" with "newPartId" in the
> table storage metadata.
> Sounds good. No flush is required, and snapshot loading becomes pretty fast.
> The only thing to keep in mind is that there are multiple column families in
> each partition - row data, hash indexes and a CF for every sorted index.
> When the "old" partition is deleted, we should probably somehow hint to RocksDB
> that it should merge some levels and remove a substantial amount of data from
> disk. But such an optimization also relates to general partition eviction and
> is out of scope of this discussion.
> Last point: what are "oldPartId" and "newPartId"?
> As badly as I would want to avoid adding new bytes to keys, we should
> probably replace the 2-byte partition number with a 3-byte one. I propose the
> following:
> {code:java}
> | MSB | LSB | Generation |{code}
> Where MSB is the most significant byte of the partition number, LSB is the
> least significant byte, and Generation is a 1-byte counter. Every time we need
> a rebalance, we increase the generation of the current partition.
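> A minimal sketch of how such a prefix could be built; the class and method
> names are hypothetical:
> {code:java}
> class PartitionKeySketch {
>     /** Builds the proposed 3-byte key prefix: partition number in Big Endian plus a generation byte. */
>     static byte[] partitionKeyPrefix(int partitionId, int generation) {
>         return new byte[] {
>             (byte) (partitionId >>> 8), // MSB of the partition number
>             (byte) partitionId,         // LSB of the partition number
>             (byte) generation           // 1-byte rotating generation counter
>         };
>     }
> }{code}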
>
> There are alternatives - we either have 256 possible generations (rotating,
> of course) or only 2 (0 and 1). Why is this important?
> Every time a partition is "started", it should, technically, perform a
> cleanup. Imagine (for example) we have partition {{{}(0,23,g){}}}. Then we
> would have to clean up the following ranges (lower bound inclusive, upper bound
> exclusive):
> * for 256 generations - ranges {{(0,23,0):(0,23,g)}} and
> {{(0,23,g+1):(0,24,0)}}
> * for 2 generations - range {{(0,23,1-g):(0,23,2-g)}}
> This should be done for all indexes as well. Something tells me that recovery
> will be somewhat faster in the second case, but is it more convenient? I
> don't know.
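> For the simpler 2-generation variant, that startup cleanup could look roughly
> like this with RocksJava's range deletion; the class and method names are
> hypothetical:
> {code:java}
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> class PartitionCleanupSketch {
>     /**
>      * Removes the "other" generation (1 - g) of the given partition from one
>      * column family; the same call would be repeated for every index CF.
>      */
>     static void cleanupStaleGeneration(RocksDB db, ColumnFamilyHandle cf, int partitionId, int generation)
>             throws RocksDBException {
>         byte msb = (byte) (partitionId >>> 8);
>         byte lsb = (byte) partitionId;
>
>         // Range [(msb, lsb, 1 - g), (msb, lsb, 2 - g)) - lower bound inclusive, upper bound exclusive.
>         db.deleteRange(cf,
>                 new byte[] {msb, lsb, (byte) (1 - generation)},
>                 new byte[] {msb, lsb, (byte) (2 - generation)});
>     }
> }{code}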
> Lastly, why is it ok to add a byte to the prefix? Don't we increase the
> footprint? Yes and no. Right now the engine is not yet properly tuned, but in
> the future we may set it up in such a way that RocksDB trims prefixes from
> the keys, so the size is kinda irrelevant. Whether we configure the prefix to
> be the partition id or a pair (partId, rowId) - I don't know yet. Both options
> look good, but the second may be better. We should do both and benchmark them.
> h3. MV storage - Persistent Page Memory
> This engine is way less complicated, but there are tricky parts as well.
> First of all, we can't have new partition ids like in the RocksDB engine. It's
> 2 bytes, period. Too much code depends on this size being exactly two bytes.
> Second, unlike RocksDB, each partition is stored in its own set of files. Or,
> in other words, partitions are completely independent, which greatly
> simplifies the swapping procedure. I propose the following algorithm:
> * checkpoint everything before we start, or make sure that everything's
> checkpointed in the partition
> * invalidate all partition pages in page memory (this is {{{}O(1){}}}) and
> close all file descriptors (page stores)
> * rename all partition files to {{{}*.bak{}}}, for example
> * create a new partition and upload data into it
> * when the snapshot load is completed, remove the bak files {_}on the next
> checkpoint{_}. Revert to the bak files otherwise, using the same basic procedure
> Local storage recovery should be implemented carefully. I propose having a
> rebalance marker that shows that a rebalance is in progress. On recovery:
> * if the marker is present, delete the {{bin}} files and rename the {{bak}}
> files to {{bin}}
> * if the marker is absent, remove the {{bak}} files
> For this to work, we should always delete marker files strictly before
> deleting {{bak}} files. And do it (I repeat) only when the new partition files
> are checkpointed.
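> A minimal sketch of that recovery step; the file naming ({{part-N.bin}},
> {{part-N.bin.bak}}, {{rebalance.marker}}) is an assumption of mine, not the
> engine's actual layout:
> {code:java}
> import java.io.IOException;
> import java.nio.file.DirectoryStream;
> import java.nio.file.Files;
> import java.nio.file.Path;
>
> class PersistentPartitionRecoverySketch {
>     /** Restores a consistent partition state after a restart that may have interrupted a rebalance. */
>     static void recover(Path partitionDir) throws IOException {
>         Path marker = partitionDir.resolve("rebalance.marker");
>
>         if (Files.exists(marker)) {
>             // Rebalance was in progress: for every preserved .bak file, drop the half-written .bin
>             // counterpart and restore the backup. Files without a .bak sibling are left alone, which
>             // keeps this step safe to re-run if recovery itself is interrupted.
>             try (DirectoryStream<Path> baks = Files.newDirectoryStream(partitionDir, "*.bin.bak")) {
>                 for (Path bak : baks) {
>                     String name = bak.getFileName().toString();
>                     Path bin = partitionDir.resolve(name.substring(0, name.length() - ".bak".length()));
>
>                     Files.deleteIfExists(bin);
>                     Files.move(bak, bin);
>                 }
>             }
>             Files.delete(marker);
>         } else {
>             // Rebalance finished (or never started): the .bak copies are stale and can be removed.
>             try (DirectoryStream<Path> baks = Files.newDirectoryStream(partitionDir, "*.bin.bak")) {
>                 for (Path bak : baks) {
>                     Files.delete(bak);
>                 }
>             }
>         }
>     }
> }{code}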
> h3. MV storage - Volatile Page Memory
> In the case of a volatile storage, there are no files that can be preserved.
> Instead, we can preserve the old partition "meta" - pointers to all trees, free
> lists or anything else useful. After that, the partition can be deleted (not
> really) and we start writing data into a new partition. Pretty much like in the
> Test storage engine, but offheap.
> When we're done, we can start deleting the old partition data, using the same
> mechanism that's used in partition eviction (IGNITE-17833, not implemented
> yet).
> If snapshot downloading is interrupted, we revert to the old meta and
> delete everything that we have already downloaded asynchronously, again reusing
> the partition eviction code. No memory leaks should be left behind.
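> A minimal sketch of that meta swap; {{PartitionMeta}} and the method names are
> hypothetical stand-ins for the real tree and free-list pointers:
> {code:java}
> class VolatilePartitionSketch {
>     /** Hypothetical holder of the partition's tree roots and free lists. */
>     static final class PartitionMeta {
>     }
>
>     private volatile PartitionMeta currentMeta = new PartitionMeta();
>
>     /** Old meta preserved while the snapshot is written into a fresh partition. */
>     private volatile PartitionMeta preservedMeta;
>
>     void startSnapshotDownload() {
>         preservedMeta = currentMeta;
>         currentMeta = new PartitionMeta(); // Incoming data goes into a brand-new partition.
>     }
>
>     void onSnapshotLoad() {
>         PartitionMeta stale = preservedMeta;
>         preservedMeta = null;
>         destroyAsync(stale); // Reuse the partition eviction machinery (IGNITE-17833) to free old pages.
>     }
>
>     void onSnapshotFailed() {
>         PartitionMeta halfLoaded = currentMeta;
>         currentMeta = preservedMeta; // Roll back to the preserved meta.
>         preservedMeta = null;
>         destroyAsync(halfLoaded);
>     }
>
>     private void destroyAsync(PartitionMeta meta) {
>         // Placeholder for the asynchronous offheap cleanup described in IGNITE-17833.
>     }
> }{code}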
> h3. Atomic snapshot load in case of multiple storages
> On every step, every operation may fail, but data consistency should be
> preserved no matter what. Here, in particular, we need to call the two
> {{onSnapshotLoad}} methods atomically, even though their implementations may be
> very different.
> On a high level, the operation may look like this:
> * write an operation marker somewhere (table folder, vault, doesn't matter).
> Before doing so, we need to make sure that data is persisted on disk for both
> storages. Once the marker is created, there's no way back: the old partition
> data will be destroyed
> * call both methods
> * remove the marker when the load is completed
> Pretty simple. Just do the same thing on recovery if the marker is present. One
> thing to keep in mind - {{onSnapshotLoad}} should be idempotent for this to
> work. If the new partition is already loaded, nothing should break. Loading
> should effectively become a no-op in such a case.
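> A minimal sketch of that sequence; the {{MarkerStore}} and {{SnapshotLoadable}}
> interfaces and all names here are hypothetical, standing in for the vault (or
> table folder) and the two storages:
> {code:java}
> class AtomicSnapshotLoadSketch {
>     interface MarkerStore {
>         void put(String key);
>
>         void remove(String key);
>
>         boolean contains(String key);
>     }
>
>     interface SnapshotLoadable {
>         void flush();          // Persist downloaded data before the point of no return.
>
>         void onSnapshotLoad(); // Must be idempotent: a no-op if the new partition is already active.
>     }
>
>     private final MarkerStore markers;
>     private final SnapshotLoadable mvStorage;
>     private final SnapshotLoadable txStateStorage;
>     private final String markerKey;
>
>     AtomicSnapshotLoadSketch(MarkerStore markers, SnapshotLoadable mvStorage,
>             SnapshotLoadable txStateStorage, String markerKey) {
>         this.markers = markers;
>         this.mvStorage = mvStorage;
>         this.txStateStorage = txStateStorage;
>         this.markerKey = markerKey;
>     }
>
>     void loadSnapshot() {
>         // 1. Make sure the downloaded data is durable in both storages.
>         mvStorage.flush();
>         txStateStorage.flush();
>
>         // 2. Point of no return: once the marker exists, the old partition data will be destroyed.
>         markers.put(markerKey);
>
>         // 3. Swap both storages; each onSnapshotLoad must be idempotent.
>         mvStorage.onSnapshotLoad();
>         txStateStorage.onSnapshotLoad();
>
>         // 4. Done: drop the marker.
>         markers.remove(markerKey);
>     }
>
>     /** On restart, finishes an interrupted swap if the marker survived the crash. */
>     void recover() {
>         if (markers.contains(markerKey)) {
>             mvStorage.onSnapshotLoad();
>             txStateStorage.onSnapshotLoad();
>             markers.remove(markerKey);
>         }
>     }
> }{code}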
> h3. API
> I realize that the MvPartitionStorage interface is slowly becoming a mess.
> There's not much that we can do about it. But rebalance is a good exception to
> the rule.
> Basically, we can implement something like this:
> {code:java}
> void performLocalRecovery();
>
> MvRebalanceDataWriter startDataRebalancing();
>
> interface MvRebalanceDataWriter {
>     CompletableFuture<?> beforeWritingData();
>
>     void addCommitted(...);
>
>     // Doesn't return the old value, because that's actually pointless during rebalancing.
>     void addUncommitted(...);
>
>     // lastAppliedIndex, runConsistently, etc.
>     CompletableFuture<?> afterWritingData();
>
>     void close();
> }{code}
> What exactly we do in {{onSnapshotLoad}} and which interfaces it uses, I'm
> not sure about at the moment. I just hope that this brief description gives you
> the gist of what I would like to see in the code. I do believe that it will
> simplify the API at least a little bit.
> What about indexes? That's a good question. I would expect that old objects
> created with {{{}getOrCreate*Index{}}} should still be functional. It may be
> a pain in the rear; we may have to introduce a default implementation with
> changeable delegates. It's hard to predict exactly, but this is definitely a
> part that also requires attention.
> h3. Conclusion
> I know that this is a pretty big task. I don't expect it to be done in one
> sitting. It should be split into at least 3 issues, probably more.
> This is fine, just don't forget the link to this particular issue, because it
> has the overall description of what's going on.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)