This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-4.12
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.12 by this push:
     new f459f27  ISSUE #2567: Save last revision in rocksdb
f459f27 is described below

commit f459f27738b30454bb556e931bd373fa8b9d2606
Author: Surinder Singh <[email protected]>
AuthorDate: Tue Feb 2 01:21:54 2021 -0800

    ISSUE #2567: Save last revision in rocksdb
    
    Descriptions of the changes in this PR:
    
    Save latest revision in rocksdb.
    
    ### Motivation
    Currently, while restoring, the complete transaction log is replayed.
    As the transaction log builds up over time, it takes longer and longer to
    restore the storage containers on a new pod.
    
    ### Changes
    
    Instead, we save the last revision in RocksDB. We use the revision
    retrieved after restoring the last checkpoint to skip ahead in the
    journal logs and replay only the entries that are not included in the
    checkpoint.
    
    Master Issue: #2567
    
    Reviewers: Enrico Olivelli <[email protected]>, Andrey Yegorov 
<[email protected]>, Matteo Merli <[email protected]>
    
    This closes #2568 from sursingh/save-last-version, closes #2567
    
    (cherry picked from commit 1ddeb9bfd4ad5a016e76254391fb88c53f566f63)
    Signed-off-by: Enrico Olivelli <[email protected]>
---
 .../bookkeeper/statelib/impl/kv/RocksdbKVStore.java      | 16 ++++++++++++++++
 .../bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java     |  4 ++++
 2 files changed, 20 insertions(+)

diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
index fabf1e4..0205c48 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
@@ -217,6 +217,22 @@ public class RocksdbKVStore<K, V> implements KVStore<K, V> {
         }
     }
 
+    protected void updateLastRevision(WriteBatch batch, long revision) {
+        if (revision >= 0) { // k/v comes from log stream
+            if (getLastRevision() >= revision) { // these k/v pairs are duplicates
+                return;
+            }
+            try {
+                // update revision
+                setLastRevision(revision);
+                batch.put(metaCfHandle, LAST_REVISION, lastRevisionBytes);
+            } catch (RocksDBException e) {
+                throw new StateStoreRuntimeException(
+                        "Error while updating last revision " + revision + " from store " + name, e);
+            }
+        }
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     public synchronized void init(StateStoreSpec spec) throws StateStoreException {
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
index fa595dc..5c9a83d 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
@@ -365,6 +365,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
         IncrementResult<K, V> result = null;
         try {
             result = increment(revision, batch, op);
+            updateLastRevision(batch, revision);
             executeBatch(batch);
             return result;
         } catch (StateStoreRuntimeException e) {
@@ -463,6 +464,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
         PutResult<K, V> result = null;
         try {
             result = put(revision, batch, op);
+            updateLastRevision(batch, revision);
             executeBatch(batch);
             return result;
         } catch (StateStoreRuntimeException e) {
@@ -576,6 +578,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
         DeleteResult<K, V> result = null;
         try {
             result = delete(revision, batch, op, true);
+            updateLastRevision(batch, revision);
             executeBatch(batch);
             return result;
         } catch (StateStoreRuntimeException e) {
@@ -740,6 +743,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
             for (Op<K, V> o : operations) {
                 results.add(executeOp(revision, batch, o));
             }
+            updateLastRevision(batch, revision);
             executeBatch(batch);
 
             // 4. repare the result

Reply via email to