[ https://issues.apache.org/jira/browse/IGNITE-17990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirill Tkalenko updated IGNITE-17990:
-------------------------------------
    Fix Version/s: 3.0.0-beta2

Download RAFT snapshot without deleting original partition data
----------------------------------------------------------------

                Key: IGNITE-17990
                URL: https://issues.apache.org/jira/browse/IGNITE-17990
            Project: Ignite
         Issue Type: Improvement
           Reporter: Ivan Bessonov
           Priority: Major
             Labels: ignite-3
            Fix For: 3.0.0-beta2

h3. Description

In the first design, full rebalance is implemented this way:
* we drop the partition data
* we download the partition data from the leader
* we're done

There's a problem with this approach: if the download fails, we lose one follower. This is bad, because technically the new leader may have more data in the log and could have uploaded it to the follower, but now that makes no sense. Not only can it lead to hard-to-catch errors and require custom code in JRaft, it is also an unconditional data deletion without either explicit user approval or a durably preserved copy of the data.

Such an implementation is fine for a POC and some tests, but it cannot be allowed in the release version of the product.

h3. New proposed solution

As trivial as it may seem, the new solution is to _not delete data_ before the snapshot is fully downloaded and ready for the swap. Why is it trivial? Because this is literally what RAFT demands to be done.

Of course, there's a *but*. Snapshot application, once the snapshot is downloaded, should be {{O(1)}} with respect to the number of rows in the partition and the number of transactions in the tx state store. This may not be fully achievable, depending on the implementation that we choose; more on that later.

The following sections describe all my concerns and possible implementations. Some sections can be skipped while reading, for example if you're not interested in a specific storage engine but want to read everything else.

h3. TX state storage

There's one really good thing about TX state storage: it has no storage engine, there's only a single RocksDB-based implementation. This makes the following approach possible:
* when we stream data, we can write it into an SST file, almost like in the snapshots of the meta-storage and CMG storages
* once the snapshot is downloaded, we ingest it into the storage

What I like about this solution is that it's very simple (a sketch of this flow follows at the end of this section). But there are concerns:
* ingesting leads to an additional implicit data flush. Maybe it can be avoided, more on that later
* it's not clear whether RocksDB creates a copy of the SST file or not. I would assume that it does, because the file might be in another folder or on another device, for example. Although copying files is fast, it still takes time. Add to this the time required for the flush and we see a problem: the operation may become unnecessarily long

For these reasons, I don't think that such a solution should be implemented. The point of this description is to show that I thought about this alternative and consciously decided to use another one.

I believe that TX state storage should use the same approach as the RocksDB-based partition storage. Its description can be found later in this issue.
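For reference, a minimal sketch of what the considered SST-streaming flow would look like, using RocksDB's Java API ({{SstFileWriter}} plus {{ingestExternalFile}}). The class and method names around the RocksDB calls are illustrative assumptions, not actual Ignite code:
{code:java}
import java.nio.file.Path;
import java.util.List;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

/** Sketch of the considered (and rejected) SST-streaming approach for the tx state storage. */
class TxStateSnapshotSstSketch {
    /** Writes streamed snapshot entries into a local SST file; keys must be added in ascending order. */
    static Path writeSst(Path dir, List<byte[]> keys, List<byte[]> values) throws RocksDBException {
        Path sstFile = dir.resolve("tx-state-snapshot.sst");

        try (Options options = new Options(); EnvOptions envOptions = new EnvOptions();
                SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstFile.toString());

            for (int i = 0; i < keys.size(); i++) {
                writer.put(keys.get(i), values.get(i));
            }

            writer.finish();
        }

        return sstFile;
    }

    /** Ingests the downloaded SST file; this is the step that may imply a flush and a file copy. */
    static void ingest(RocksDB db, ColumnFamilyHandle txStateCf, Path sstFile) throws RocksDBException {
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            ingestOptions.setMoveFiles(true); // Ask RocksDB to move the file instead of copying, when possible.

            db.ingestExternalFile(txStateCf, List.of(sstFile.toString()), ingestOptions);
        }
    }
}{code}
Even with {{setMoveFiles(true)}}, ingestion may still force a memtable flush when the ingested key range overlaps in-memory data, which is exactly the cost mentioned in the concerns above.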
h3. MV storage - Test engine

The test engine uses a concurrent skip-list map for MV data and a bunch of other maps for indexes. While the snapshot is being downloaded, we should insert all data into new maps that have the same structure. In the end, we should have two versions of the partition: old and new.

{{onSnapshotLoad}} should just swap all the objects. After that, the old partition data can be cleaned up by the garbage collector.

This is a good place to start the implementation. I assume that some new API will be introduced. I have thoughts about it as well, they are described later in the *API* section.

h3. MV storage - RocksDB engine

The SST-based approach is described in the *TX state storage* section, where I explain why I don't think it is a good solution. The same reasoning applies here just as effectively. This means that we should write data into the same RocksDB instance. This is a little bit tricky.

The reason is that all stored data is merged together, and Column Families are shared between different partitions. This makes it harder to find a place to write partition data while the old partition data persists. As a reminder and an example, let's take a look at how data is stored in the row storage:
{code:java}
+-------------------+-----+-------------------+
|    Partition 0    | ... |   Partition MAX   |
+-------------------+-----+-------------------+
| Row1 | ... | RowN | ... | Row1 | ... | RowN |
+-------------------+-----+-------------------+{code}
Logically, the CF is split into different "blocks", and each block represents a partition. Currently, each partition block is associated with a 2-byte identifier that matches the partition number in Big Endian.

We could add a new CF with a similar structure and write snapshot data into it, but then the snapshot load operation would require us to move data from one CF to another. The only option that I know of that can do this is SST ingestion, and I already explained why I don't like it.

This leaves us with the necessity to write data into the same column family. The naturally occurring solution is to assign a new identifier to the "new" version of the partition. This way, replacing the "old" partition with the "new" one is implemented by replacing "oldPartId" with "newPartId" in the table storage metadata. Sounds good. No flush is required, and snapshot loading becomes pretty fast.

The only thing to keep in mind is that there are multiple column families in each partition: row data, hash indexes, and a CF for every sorted index.

When the "old" partition is deleted, we should probably somehow hint to RocksDB that it should compact the affected ranges and remove a substantial amount of data from disk. But such an optimization also relates to general partition eviction and is out of scope of this discussion.

Last point: what are "oldPartId" and "newPartId"? As badly as I would want to avoid adding new bytes to keys, we should probably replace the 2-byte partition number with a 3-byte one. I propose the following layout (see the sketch at the end of this section):
{code:java}
| MSB | LSB | Generation |{code}
Where MSB is the most significant byte of the partition number, LSB is the least significant byte, and Generation is a 1-byte counter. Every time we need a rebalance, we increase the generation of the current partition.

There are alternatives: we either have 256 possible generations (rotating, of course) or only 2 (0 and 1). Why is this important? Every time a partition is "started", it should, technically, perform a cleanup. Imagine (for example) we have partition {{(0,23,g)}}. Then we would have to clean up the following ranges (lower bound inclusive, upper bound exclusive):
* for 256 generations - ranges {{(0,23,0):(0,23,g)}} and {{(0,23,g+1):(0,24,0)}}
* for 2 generations - range {{(0,23,1-g):(0,23,2-g)}}

This should be done for all indexes as well. Something tells me that recovery will be somewhat faster in the second case, but is it more convenient? I don't know.

Lastly, why is it ok to add a byte to the prefix? Don't we increase the footprint? Yes and no. Right now the engine is not yet properly tuned, but in the future we may set it up in such a way that RocksDB trims prefixes from the keys, so the size is kinda irrelevant. Whether we configure the prefix to be the partition id or the pair (partId, rowId), I don't know yet. Both options look good, but the second may be better. We should do both and benchmark them.
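To make the proposed layout concrete, a minimal sketch of building the 3-byte prefix and the cleanup ranges for the 256-generation variant. The class and method names are illustrative, not actual Ignite code:
{code:java}
import java.nio.ByteBuffer;

/** Illustrative helper for the proposed 3-byte key prefix: | MSB | LSB | Generation |. */
final class PartitionKeyPrefix {
    /** 2-byte partition number in Big Endian followed by a 1-byte generation counter. */
    static byte[] prefix(int partitionId, int generation) {
        return ByteBuffer.allocate(3)
                .putShort((short) partitionId) // ByteBuffer is Big Endian by default: MSB first, then LSB.
                .put((byte) generation)
                .array();
    }

    /**
     * Stale key ranges for the 256-generation variant (lower bound inclusive, upper bound exclusive):
     * [(p,0), (p,g)) and [(p,g+1), (p+1,0)). Overflow edge cases (last partition id, generation 255)
     * are ignored in this sketch.
     */
    static byte[][][] staleRanges(int partitionId, int generation) {
        return new byte[][][] {
                {prefix(partitionId, 0), prefix(partitionId, generation)},
                {prefix(partitionId, generation + 1), prefix(partitionId + 1, 0)}
        };
    }
}{code}
With the 2-generation variant the stale data is just the single range {{(p,1-g):(p,2-g)}}, which is what makes the recovery-time cleanup simpler.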
h3. MV storage - Persistent Page Memory

This engine is way less complicated, but there are tricky parts as well. First of all, we can't have new partition ids like in the RocksDB engine. It's 2 bytes, period. Too much code depends on this size being exactly two bytes. Second, unlike RocksDB, each partition is stored in its own set of files. In other words, partitions are completely independent, which greatly simplifies the swapping procedure. I propose the following algorithm:
* checkpoint everything before we start, or make sure that everything is checkpointed in the partition
* invalidate all partition pages in page memory (this is {{O(1)}}) and close all file descriptors (page stores)
* rename all partition files to {{*.bak}}, for example
* create a new partition and upload data into it
* when the snapshot load is completed, remove the bak files _on the next checkpoint_. Revert to the bak files otherwise, using the same basic procedure

Local storage recovery should be implemented carefully. I propose having a rebalance marker that shows that a rebalance is in progress. On recovery:
* if the marker is present, delete the {{bin}} files and rename the {{bak}} files to {{bin}}
* if the marker is absent, remove the {{bak}} files

For this to work, we should always delete marker files strictly before deleting {{bak}} files. And do it (I repeat) only when the new partition files are checkpointed.

h3. MV storage - Volatile Page Memory

In the case of a volatile storage, there are no files that can be preserved. Instead, we can preserve the old partition "meta" - pointers to all trees, free lists and anything else useful. After that, the partition can be deleted (not really) and we start writing data into a new partition. Pretty much like in the Test storage engine, but offheap.

When we're done, we can start deleting the old partition data, using the same mechanism that's used in partition eviction (IGNITE-17833, not implemented yet).

If snapshot downloading is interrupted, we put the old meta back and asynchronously delete everything that we already downloaded, again reusing the partition eviction code. No memory leaks should be left from it.

h3. Atomic snapshot load in case of multiple storages

At every step, every operation may fail. But data consistency should be preserved no matter what. Here, in particular, we need to call two {{onSnapshotLoad}} methods atomically. Their implementations may be very different.

At a high level, the operation may look like this (a sketch follows this section):
* write an operation marker somewhere (table folder, vault, doesn't matter). Before doing so, we need to make sure that data is persisted on disk for both storages. Once the marker is created, there's no way back - old partition data will be destroyed
* call both methods
* remove the marker when the load is completed

Pretty simple. Just do the same thing on recovery, if the marker is present. One thing to keep in mind: {{onSnapshotLoad}} should be idempotent for this to work. If the new partition is already loaded, nothing should break. Loading should effectively become a no-op in such a case.
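A minimal sketch of the marker-based protocol, assuming a file marker in the partition directory and idempotent {{onSnapshotLoad}} callbacks. All names here are illustrative assumptions, not actual Ignite API:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative marker-based atomic snapshot load over two storages (MV partition storage and tx state storage). */
class AtomicSnapshotLoadSketch {
    private final Path marker; // E.g. <table-dir>/<partition>/snapshot-load.marker (illustrative location).

    AtomicSnapshotLoadSketch(Path partitionDir) {
        this.marker = partitionDir.resolve("snapshot-load.marker");
    }

    /** Called once both storages have durably persisted the downloaded snapshot data. */
    void loadSnapshot(Runnable mvOnSnapshotLoad, Runnable txOnSnapshotLoad) throws IOException {
        Files.createFile(marker); // Point of no return: from here on, the old partition data will be destroyed.

        mvOnSnapshotLoad.run();
        txOnSnapshotLoad.run();

        Files.delete(marker); // Only after both storages have switched to the new data.
    }

    /** Called on node start; repeats the load if it was interrupted after the point of no return. */
    void recover(Runnable mvOnSnapshotLoad, Runnable txOnSnapshotLoad) throws IOException {
        if (Files.exists(marker)) {
            // Both callbacks must be idempotent: if a storage has already switched, this is a no-op for it.
            mvOnSnapshotLoad.run();
            txOnSnapshotLoad.run();

            Files.delete(marker);
        }
    }
}{code}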
h3. API

I realize that the MvPartitionStorage interface is slowly becoming a mess, and there's not much we can do about it. But rebalance is a good exception to the rule. Basically, we can implement something like this:
{code:java}
void performLocalRecovery();

MvRebalanceDataWriter startDataRebalancing();

interface MvRebalanceDataWriter {
    CompletableFuture<?> beforeWritingData();

    void addCommitted(...);

    // Doesn't return the old value, because that's actually pointless during rebalancing.
    void addUncommitted(...);

    // lastAppliedIndex, runConsistently, etc.

    CompletableFuture<?> afterWritingData();

    void close();
}{code}
What exactly we do in {{onSnapshotLoad}}, and which interfaces it uses, I'm not sure about at the moment. I just hope that this brief description gives you the gist of what I would like to see in the code. I do believe that it will simplify the API at least a little bit.

What about indexes? That's a good question. I would expect that old objects, created with {{getOrCreate*Index}}, should still be functional. It may be a pain in the rear; we may have to introduce a default implementation with changeable delegates. It's hard to predict exactly, but this is definitely a part that also requires attention.

h3. Conclusion

I know that this is a pretty big task. I don't expect it to be done in one sitting. It should be split into at least 3 issues, probably more. This is fine, just don't forget the link to this particular issue, because it has the overall description of what's going on.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)