[ https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirill Tkalenko updated IGNITE-17990:
-------------------------------------
    Fix Version/s: 3.0.0-beta2

> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
>                 Key: IGNITE-17990
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17990
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>             Fix For: 3.0.0-beta2
>
>
> h3. Description
> In the first design, full rebalance is implemented this way:
>  * we drop partition data
>  * we download partition data from the leader
>  * we're done
> There's a problem with this approach: if the download step fails, we lose one 
> follower. This is bad because, technically, the new leader may have more data 
> in its log and could have replicated it to the follower, but now that is 
> pointless. Not only can this lead to hard-to-catch errors and require custom 
> code in JRaft, it is also an unconditional data deletion without either 
> explicit user approval or a durably preserved copy of the data.
> Such an implementation is fine for a POC and some tests, but it cannot be 
> allowed in the release version of the product.
> h3. New proposed solution
> As trivial as it may seem, the new solution is to _not delete data_ before 
> the snapshot is fully downloaded and ready for the swap. Why is it trivial? 
> Because this is literally what RAFT demands.
> Of course, there's a {*}but{*}. Applying a downloaded snapshot should be 
> {{O(1)}} with respect to the number of rows in the partition and the number 
> of transactions in the tx state store. This may not be fully achievable, 
> depending on the implementation that we choose; more on that later.
> The following sections describe all my concerns and possible implementations. 
> Some sections can be skipped while reading, for example if you're not 
> interested in a specific storage engine but want to read everything else.
> h3. TX state storage
> There's one really good thing about the TX state storage: it has no storage 
> engine, there's only a single RocksDB-based implementation. This makes the 
> following approach possible (a sketch follows the list):
>  * when we stream data, we can write it into an SST file, almost like in the 
> snapshots of the meta-storage and CMG storages
>  * once the snapshot is downloaded, we ingest it into the storage
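> A minimal sketch of what this could look like with RocksDB's Java API 
> ({{SstFileWriter}} plus {{ingestExternalFile}}); the entry format and class 
> name are illustrative assumptions:
> {code:java}
> import java.nio.file.Path;
> import java.util.List;
> 
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.EnvOptions;
> import org.rocksdb.IngestExternalFileOptions;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.SstFileWriter;
> 
> class TxStateSnapshotSstSketch {
>     /** Streams snapshot entries into a standalone SST file; keys must arrive in sorted order. */
>     static void writeSnapshotSst(Path sstFile, Iterable<byte[][]> sortedEntries) throws RocksDBException {
>         try (Options options = new Options();
>              EnvOptions envOptions = new EnvOptions();
>              SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>             writer.open(sstFile.toString());
> 
>             for (byte[][] entry : sortedEntries) {
>                 writer.put(entry[0], entry[1]); // entry[0] = key, entry[1] = value
>             }
> 
>             writer.finish();
>         }
>     }
> 
>     /** Once the snapshot is fully downloaded, ingests the SST file into the live storage. */
>     static void ingestSnapshot(RocksDB db, ColumnFamilyHandle cf, Path sstFile) throws RocksDBException {
>         try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
>             db.ingestExternalFile(cf, List.of(sstFile.toString()), ingestOptions);
>         }
>     }
> }{code}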
> What I like about this solution is that it's very simple. But there are 
> concerns:
>  * ingestion leads to an additional implicit data flush. Maybe it can be 
> avoided; more on that later
>  * it's not clear whether RocksDB creates a copy of the SST file or not. I 
> would assume that it does, because the file might be in another folder or on 
> another device, for example. Although copying files is fast, it still takes 
> time. Add to this the time required for the flush and we see a problem: the 
> operation may become unnecessarily long
> For these reasons, I don't think that such a solution should be implemented. 
> The point of this description was to show that I considered this alternative 
> and consciously decided to use another one.
> I believe that the TX state storage should use the same approach as the 
> RocksDB-based partition storage, which is described later in this issue.
> h3. MV storage - Test engine
> The test engine uses a concurrent skip-list map for MV data and a bunch of 
> other maps for indexes. While a snapshot is being downloaded, we should 
> insert all data into new maps that have the same structure. In the end, we 
> have two versions of the partition: old and new.
> {{onSnapshotLoad}} should just swap all objects (see the sketch below). After 
> that, the old partition data can be reclaimed by the garbage collector.
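> A minimal sketch of the double-map swap, assuming hypothetical field and 
> class names:
> {code:java}
> import java.util.Arrays;
> import java.util.concurrent.ConcurrentNavigableMap;
> import java.util.concurrent.ConcurrentSkipListMap;
> 
> class TestMvPartitionSketch {
>     /** Holder for everything that constitutes the partition state; index maps would live here too. */
>     static class PartitionMaps {
>         final ConcurrentNavigableMap<byte[], byte[]> versionChains =
>                 new ConcurrentSkipListMap<>(Arrays::compare);
>     }
> 
>     /** Maps that are currently served to readers. */
>     private volatile PartitionMaps current = new PartitionMaps();
> 
>     /** Maps that the snapshot is streamed into; {@code current} stays untouched. */
>     private volatile PartitionMaps rebalanceTarget;
> 
>     void startRebalance() {
>         rebalanceTarget = new PartitionMaps();
>     }
> 
>     /** Swap the references; the old maps become unreachable and are garbage collected. */
>     void onSnapshotLoad() {
>         current = rebalanceTarget;
>         rebalanceTarget = null;
>     }
> 
>     /** If the download fails, drop the half-filled maps and keep serving the old data. */
>     void abortRebalance() {
>         rebalanceTarget = null;
>     }
> }{code}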
> This is a good place to start the implementation. I assume that some new API 
> will be introduced. I have thoughts about it as well; they are described 
> later in the *API* section.
> h3. MV storage - RocksDB engine
> The SST-based approach is described in the *TX state storage* section, where 
> I explain why I don't think it is a good solution. The same reasoning applies 
> here just as well. This means that we should write data into the same RocksDB 
> instance. This is a little bit tricky.
> The reason is that all stored data is merged together, and Column Families 
> are shared between different partitions. This makes it harder to find a place 
> to write new partition data while the old partition data persists. As a 
> reminder and an example, let's take a look at how data is stored in the row 
> storage:
> {code:java}
> +-------------------+-----+-------------------+
> |    Partition 0    | ... |   Partition MAX   |
> +-------------------+-----+-------------------+
> | Row1 | ... | RowN | ... | Row1 | ... | RowN |
> +-------------------+-----+-------------------+{code}
> Logically, the CF is split into different "blocks", and each block represents 
> a partition. Currently, each partition block is associated with a 2-byte 
> identifier that matches the partition number in big-endian order.
>  
> We could add a new CF with a similar structure and write snapshot data into 
> it, but then the snapshot load operation would require us to move data from 
> one CF to another. The only option that I know of that can do this is SST 
> ingestion, and I already explained why I don't like it.
> This leaves us with the necessity to write data into the same column family. 
> The naturally occurring solution is to assign a new identifier to the "new" 
> version of the partition. This way, replacing the "old" partition with the 
> "new" one would be implemented by replacing "oldPartId" with "newPartId" in 
> the table storage metadata.
> Sounds good. No flush is required, and snapshot loading becomes pretty fast.
> The only thing to keep in mind is that there are multiple column families in 
> each partition: row data, hash indexes, and a CF for every sorted index.
> When the "old" partition is deleted, we should probably somehow hint to 
> RocksDB that it should compact some levels and remove a substantial amount of 
> data from disk. But such an optimization also relates to general partition 
> eviction and is out of the scope of this discussion.
> Last point: what are "oldPartId" and "newPartId"?
> As much as I would like to avoid adding new bytes to keys, we should probably 
> replace the 2-byte partition number with a 3-byte one. I propose the 
> following:
> {code:java}
> | MSB | LSB | Generation |{code}
> Here MSB is the most significant byte of the partition number, LSB is the 
> least significant byte, and Generation is a 1-byte counter. Every time we 
> need a rebalance, we increase the generation of the current partition (see 
> the sketch below).
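> A minimal sketch of building such a prefix (the helper name is hypothetical):
> {code:java}
> /** Builds the proposed 3-byte key prefix: 2-byte big-endian partition id + 1-byte generation. */
> static byte[] partitionKeyPrefix(int partId, int generation) {
>     return new byte[] {
>         (byte) (partId >>> 8), // MSB of the partition number
>         (byte) partId,         // LSB of the partition number
>         (byte) generation      // rotating 1-byte generation counter
>     };
> }{code}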
>  
> There are two alternatives: we either have 256 possible generations 
> (rotating, of course) or only 2 (0 and 1). Why is this important?
> Every time a partition is "started", it should, technically, perform a 
> cleanup. Imagine (for example) that we have partition {{{}(0,23,g){}}}. Then 
> we would have to clean up the following ranges (lower bound inclusive, upper 
> bound exclusive):
>  * for 256 generations - ranges {{(0,23,0):(0,23,g)}} and 
> {{(0,23,g+1):(0,24,0)}}
>  * for 2 generations - range {{(0,23,1-g):(0,23,2-g)}}
> This should be done for all indexes as well (a sketch for the 256-generation 
> case is shown below). Something tells me that recovery will be somewhat 
> faster in the second case, but is it more convenient? I don't know.
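> A minimal sketch of that cleanup for the 256-generation case, using RocksDB 
> range deletes; the prefix layout is the 3-byte one proposed above, and 
> wrap-around of the generation and of the maximum partition id is glossed over:
> {code:java}
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> 
> class StaleGenerationCleanupSketch {
>     /** Removes every key of the partition whose generation differs from {@code currentGen}. */
>     static void cleanupStaleGenerations(RocksDB db, ColumnFamilyHandle cf, int partId, int currentGen)
>             throws RocksDBException {
>         // Range [(partId, 0) .. (partId, currentGen)): everything older than the current generation.
>         db.deleteRange(cf, prefix(partId, 0), prefix(partId, currentGen));
> 
>         // Range [(partId, currentGen + 1) .. (partId + 1, 0)): everything newer than it.
>         if (currentGen < 0xFF) {
>             db.deleteRange(cf, prefix(partId, currentGen + 1), prefix(partId + 1, 0));
>         }
>     }
> 
>     /** 2-byte big-endian partition id followed by a 1-byte generation. */
>     private static byte[] prefix(int partId, int generation) {
>         return new byte[] {(byte) (partId >>> 8), (byte) partId, (byte) generation};
>     }
> }{code}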
> Lastly, why is it OK to add a byte to the prefix? Don't we increase the 
> footprint? Yes and no. Right now the engine is not yet properly tuned, but in 
> the future we may set it up in such a way that RocksDB trims prefixes from 
> the keys, so the size is mostly irrelevant. Whether we configure the prefix 
> to be the partition id or the pair (partId, rowId), I don't know yet. Both 
> options look good, but the second may be better. We should implement both and 
> benchmark them.
> h3. MV storage - Persistent Page Memory
> This engine is way less complicated, but there are tricky parts as well. 
> First of all, we can't have new partition ids like in the RocksDB engine. 
> It's 2 bytes, period. Too much code depends on this size being exactly two 
> bytes.
> Second, unlike RocksDB, each partition is stored in its own set of files. In 
> other words, partitions are completely independent, which greatly simplifies 
> the swapping procedure. I propose the following algorithm (a sketch of the 
> file juggling follows the list):
>  * checkpoint everything before we start, or make sure that everything's 
> checkpointed in the partition
>  * invalidate all partition pages in page memory (this is {{{}O(1){}}}) and 
> close all file descriptors (page-stores)
>  * rename all partition files to {{{}*.bak{}}}, for example
>  * create a new partition and upload data into it
>  * when the snapshot load is completed, remove the bak files {_}on the next 
> checkpoint{_}. Revert to the bak files otherwise, using the same basic 
> procedure
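> A minimal sketch of the "rename to {{*.bak}}" step, assuming a hypothetical 
> per-partition file name such as {{part-N.bin}}:
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardCopyOption;
> import java.util.List;
> import java.util.stream.Collectors;
> import java.util.stream.Stream;
> 
> class PartitionFileBackupSketch {
>     /** Renames every (already checkpointed) file of the partition to "*.bak" before new data is written. */
>     static void backupPartitionFiles(Path workDir, int partId) throws IOException {
>         List<Path> partitionFiles;
> 
>         try (Stream<Path> files = Files.list(workDir)) {
>             partitionFiles = files
>                     .filter(p -> p.getFileName().toString().startsWith("part-" + partId + "."))
>                     .collect(Collectors.toList());
>         }
> 
>         for (Path file : partitionFiles) {
>             Files.move(file, file.resolveSibling(file.getFileName() + ".bak"), StandardCopyOption.ATOMIC_MOVE);
>         }
>     }
> }{code}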
> Local storage recovery should be implemented carefully. I propose having a 
> rebalance marker that shows that a rebalance is in progress. On recovery (a 
> sketch follows below):
>  * if the marker is present, delete the {{bin}} files and rename the {{bak}} 
> files to {{bin}}
>  * if the marker is absent, remove the {{bak}} files
> For this to work, we should always delete the marker file strictly before 
> deleting the {{bak}} files, and do it (I repeat) only when the new partition 
> files are checkpointed.
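> A minimal sketch of that recovery logic, with hypothetical file names; it 
> stays correct if the node crashes in the middle and recovery runs again:
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> 
> class PartitionRecoverySketch {
>     /** Recovers a single partition, assuming "part-N.rebalance" is the marker file name. */
>     static void recoverPartition(Path workDir, int partId) throws IOException {
>         Path marker = workDir.resolve("part-" + partId + ".rebalance");
>         Path bin = workDir.resolve("part-" + partId + ".bin");
>         Path bak = workDir.resolve("part-" + partId + ".bin.bak");
> 
>         if (Files.exists(marker)) {
>             // Rebalance did not complete: drop the half-written partition and restore the backup.
>             if (Files.exists(bak)) {
>                 Files.deleteIfExists(bin);
>                 Files.move(bak, bin);
>             }
>             Files.delete(marker);
>         } else {
>             // Rebalance finished (or never started): the backup, if any, is no longer needed.
>             Files.deleteIfExists(bak);
>         }
>     }
> }{code}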
> h3. MV storage - Volatile Page Memory
> In the case of a volatile storage, there are no files that can be preserved. 
> Instead, we can preserve the old partition "meta": pointers to all trees, 
> free lists and anything else useful. After that, the partition can be 
> "deleted" (not really) and we start writing data into a new partition. Pretty 
> much like in the test storage engine, but off-heap.
> When we're done, we can start deleting the old partition data, using the same 
> mechanism that's used in partition eviction (IGNITE-17833, not implemented 
> yet).
> If the snapshot download is interrupted, we bring back the old meta and 
> asynchronously delete everything that we already downloaded, again reusing 
> the partition eviction code. No memory leaks should be left behind. A small 
> sketch follows.
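> A minimal sketch of the meta swap; all type and method names are hypothetical 
> placeholders for whatever ends up holding the tree roots and free lists:
> {code:java}
> import java.util.concurrent.CompletableFuture;
> 
> class VolatilePartitionRebalanceSketch {
>     /** Hypothetical handle to the partition meta: tree roots, free lists, etc. */
>     interface PartitionMeta {
>         /** Asynchronously frees all pages reachable from this meta, reusing the partition-eviction code. */
>         CompletableFuture<Void> destroy();
>     }
> 
>     /** Meta that readers currently see. */
>     private volatile PartitionMeta current;
> 
>     /** Meta of the new partition that the snapshot is written into. */
>     private volatile PartitionMeta rebalanceTarget;
> 
>     /** Snapshot fully loaded: publish the new meta and free the old partition in the background. */
>     void onSnapshotLoad() {
>         PartitionMeta old = current;
>         current = rebalanceTarget;
>         rebalanceTarget = null;
>         old.destroy();
>     }
> 
>     /** Download interrupted: keep serving the old meta and free whatever was already written. */
>     void abortRebalance() {
>         PartitionMeta partial = rebalanceTarget;
>         rebalanceTarget = null;
>         partial.destroy();
>     }
> }{code}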
> h3. Atomic snapshot load in case of multiple storages
> At every step, every operation may fail, but data consistency should be 
> preserved no matter what. Here, in particular, we need to call the two 
> {{onSnapshotLoad}} methods atomically, even though their implementations may 
> be very different.
> At a high level, the operation may look like this (a sketch follows the list):
>  * write an operation marker somewhere (table folder, vault, doesn't matter). 
> Before doing so, we need to make sure that the data is persisted on disk for 
> both storages. Once the marker is created, there's no way back: the old 
> partition data will be destroyed
>  * call both methods
>  * remove the marker when the load is completed
> Pretty simple. Just do the same thing on recovery if the marker is present. 
> One thing to keep in mind: {{onSnapshotLoad}} should be idempotent for this 
> to work. If the new partition is already loaded, nothing should break; 
> loading should effectively become a no-op in that case.
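> A minimal sketch of the marker-based sequence; the {{Storage}} interface and 
> the use of a marker file (rather than the vault) are illustrative assumptions:
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> 
> class AtomicSnapshotLoadSketch {
>     /** Minimal view of a storage for the purposes of this sketch. */
>     interface Storage {
>         void flush();
> 
>         /** Must be idempotent: calling it again after a crash must not break anything. */
>         void onSnapshotLoad();
>     }
> 
>     static void loadSnapshot(Path marker, Storage mvStorage, Storage txStateStorage) throws IOException {
>         // 1. Both storages must have the downloaded data durably on disk before the point of no return.
>         mvStorage.flush();
>         txStateStorage.flush();
> 
>         // 2. Point of no return: once the marker exists, the old partition data will be destroyed.
>         Files.createFile(marker);
> 
>         // 3. Swap in the new data in both storages.
>         mvStorage.onSnapshotLoad();
>         txStateStorage.onSnapshotLoad();
> 
>         // 4. Only now can the marker be removed.
>         Files.delete(marker);
>     }
> 
>     /** On local recovery, simply finish the job if the marker is still there. */
>     static void recoverIfNeeded(Path marker, Storage mvStorage, Storage txStateStorage) throws IOException {
>         if (Files.exists(marker)) {
>             mvStorage.onSnapshotLoad();
>             txStateStorage.onSnapshotLoad();
>             Files.delete(marker);
>         }
>     }
> }{code}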
> h3. API
> I realize that the MvPartitionStorage interface is slowly becoming a mess. 
> There's not much that we can do about it. But rebalance is a good exception 
> to the rule.
> Basically, we can implement something like this:
> {code:java}
> void performLocalRecovery();
> MvRebalanceDataWriter startDataRebalancing();
> interface MvRebalanceDataWriter {
>     CompletableFuture<?> beforeWritingData();
>     void addCommitted(...);
>     // Doesn't return the old value, because that's actually pointless during rebalancing.
>     void addUncommitted(...);
>     // lastAppliedIndex, runConsistently, etc.
>     CompletableFuture<?> afterWritingData();
>     void close();
> }{code}
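> A hypothetical usage example of the proposed writer; the parameter lists are 
> placeholders in the proposal above, so the arguments and the {{SnapshotRow}} 
> type here are purely illustrative:
> {code:java}
> MvRebalanceDataWriter writer = partitionStorage.startDataRebalancing();
> 
> writer.beforeWritingData().join();
> 
> for (SnapshotRow row : incomingSnapshotStream) {
>     if (row.isCommitted()) {
>         writer.addCommitted(row.rowId(), row.binaryRow(), row.commitTimestamp());
>     } else {
>         writer.addUncommitted(row.rowId(), row.binaryRow(), row.txId());
>     }
> }
> 
> writer.afterWritingData().join();
> writer.close();{code}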
> What exactly we do in {{onSnapshotLoad}} and which interfaces it uses, I'm 
> not sure about at the moment. I just hope that this brief description gives 
> you the gist of what I would like to see in the code. I do believe that it 
> will simplify the API at least a little bit.
> What about indexes? That's a good question. I would expect that old objects, 
> created with {{{}getOrCreate*Index{}}}, should still be functional. It may be 
> a pain in the rear; we may have to introduce a default implementation with 
> changeable delegates (a sketch is shown below). It's hard to predict exactly, 
> but this is definitely a part that also requires attention.
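> A minimal sketch of the "changeable delegate" idea; {{SimpleIndexStorage}} is 
> a simplified stand-in for the real index storage interface, not its actual 
> API:
> {code:java}
> import java.util.concurrent.atomic.AtomicReference;
> 
> /** Simplified stand-in for the real index storage interface. */
> interface SimpleIndexStorage {
>     void put(byte[] indexKey, byte[] rowId);
> 
>     Iterable<byte[]> get(byte[] indexKey);
> }
> 
> /** Handle returned by getOrCreate*Index(); it survives the partition swap because it only holds a delegate. */
> class SwappableIndexStorage implements SimpleIndexStorage {
>     private final AtomicReference<SimpleIndexStorage> delegate;
> 
>     SwappableIndexStorage(SimpleIndexStorage initial) {
>         this.delegate = new AtomicReference<>(initial);
>     }
> 
>     /** Called from onSnapshotLoad(): every handle handed out earlier now sees the new data. */
>     void swapDelegate(SimpleIndexStorage newStorage) {
>         delegate.set(newStorage);
>     }
> 
>     @Override
>     public void put(byte[] indexKey, byte[] rowId) {
>         delegate.get().put(indexKey, rowId);
>     }
> 
>     @Override
>     public Iterable<byte[]> get(byte[] indexKey) {
>         return delegate.get().get(indexKey);
>     }
> }{code}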
> h3. Conclusion
> I know that this is a pretty big task. I don't expect it to be done in one 
> sitting. It should be split into at least 3 issues, probably more.
> This is fine; just don't forget the link to this particular issue, because it 
> has the overall description of what's going on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
