This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 599470a6c1 IGNITE-23567 Store Metastorage checksums durably (#4654)
599470a6c1 is described below

commit 599470a6c14585b1fbfebb4e39fb46e06838d78e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 30 18:23:07 2024 +0400

    IGNITE-23567 Store Metastorage checksums durably (#4654)
---
 .../server/persistence/RocksDbKeyValueStorage.java | 65 ++++++++++++++++++----
 .../ignite/internal/rocksdb/ColumnFamily.java      | 14 +++++
 2 files changed, 67 insertions(+), 12 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index b7b08d7c7c..67a88affc9 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -198,7 +198,11 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     /** Revision to timestamp mapping column family. */
     private volatile ColumnFamily revisionToTs;
 
-    /** Revision to checksum mapping column family. */
+    /**
+     * Revision to checksum mapping column family. It is written durably (that 
is, with RocksDB WAL and fsync=true), to make sure that
+     * we never lose the fact that a revision was applied on a node (it's 
needed to make sure we notice Metastorage divergence even after
+     * a machine crash).
+     */
     private volatile ColumnFamily revisionToChecksum;
 
     /** Snapshot manager. */
@@ -245,8 +249,19 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     /** Tracks RocksDb resources that must be properly closed. */
     private List<AbstractNativeReference> rocksResources = new ArrayList<>();
 
-    /** Metastorage recovery is based on the snapshot & external log. WAL is 
never used for recovery, and can be safely disabled. */
-    private final WriteOptions defaultWriteOptions = new 
WriteOptions().setDisableWAL(true);
+    /**
+     * Write options used to write everything except checksums.
+     *
+     * <p>Access is guarded by {@link #rwLock}.
+     */
+    private WriteOptions defaultWriteOptions;
+
+    /**
+     * Write options used to write checksums.
+     *
+     * <p>Access is guarded by {@link #rwLock}.
+     */
+    private WriteOptions checksumWriteOptions;
 
     /**
      * Constructor.
@@ -344,6 +359,14 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     }
 
     private void createDb() throws RocksDBException {
+        // Metastorage recovery is based on the snapshot & external log. WAL 
is never used for recovery, and can be safely disabled.
+        defaultWriteOptions = new WriteOptions().setDisableWAL(true);
+        rocksResources.add(defaultWriteOptions);
+
+        // Checksums must be written durably to make sure we notice 
Metastorage divergence when it happens.
+        checksumWriteOptions = new WriteOptions().setSync(true);
+        rocksResources.add(checksumWriteOptions);
+
         List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
 
         assert descriptors.size() == 5 : descriptors.size();
@@ -428,7 +451,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
         rwLock.writeLock().lock();
         try {
-            IgniteUtils.closeAll(this::closeRocksResources, 
defaultWriteOptions);
+            closeRocksResources();
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -593,10 +616,25 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     private void completeAndWriteBatch(
             WriteBatch batch, long newRev, KeyValueUpdateContext context, long 
newChecksum
     ) throws RocksDBException {
-        HybridTimestamp ts = context.timestamp;
-
         byte[] revisionBytes = longToBytes(newRev);
 
+        // We are writing the checksum durably (with WAL and fsync) and the 
revision itself is written without a WAL (and hence no fsync),
+        // so we cannot put them in the same batch, and they cannot be written 
atomically. If we wrote revision first and only then
+        // its checksum, we could end up in a situation when a revision gets 
written (and stored on disk) and checksum does not get written,
+        // then recovery happens, sees the Raft command index as applied (it's 
written in the same batch as revision), so the command
+        // does not get reapplied, and the checksum is not written at all.
+        // This is why we write checksum first. If a crash happens, we'll 
reapply the Raft command and will attempt to write the same
+        // checksum which is ok.
+        // The worst thing this can lead to is a false positive on Metastorage 
divergence validation (we wrote a revision checksum, but not
+        // the revision itself, so it wasn't applied and there is no real 
divergence, but we'll see the node as divergent), but this seems
+        // better than false negatives (when we let a divergent node join).
+        boolean sameChecksumAlreadyExists = validateNoChecksumConflict(newRev, 
newChecksum);
+        if (!sameChecksumAlreadyExists) {
+            revisionToChecksum.put(checksumWriteOptions, revisionBytes, 
longToBytes(newChecksum));
+        }
+
+        HybridTimestamp ts = context.timestamp;
+
         data.put(batch, REVISION_KEY, revisionBytes);
 
         byte[] tsBytes = hybridTsToArray(ts);
@@ -604,9 +642,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         tsToRevision.put(batch, tsBytes, revisionBytes);
         revisionToTs.put(batch, revisionBytes, tsBytes);
 
-        validateNoChecksumConflict(newRev, newChecksum);
-        revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum));
-
         addIndexAndTermToWriteBatch(batch, context);
 
         db.write(defaultWriteOptions, batch);
@@ -620,7 +655,7 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         notifyRevisionUpdate();
     }
 
-    private void validateNoChecksumConflict(long newRev, long newChecksum) 
throws RocksDBException {
+    private boolean validateNoChecksumConflict(long newRev, long newChecksum) 
throws RocksDBException {
         byte[] existingChecksumBytes = 
revisionToChecksum.get(longToBytes(newRev));
 
         if (existingChecksumBytes != null) {
@@ -637,6 +672,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
                 );
             }
         }
+
+        return existingChecksumBytes != null;
     }
 
     private static byte[] hybridTsToArray(HybridTimestamp ts) {
@@ -1342,10 +1379,12 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
 
     @Override
     public void clear() {
-        // There's no way to easily remove all data from RocksDB, so we need 
to re-create it from scratch.
-        closeRocksResources();
+        rwLock.readLock().lock();
 
         try {
+            // There's no way to easily remove all data from RocksDB, so we 
need to re-create it from scratch.
+            closeRocksResources();
+
             destroyRocksDb();
 
             this.rev = 0;
@@ -1356,6 +1395,8 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
             createDb();
         } catch (Exception e) {
             throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to 
restore snapshot", e);
+        } finally {
+            rwLock.readLock().unlock();
         }
     }
 
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index 7a098bde81..b241da74a1 100644
--- 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -29,6 +29,7 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
 
 /**
  * Wrapper for the column family that encapsulates {@link ColumnFamilyHandle} 
and RocksDB's operations with it.
@@ -132,6 +133,19 @@ public class ColumnFamily {
         db.put(cfHandle, key, value);
     }
 
+    /**
+     * Puts a key-value pair into this column family with the given {@link 
WriteOptions}.
+     *
+     * @param writeOptions Write options to use.
+     * @param key Key.
+     * @param value Value.
+     * @throws RocksDBException If failed.
+     * @see RocksDB#put(ColumnFamilyHandle, byte[], byte[])
+     */
+    public void put(WriteOptions writeOptions, byte[] key, byte[] value) 
throws RocksDBException {
+        db.put(cfHandle, writeOptions, key, value);
+    }
+
     /**
      * Puts a key-value pair into this column family within the write batch.
      *

Reply via email to