[ 
https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirill Tkalenko reassigned IGNITE-17990:
----------------------------------------

    Assignee: Kirill Tkalenko

> Download RAFT snapshot without deleting original partition data
> ---------------------------------------------------------------
>
>                 Key: IGNITE-17990
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17990
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Assignee: Kirill Tkalenko
>            Priority: Major
>              Labels: ignite-3
>             Fix For: 3.0.0-beta2
>
>
> h3. Note
> This is an umbrella issue. It's split into several smaller issues.
> {color:red}See the first comment; it describes how we end up doing the full 
> rebalance.{color}
> h3. Description
> In the first design, full rebalance is implemented this way:
>  * we drop partition data
>  * we download partition data from the leader
>  * we're done
> There's a problem with this approach: if the download fails, we lose one 
> follower. This is bad because, technically, the new leader may have more data 
> in its log and could have replicated it to this follower, but now that is no 
> longer possible.
> Not only can this lead to hard-to-catch errors and custom workaround code in 
> JRaft, it is also an unconditional data deletion without either explicit user 
> approval or a durably preserved copy of the data.
> Such an implementation is fine for a POC and some tests, but it cannot be 
> allowed in the release version of the product.
> h3. New proposed solution
> As trivial as it may seem, the new solution is to _not delete data_ before 
> the snapshot is fully downloaded and ready for the swap. Why is it trivial? 
> Because this is literally what RAFT demands.
> Of course, there's a {*}but{*}. Applying a downloaded snapshot should be 
> {{O(1)}} with respect to the number of rows in the partition and the number 
> of transactions in the TX state store. This may not be fully achievable, 
> depending on the implementation we choose; more on that later.
> The following sections describe all my concerns and possible implementations. 
> Some sections can be skipped while reading, for example if you're not 
> interested in a specific storage engine but want to read everything else.
> h3. TX state storage
> There's one really good thing about the TX state storage: it is not tied to a 
> storage engine, there's only a single RocksDB-based implementation. This 
> makes the following approach possible:
>  * while we stream data, we write it into an SST file, almost like in the 
> snapshots of the meta-storage and CMG storages
>  * once the snapshot is downloaded, we ingest it into the storage (see the 
> sketch below)
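> For reference, here is a minimal sketch of what this SST-based variant could 
> look like with the RocksJava API. All names here (the DB handle, the entry 
> map, the file path) are placeholders, not code from the project:
> {code:java}
> import java.util.List;
> import java.util.SortedMap;
> 
> import org.rocksdb.EnvOptions;
> import org.rocksdb.IngestExternalFileOptions;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> import org.rocksdb.SstFileWriter;
> 
> class TxSnapshotSstSketch {
>     /**
>      * Writes streamed snapshot entries into an SST file and ingests it into the DB.
>      * The map must be ordered byte-wise (e.g. built with Arrays::compareUnsigned),
>      * because SstFileWriter requires keys in ascending order.
>      */
>     static void ingestSnapshot(RocksDB txStateDb, SortedMap<byte[], byte[]> entries, String sstPath)
>             throws RocksDBException {
>         try (Options options = new Options();
>                 EnvOptions envOptions = new EnvOptions();
>                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
>             writer.open(sstPath);
> 
>             for (var entry : entries.entrySet()) {
>                 writer.put(entry.getKey(), entry.getValue());
>             }
> 
>             writer.finish();
>         }
> 
>         // Ingestion implies an internal flush and possibly a file copy - the concerns below.
>         try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
>             ingestOptions.setMoveFiles(true);
> 
>             txStateDb.ingestExternalFile(List.of(sstPath), ingestOptions);
>         }
>     }
> }{code}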
> What I like about this solution is that it's very simple. But there are 
> concerns:
>  * ingestion leads to an additional implicit data flush; maybe it can be 
> avoided, more on that later
>  * it's not clear whether RocksDB copies the SST file or not. I would assume 
> that it does, because the file might be in another folder or on another 
> device, for example. Although copying files is fast, it still takes time. Add 
> to this the time required for the flush, and we see a problem: the operation 
> may become unnecessarily long
> For these reasons, I don't think that such a solution should be implemented. 
> The point of this description was to show that I considered this alternative 
> and consciously decided to use another one.
> I believe that the TX state storage should use the same approach as the 
> RocksDB-based partition storage. Its description can be found later in this 
> issue.
> h3. MV storage - Test engine
> The Test engine uses a concurrent skip-list map for MV data and a bunch of 
> other maps for indexes. While a snapshot is being downloaded, we should 
> insert all data into new maps that have the same structure. In the end, we 
> should have two versions of the partition: old and new.
> {{onSnapshotLoad}} should just swap all objects. After that, the old 
> partition data can be reclaimed by the garbage collector.
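> A minimal sketch of what this swap could look like; the class and field names 
> are made up for illustration:
> {code:java}
> import java.util.UUID;
> import java.util.concurrent.ConcurrentNavigableMap;
> import java.util.concurrent.ConcurrentSkipListMap;
> 
> /** Illustrative sketch of the Test engine swap; not the real storage classes. */
> class TestMvPartitionSketch {
>     /** Currently visible version of the partition data, keyed by row id. */
>     private volatile ConcurrentNavigableMap<UUID, Object> rows = new ConcurrentSkipListMap<>();
> 
>     /** Map that accumulates the data streamed from the leader during a full rebalance. */
>     private volatile ConcurrentNavigableMap<UUID, Object> rebalanceRows = new ConcurrentSkipListMap<>();
> 
>     /** Called for every row received while the snapshot is being downloaded. */
>     void addRowFromSnapshot(UUID rowId, Object versionChain) {
>         rebalanceRows.put(rowId, versionChain);
>     }
> 
>     /** onSnapshotLoad: swap the references; the old map becomes garbage. */
>     void onSnapshotLoad() {
>         rows = rebalanceRows;
>         rebalanceRows = new ConcurrentSkipListMap<>();
>     }
> }{code}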
> This is a good place to start the implementation. I assume that some new API 
> will be introduced. I have thoughts about it as well; they are described 
> later in the *API* section.
> h3. MV storage - RocksDB engine
> The SST-based approach is described in the *TX state storage* section, along 
> with the reasons why I don't think it is a good solution. The same reasoning 
> applies here just as well. This means that we should write data into the same 
> RocksDB instance, which is a little bit tricky.
> The reason is that all stored data is merged together, and Column Families 
> are shared between different partitions. This makes it harder to find a place 
> to write new partition data while the old partition data persists. As a 
> reminder and an example, let's take a look at how data is stored in the row 
> storage:
> {code:java}
> +-------------------+-----+-------------------+
> |    Partition 0    | ... |   Partition MAX   |
> +-------------------+-----+-------------------+
> | Row1 | ... | RowN | ... | Row1 | ... | RowN |
> +-------------------+-----+-------------------+{code}
> Logically, the CF is split into different "blocks", and each block represents 
> a partition. Currently, each partition block is associated with a 2-byte 
> identifier that matches the partition number in big-endian order.
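> For illustration, this is roughly how a row key with such a prefix could be 
> composed (the row id layout here is simplified, it is not the actual key 
> format):
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.UUID;
> 
> /** Illustrative only: a row-storage key laid out as [2-byte partition id BE | row id]. */
> class RowKeySketch {
>     static byte[] rowKey(int partitionId, UUID rowId) {
>         return ByteBuffer.allocate(Short.BYTES + 2 * Long.BYTES)
>                 .putShort((short) partitionId) // ByteBuffer is big-endian by default
>                 .putLong(rowId.getMostSignificantBits())
>                 .putLong(rowId.getLeastSignificantBits())
>                 .array();
>     }
> }{code}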
>  
> We could add a new CF with a similar structure and write snapshot data into 
> it, but then the snapshot load operation would require us to move data from 
> one CF to another. The only option that I know of that can do this is SST 
> ingestion, and I have already explained why I don't like it.
> This leaves us with the necessity to write data into the same column family. 
> The naturally occurring solution is to assign a new identifier to the "new" 
> version of the partition. This way, replacing the "old" partition with the 
> "new" one would be implemented by replacing "oldPartId" with "newPartId" in 
> the table storage metadata.
> Sounds good. No flush is required, and snapshot loading becomes pretty fast.
> The only thing to keep in mind is that there are multiple column families 
> involved in each partition: row data, hash indexes, and a CF for every sorted 
> index.
> When the "old" partition is deleted, we should probably somehow hint to 
> RocksDB that it should compact some levels and remove a substantial amount of 
> data from disk. But such an optimization also relates to general partition 
> eviction and is out of the scope of this discussion.
> Last point: what are "oldPartId" and "newPartId"?
> Logically, the partition id now becomes a tuple of the partition number and 
> its generation. Physically, it should be represented as an integer, where the 
> lower bits are the generation and the higher bits are the partition number. 
> Is it 2 or 3 bytes? Good question, I'll answer it later.
> {code:java}
> | Partition Number | Generation |{code}
> Every time we need a rebalance, we increase the generation of the current 
> partition. The generation counter overflows without affecting the partition 
> number.
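> A sketch of how such a packed id could look; the 8-bit generation width is 
> just one of the options discussed below, not a decision:
> {code:java}
> /** Illustrative packing of (partition number, generation) into one integer. */
> class PartitionGenerationIdSketch {
>     private static final int GENERATION_BITS = 8;
>     private static final int GENERATION_MASK = (1 << GENERATION_BITS) - 1;
> 
>     static int pack(int partitionNumber, int generation) {
>         return (partitionNumber << GENERATION_BITS) | (generation & GENERATION_MASK);
>     }
> 
>     static int partitionNumber(int packedId) {
>         return packedId >>> GENERATION_BITS;
>     }
> 
>     static int generation(int packedId) {
>         return packedId & GENERATION_MASK;
>     }
> 
>     /** The generation counter overflows without affecting the partition number. */
>     static int nextGeneration(int packedId) {
>         return pack(partitionNumber(packedId), (generation(packedId) + 1) & GENERATION_MASK);
>     }
> }{code}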
>  
> There are alternatives: we either have N possible generations (rotating, of 
> course) or only 2 (0 and 1). Why is this important?
> Every time a partition is "started", it should, technically, perform a 
> cleanup. Imagine (for example) that we have partition {{{}(23,g){}}}. Then we 
> would have to clean up the following ranges (lower bound inclusive, upper 
> bound exclusive):
>  * for 256 generations - ranges {{(23,0):(23,g)}} and {{(23,g+1):(24,0)}}
>  * for 2 generations - range {{(23,1-g):(23,2-g)}}
> This should be done for all indexes as well. Something tells me that recovery 
> will be somewhat faster in the second case, but is it more convenient? I 
> don't know.
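> For the 256-generation variant, the cleanup could be expressed with RocksDB 
> range deletes, roughly like this (the 3-byte prefix layout matches the 
> hypothetical packing sketched above):
> {code:java}
> import org.rocksdb.ColumnFamilyHandle;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
> 
> /** Illustrative cleanup of stale generations of partition (part, g), 256 generations. */
> class StaleGenerationCleanupSketch {
>     static void cleanup(RocksDB db, ColumnFamilyHandle cf, int part, int generation)
>             throws RocksDBException {
>         // [ (part, 0) ; (part, g) ) - generations below the current one.
>         db.deleteRange(cf, prefix(part, 0), prefix(part, generation));
> 
>         // [ (part, g + 1) ; (part + 1, 0) ) - generations above the current one.
>         if (generation < 0xFF) {
>             db.deleteRange(cf, prefix(part, generation + 1), prefix(part + 1, 0));
>         }
>     }
> 
>     /** 3-byte key prefix: 2 bytes of partition number and 1 byte of generation, big-endian. */
>     private static byte[] prefix(int part, int generation) {
>         return new byte[] {(byte) (part >>> 8), (byte) part, (byte) generation};
>     }
> }{code}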
> Lastly, a random thought on the prefix size. Don't we increase the footprint 
> by storing it in every key? Yes and no. Right now the engine is not yet 
> properly tuned, but in the future we may set it up in such a way that RocksDB 
> trims prefixes from the keys, so the prefix is kind of irrelevant. Whether we 
> configure the prefix to be the partition id or the pair (partId, rowId), I 
> don't know yet. Both options look good, but the second may be better. We 
> should implement both and benchmark them.
> h3. MV storage - Persistent Page Memory
> This engine is way less complicated, but there are tricky parts as well. 
> First of all, we can't have new partition ids like in the RocksDB engine. 
> It's 2 bytes, period. Too much code depends on this size being exactly two 
> bytes. There is a possibility of reducing the maximum number of partitions to 
> 32 thousand or so, leaving us with a single "free" bit to store the 
> generation, like in the proposed RocksDB implementation.
> Second, unlike in RocksDB, each partition is stored in its own set of files. 
> In other words, partitions are completely independent, which greatly 
> simplifies the swapping procedure. I propose the following algorithm:
>  * create a new partition generation and upload data into it
>  * checkpoint everything
>  * invalidate all pages of the old generation in page memory (this is 
> {{{}O(1){}}}) and close all of its file descriptors (page stores)
>  * drop the old partition files
> Local storage recovery should be implemented carefully. I propose having a 
> rebalance marker that shows that a rebalance is in progress. On recovery:
>  * if the marker is present, delete the "new" partition files
>  * if the marker is absent, do nothing
> The marker should contain the generation information, otherwise there might 
> be confusion. The real generation must be easy to determine.
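> A sketch of that recovery check under these assumptions; the marker file name 
> and the partition file layout are made up for the example:
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> 
> /** Illustrative recovery for the persistent engine; file names are hypothetical. */
> class RebalanceMarkerRecoverySketch {
>     static void recoverPartition(Path tableDir, int partitionNumber) throws IOException {
>         // The marker stores the generation that the interrupted rebalance was writing into.
>         Path marker = tableDir.resolve("part-" + partitionNumber + ".rebalance");
> 
>         if (Files.exists(marker)) {
>             int newGeneration = Integer.parseInt(Files.readString(marker).trim());
> 
>             // The download never completed, so the "new" generation files are garbage.
>             Files.deleteIfExists(tableDir.resolve("part-" + partitionNumber + "-" + newGeneration + ".bin"));
>             Files.deleteIfExists(marker);
>         }
>         // If the marker is absent, there is nothing to do.
>     }
> }{code}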
> h3. MV storage - Volatile Page Memory
> In the case of a volatile storage, there are no files to work with, and 
> there's not even a need for partition generations. Instead, we can preserve 
> the old partition "meta": pointers to all trees, free lists, or anything else 
> useful. After that, we start writing data into a new partition with a new 
> meta. Pretty much like in the Test storage engine, but off-heap.
> When we're done, we can start deleting the old partition data, using the same 
> mechanism that's used in partition eviction (IGNITE-17833, not implemented 
> yet).
> If the snapshot download is interrupted, we asynchronously delete everything 
> that we have already downloaded, again reusing the partition eviction code. 
> No memory leaks should be left behind.
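> A rough sketch of that meta swap; the PartitionMeta type and the asynchronous 
> destruction are placeholders for whatever IGNITE-17833 ends up providing:
> {code:java}
> import java.util.concurrent.CompletableFuture;
> 
> /** Illustrative only: swapping partition "meta" (tree and free-list roots) in the volatile engine. */
> class VolatilePartitionSketch {
>     /** Placeholder for the real meta: pointers to the version-chain tree, free lists, etc. */
>     static class PartitionMeta {
>     }
> 
>     /** Meta that readers currently use. */
>     private volatile PartitionMeta meta = new PartitionMeta();
> 
>     /** Meta that accumulates the data downloaded during a full rebalance. */
>     private volatile PartitionMeta rebalanceMeta;
> 
>     void startRebalance() {
>         rebalanceMeta = new PartitionMeta();
>     }
> 
>     void onSnapshotLoad() {
>         PartitionMeta oldMeta = meta;
> 
>         meta = rebalanceMeta;
>         rebalanceMeta = null;
> 
>         destroyAsync(oldMeta); // reuse the partition eviction mechanism (IGNITE-17833)
>     }
> 
>     void abortRebalance() {
>         PartitionMeta aborted = rebalanceMeta;
> 
>         rebalanceMeta = null;
> 
>         destroyAsync(aborted); // free everything already downloaded, leaving no memory leaks
>     }
> 
>     private CompletableFuture<Void> destroyAsync(PartitionMeta target) {
>         // Placeholder: the off-heap structures referenced by the meta are released here.
>         return CompletableFuture.completedFuture(null);
>     }
> }{code}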
> h3. Atomic snapshot load in case of multiple storages
> At every step, every operation may fail, but data consistency should be 
> preserved no matter what. Here, in particular, we need to call the two 
> {{onSnapshotLoad}} methods atomically, even though their implementations may 
> be very different.
> At a high level, the operation may look like this:
>  * write an operation marker somewhere (table folder, vault, it doesn't 
> matter). Before doing so, we need to make sure that the data is persisted on 
> disk for both storages. Once the marker is created, there's no way back: the 
> old partition data will be destroyed
>  * call both methods
>  * remove the marker when the load is completed
> Pretty simple. Just do the same thing on recovery if the marker is present. 
> One thing to keep in mind: {{onSnapshotLoad}} should be idempotent for this 
> to work. If the new partition is already loaded, nothing should break; 
> loading should effectively become a no-op in that case.
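> A sketch of this marker-driven sequence; the storage and marker interfaces 
> here are simplified placeholders, not the real API:
> {code:java}
> /** Illustrative atomic load across the MV storage and the TX state storage. */
> class AtomicSnapshotLoadSketch {
>     interface Storage {
>         void flush();          // persist the downloaded data to disk
>         void onSnapshotLoad(); // must be idempotent
>     }
> 
>     interface MarkerStore {
>         void createMarker();
>         boolean markerExists();
>         void removeMarker();
>     }
> 
>     static void loadSnapshot(Storage mvStorage, Storage txStateStorage, MarkerStore markers) {
>         // 1. Make sure both storages have persisted the downloaded data.
>         mvStorage.flush();
>         txStateStorage.flush();
> 
>         // 2. Point of no return: once the marker exists, the old partition data is doomed.
>         markers.createMarker();
> 
>         // 3. Both loads are idempotent, so a crash here can simply be replayed on recovery.
>         mvStorage.onSnapshotLoad();
>         txStateStorage.onSnapshotLoad();
> 
>         // 4. Done.
>         markers.removeMarker();
>     }
> 
>     static void recover(Storage mvStorage, Storage txStateStorage, MarkerStore markers) {
>         if (markers.markerExists()) {
>             mvStorage.onSnapshotLoad();
>             txStateStorage.onSnapshotLoad();
> 
>             markers.removeMarker();
>         }
>     }
> }{code}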
> h3. API
> I realize that the MvPartitionStorage interface is slowly becoming a mess, 
> and there's not much we can do about that. But rebalance is a good exception 
> to the rule.
> Basically, we can implement something like this:
> {code:java}
> void performLocalRecovery();
> MvRebalanceDataWriter startDataRebalancing();
> interface MvRebalanceDataWriter {
>     CompletableFuture<?> beforeWritingData();
>     void addCommitted(...);
>     // Doesn't return the old value, because that's actually pointless during rebalancing.
>     void addUncommitted(...);
>     // lastAppliedIndex, runConsistently, etc.
>     CompletableFuture<?> afterWritingData();
>     void close();
> }{code}
> I'm not sure at the moment what exactly we should do in {{onSnapshotLoad}} 
> and which interfaces it will use. I just hope that this brief description 
> gives you the gist of what I would like to see in the code. I do believe that 
> it will simplify the API at least a little bit.
> What about indexes? That's a good question. I would expect that old objects, 
> created with {{{}getOrCreate*Index{}}}, should still be functional. It may be 
> a pain in the rear; we may have to introduce a default implementation with 
> changeable delegates. It's hard to predict exactly, but this is definitely a 
> part that also requires attention. The same applies to partitions, actually.
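> A minimal sketch of the "changeable delegate" idea; the IndexStorage 
> interface here is a stand-in, not the real one:
> {code:java}
> import java.util.concurrent.atomic.AtomicReference;
> 
> /** Illustrative only: keeps index objects returned by getOrCreate*Index() usable across the swap. */
> class DelegatingIndexStorageSketch {
>     /** Stand-in for the real index storage interface. */
>     interface IndexStorage {
>         void put(byte[] indexKey, byte[] rowId);
>     }
> 
>     /** The object handed out to callers; it survives the swap by switching its delegate. */
>     static class DelegatingIndexStorage implements IndexStorage {
>         private final AtomicReference<IndexStorage> delegate;
> 
>         DelegatingIndexStorage(IndexStorage initialDelegate) {
>             delegate = new AtomicReference<>(initialDelegate);
>         }
> 
>         @Override
>         public void put(byte[] indexKey, byte[] rowId) {
>             delegate.get().put(indexKey, rowId);
>         }
> 
>         /** Called when the "new" partition generation replaces the "old" one. */
>         void switchDelegate(IndexStorage newDelegate) {
>             delegate.set(newDelegate);
>         }
>     }
> }{code}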
> h3. Read Access
> One of the last things to discuss is the possibility of reading data while a 
> rebalance is in progress. As we know, a "full rebalance" can theoretically be 
> performed on a live follower. RAFT works around this by allowing reads from 
> the leader only. We don't have such a limitation; instead, we have problems.
> Technically, it's very easy to allow reads until {{onSnapshotLoad}} happens, 
> and everything is simple once it has completed as well. Any read operation 
> during {{onSnapshotLoad}} should wait for its completion.
> That's good for "fast" read operations. But we also have scans. There are two 
> options:
>  * we cancel them
>  * we allow them to continue working seamlessly
> I think the second option is better. When a cursor is notified that the 
> "switch" is happening, it opens a new underlying cursor from the position 
> where it previously stopped. It feels like each storage will have its own 
> version of this logic; the versions may have small differences.
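> A sketch of such a cursor; the types are simplified stand-ins for the real 
> storage cursors:
> {code:java}
> import java.util.Iterator;
> import java.util.NoSuchElementException;
> import java.util.UUID;
> import java.util.function.Function;
> 
> /** Illustrative only: a scan that survives the swap by reopening from the last returned row id. */
> class SwitchAwareScanSketch implements Iterator<UUID> {
>     /** Opens an underlying cursor positioned strictly after the given row id (null = from the start). */
>     private final Function<UUID, Iterator<UUID>> cursorFactory;
> 
>     private Iterator<UUID> current;
> 
>     private UUID lastReturned;
> 
>     private volatile boolean switchHappened;
> 
>     SwitchAwareScanSketch(Function<UUID, Iterator<UUID>> cursorFactory) {
>         this.cursorFactory = cursorFactory;
>         this.current = cursorFactory.apply(null);
>     }
> 
>     /** Called by the storage when the "new" partition replaces the "old" one. */
>     void onPartitionSwitch() {
>         switchHappened = true;
>     }
> 
>     @Override
>     public boolean hasNext() {
>         if (switchHappened) {
>             // Reopen against the new partition data from the position we stopped on.
>             current = cursorFactory.apply(lastReturned);
>             switchHappened = false;
>         }
> 
>         return current.hasNext();
>     }
> 
>     @Override
>     public UUID next() {
>         if (!hasNext()) {
>             throw new NoSuchElementException();
>         }
> 
>         lastReturned = current.next();
> 
>         return lastReturned;
>     }
> }{code}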
> h3. Conclusion
> I know that this is a pretty big task, and I don't expect it to be done in 
> one sitting. It should be split into at least 3 issues, probably more.
> This is fine; just don't forget the link to this particular issue, because it 
> has the overall description of what's going on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
