[ https://issues.apache.org/jira/browse/IGNITE-17081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Bessonov updated IGNITE-17081:
-----------------------------------
    Description: 
Please refer to https://issues.apache.org/jira/browse/IGNITE-16907 for 
prerequisites.

Please also familiarize yourself with 
https://issues.apache.org/jira/browse/IGNITE-17077 for better understanding, 
the description is continued from there.

For RocksDB-based storage the recovery process is trivial, because RocksDB has 
its own WAL. So, for testing purposes, it would be enough to simply store the 
update index in the meta column family.
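
For illustration only, a minimal sketch of that naive approach, assuming a hypothetical meta column family handle and key layout (not the actual Ignite code):
{code:java}
import java.nio.ByteBuffer;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class NaiveUpdateIndexWriter {
    /** Persists the update index on every write, relying on RocksDB's own WAL for recovery. */
    static void putUpdateIndex(RocksDB db, ColumnFamilyHandle metaCf, int partitionId, long updateIndex)
            throws RocksDBException {
        byte[] key = ByteBuffer.allocate(Integer.BYTES).putInt(partitionId).array();
        byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(updateIndex).array();

        db.put(metaCf, key, value);
    }
}
{code}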

This immediately gives us a write amplification issue, on top of possible 
performance degradation. The obvious solution is inherently bad and needs to be improved.
h2. General idea & implementation

Obviously, the WAL needs to be disabled (WriteOptions#setDisableWAL). This 
effectively breaks the RocksDB recovery procedure, so we need to take measures 
to compensate for that.

The only feasible way to do so is to use DBOptions#setAtomicFlush in 
conjunction with org.rocksdb.WriteBatchWithIndex. This allows RocksDB to flush 
all column families consistently, provided that every write spanning several 
CFs goes through a single batch. Basically, {{acquireConsistencyLock()}} would 
create a thread-local write batch that is applied on lock release. Most of 
RocksDbMvPartitionStorage will be affected by this change.
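
A rough sketch of how these pieces could fit together, assuming illustrative class and method names rather than the actual RocksDbMvPartitionStorage code:
{code:java}
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

class ConsistencyLockSketch {
    /** Atomic flush persists all column families together, keeping them mutually consistent without a WAL. */
    static DBOptions dbOptions() {
        return new DBOptions()
                .setCreateIfMissing(true)
                .setAtomicFlush(true);
    }

    /** WAL is disabled for every write; durability comes from flushes only. */
    static WriteOptions writeOptions() {
        return new WriteOptions().setDisableWAL(true);
    }

    private final RocksDB db;

    private final WriteOptions writeOptions = writeOptions();

    /** Batch created by acquireConsistencyLock() and applied on release. */
    private final ThreadLocal<WriteBatchWithIndex> threadLocalBatch = new ThreadLocal<>();

    ConsistencyLockSketch(RocksDB db) {
        this.db = db;
    }

    void acquireConsistencyLock() {
        threadLocalBatch.set(new WriteBatchWithIndex(true));
    }

    void releaseConsistencyLock() throws RocksDBException {
        WriteBatchWithIndex batch = threadLocalBatch.get();

        try {
            // Everything accumulated under the lock (row data, indexes, the updateIndex in the meta CF)
            // becomes visible atomically.
            db.write(writeOptions, batch);
        } finally {
            batch.close();
            threadLocalBatch.remove();
        }
    }
}
{code}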

NOTE: I believe that scans with unapplied batches should be prohibited for now 
(fortunately, there's WriteBatchInterface#count() to check). I see neither 
practical value in them nor a proper way of implementing them, considering how 
spread out in time the scan process is.
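
A possible guard for that restriction, again only as a sketch (the method name is made up):
{code:java}
// Refuse to start a scan while the current thread still has buffered, unapplied updates.
void ensureNoUnappliedBatch(org.rocksdb.WriteBatchWithIndex batch) {
    if (batch != null && batch.count() > 0) {
        throw new IllegalStateException("Scans with an unapplied write batch are prohibited");
    }
}
{code}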
h2. Callbacks and RAFT snapshots

Simply storing and reading the update index is easy. Reading the committed 
index is more challenging: I propose caching it and updating it only from the 
closure, which can also be used by RAFT to truncate the log.

For the closure, there are several things to account for during the 
implementation:
 * DBOptions#setListeners. We need two events: ON_FLUSH_BEGIN and 
ON_FLUSH_COMPLETED. In atomic flush mode, all "completed" events come after all 
"begin" events, and once you get your first "completed" event, you have a 
guarantee that *all* memtables are already persisted.
This makes tracking RocksDB flushes easy: monitoring the alternation of these 
two events is all that's needed.
 * Unlike the PDS implementation, here we will be writing the updateIndex value 
into a memtable every time. This makes it harder to find persistedIndex values 
for partitions. Fortunately, given the events above, during the time between 
the first "completed" event and the very next "begin" event, the state on disk 
is fully consistent. And there's a way to read data from the storage bypassing 
the memtable completely: ReadOptions#setReadTier(PERSISTED_TIER), see the 
sketch after this list.
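
A sketch of such a persisted-tier read, assuming the same hypothetical meta CF layout as above:
{code:java}
import java.nio.ByteBuffer;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.ReadTier;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class PersistedIndexReader {
    /** Reads a partition's updateIndex from the SST files only, bypassing the memtables entirely. */
    static long readPersistedUpdateIndex(RocksDB db, ColumnFamilyHandle metaCf, byte[] key)
            throws RocksDBException {
        try (ReadOptions readOptions = new ReadOptions().setReadTier(ReadTier.PERSISTED_TIER)) {
            byte[] value = db.get(metaCf, readOptions, key);

            return value == null ? 0L : ByteBuffer.wrap(value).getLong();
        }
    }
}
{code}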

Summarizing everything above, we should implement the following protocol:

 
{code:java}
During table start: read latest values of update indexes. Store them in an 
in-memory structure.
Set "lastEventType = ON_FLUSH_COMPLETED;".

onFlushBegin:
  if (lastEventType == ON_FLUSH_BEGIN)
    return;

  waitForLastAsyncUpdateIndexesRead();

  lastEventType = ON_FLUSH_BEGIN;

onFlushCompleted:
  if (lastEventType == ON_FLUSH_COMPLETED)
    return;

  asyncReadUpdateIndexesFromDisk();

  lastEventType = ON_FLUSH_COMPLETED;{code}
Reading values from disk must be performed asynchronously so as not to stall 
the flushing process: we don't control the locks that RocksDB holds while 
calling the listener's methods.

That asynchronous process would invoke closures that provide persisted 
updateIndex values to other components.
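
Putting the protocol together, a sketch of such a listener (registered via DBOptions#setListeners; the executor and the readUpdateIndexesFromDisk name are assumptions):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.RocksDB;

// Registered via dbOptions.setListeners(List.of(new FlushTrackingListener(executor))).
class FlushTrackingListener extends AbstractEventListener {
    private enum EventType { ON_FLUSH_BEGIN, ON_FLUSH_COMPLETED }

    private final ExecutorService asyncExecutor;

    private volatile EventType lastEventType = EventType.ON_FLUSH_COMPLETED;

    private volatile CompletableFuture<Void> lastUpdateIndexesRead = CompletableFuture.completedFuture(null);

    FlushTrackingListener(ExecutorService asyncExecutor) {
        this.asyncExecutor = asyncExecutor;
    }

    @Override
    public void onFlushBegin(RocksDB db, FlushJobInfo flushJobInfo) {
        if (lastEventType == EventType.ON_FLUSH_BEGIN) {
            return; // Not the first "begin" event of this atomic flush.
        }

        // Make sure the previous asynchronous read has finished before the on-disk state changes again.
        lastUpdateIndexesRead.join();

        lastEventType = EventType.ON_FLUSH_BEGIN;
    }

    @Override
    public void onFlushCompleted(RocksDB db, FlushJobInfo flushJobInfo) {
        if (lastEventType == EventType.ON_FLUSH_COMPLETED) {
            return; // Not the first "completed" event of this atomic flush.
        }

        // Read persisted update indexes off the flush thread and notify the interested
        // components (e.g. RAFT log truncation) via the registered closures.
        lastUpdateIndexesRead = CompletableFuture.runAsync(() -> readUpdateIndexesFromDisk(db), asyncExecutor);

        lastEventType = EventType.ON_FLUSH_COMPLETED;
    }

    /** Hypothetical: reads updateIndex values with ReadTier.PERSISTED_TIER and invokes the closures. */
    private void readUpdateIndexesFromDisk(RocksDB db) {
        // ...
    }
}
{code}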

NOTE: One might say that we should call "waitForLastAsyncUpdateIndexesRead();" 
as late as possible, just in case. But my implementation calls it during the 
first event, and this is fine. I noticed that column families are flushed in 
the order of their internal ids; these ids correspond to the creation order of 
the CFs, and the "default" CF is always created first. This is the exact CF 
that we use to store meta. Maybe we're going to change this and create a 
separate meta CF; only then could we start optimizing this part, and only if 
we have actual proof that there's a stall in this exact place.



> Implement checkpointIndex for RocksDB
> -------------------------------------
>
>                 Key: IGNITE-17081
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17081
>             Project: Ignite
>          Issue Type: Improvement
>            Reporter: Ivan Bessonov
>            Priority: Major
>              Labels: ignite-3
>


