This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 599470a6c1 IGNITE-23567 Store Metastorage checksums durably (#4654)
599470a6c1 is described below
commit 599470a6c14585b1fbfebb4e39fb46e06838d78e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 30 18:23:07 2024 +0400
IGNITE-23567 Store Metastorage checksums durably (#4654)
---
.../server/persistence/RocksDbKeyValueStorage.java | 65 ++++++++++++++++++----
.../ignite/internal/rocksdb/ColumnFamily.java | 14 +++++
2 files changed, 67 insertions(+), 12 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index b7b08d7c7c..67a88affc9 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -198,7 +198,11 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/** Revision to timestamp mapping column family. */
private volatile ColumnFamily revisionToTs;
- /** Revision to checksum mapping column family. */
+ /**
+ * Revision to checksum mapping column family. It is written durably (that
is, with RocksDB WAL and fsync=true), to make sure that
+ * we never lose the fact that a revision was applied on a node (it's
needed to make sure we notice Metastorage divergence even after
+ * a machine crash).
+ */
private volatile ColumnFamily revisionToChecksum;
/** Snapshot manager. */
@@ -245,8 +249,19 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/** Tracks RocksDb resources that must be properly closed. */
private List<AbstractNativeReference> rocksResources = new ArrayList<>();
- /** Metastorage recovery is based on the snapshot & external log. WAL is
never used for recovery, and can be safely disabled. */
- private final WriteOptions defaultWriteOptions = new
WriteOptions().setDisableWAL(true);
+ /**
+ * Write options used to write everything except checksums.
+ *
+ * <p>Access is guarded by {@link #rwLock}.
+ */
+ private WriteOptions defaultWriteOptions;
+
+ /**
+ * Write options used to write checksums.
+ *
+ * <p>Access is guarded by {@link #rwLock}.
+ */
+ private WriteOptions checksumWriteOptions;
/**
* Constructor.
@@ -344,6 +359,14 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
}
private void createDb() throws RocksDBException {
+ // Metastorage recovery is based on the snapshot & external log. WAL
is never used for recovery, and can be safely disabled.
+ defaultWriteOptions = new WriteOptions().setDisableWAL(true);
+ rocksResources.add(defaultWriteOptions);
+
+ // Checksums must be written durably to make sure we notice
Metastorage divergence when it happens.
+ checksumWriteOptions = new WriteOptions().setSync(true);
+ rocksResources.add(checksumWriteOptions);
+
List<ColumnFamilyDescriptor> descriptors = cfDescriptors();
assert descriptors.size() == 5 : descriptors.size();
@@ -428,7 +451,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
rwLock.writeLock().lock();
try {
- IgniteUtils.closeAll(this::closeRocksResources,
defaultWriteOptions);
+ closeRocksResources();
} finally {
rwLock.writeLock().unlock();
}
@@ -593,10 +616,25 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
private void completeAndWriteBatch(
WriteBatch batch, long newRev, KeyValueUpdateContext context, long
newChecksum
) throws RocksDBException {
- HybridTimestamp ts = context.timestamp;
-
byte[] revisionBytes = longToBytes(newRev);
+ // We are writing the checksum durably (with WAL and fsync) and the
revision itself is written without a WAL (and hence no fsync),
+ // so we cannot put them in the same batch, and they cannot be written
atomically. If we wrote revision first and only then
+ // its checksum, we could end up in a situation when a revision gets
written (and stored on disk) and checksum does not get written,
+ // then recovery happens, sees the Raft command index as applied (it's
written in the same batch as revision), so the command
+ // does not get reapplied, and the checksum is not written at all.
+ // This is why we write checksum first. If a crash happens, we'll
reapply the Raft command and will attempt to write the same
+ // checksum which is ok.
+ // The worst thing this can lead to is a false positive on Metastorage
divergence validation (we wrote a revision checksum, but not
+ // the revision itself, so it wasn't applied and there is no real
divergence, but we'll see the node as divergent), but this seems
+ // better than false negatives (when we let a divergent node join).
+ boolean sameChecksumAlreadyExists = validateNoChecksumConflict(newRev,
newChecksum);
+ if (!sameChecksumAlreadyExists) {
+ revisionToChecksum.put(checksumWriteOptions, revisionBytes,
longToBytes(newChecksum));
+ }
+
+ HybridTimestamp ts = context.timestamp;
+
data.put(batch, REVISION_KEY, revisionBytes);
byte[] tsBytes = hybridTsToArray(ts);
@@ -604,9 +642,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
tsToRevision.put(batch, tsBytes, revisionBytes);
revisionToTs.put(batch, revisionBytes, tsBytes);
- validateNoChecksumConflict(newRev, newChecksum);
- revisionToChecksum.put(batch, revisionBytes, longToBytes(newChecksum));
-
addIndexAndTermToWriteBatch(batch, context);
db.write(defaultWriteOptions, batch);
@@ -620,7 +655,7 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
notifyRevisionUpdate();
}
- private void validateNoChecksumConflict(long newRev, long newChecksum)
throws RocksDBException {
+ private boolean validateNoChecksumConflict(long newRev, long newChecksum)
throws RocksDBException {
byte[] existingChecksumBytes =
revisionToChecksum.get(longToBytes(newRev));
if (existingChecksumBytes != null) {
@@ -637,6 +672,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
);
}
}
+
+ return existingChecksumBytes != null;
}
private static byte[] hybridTsToArray(HybridTimestamp ts) {
@@ -1342,10 +1379,12 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
@Override
public void clear() {
- // There's no way to easily remove all data from RocksDB, so we need
to re-create it from scratch.
- closeRocksResources();
+ rwLock.readLock().lock();
try {
+ // There's no way to easily remove all data from RocksDB, so we
need to re-create it from scratch.
+ closeRocksResources();
+
destroyRocksDb();
this.rev = 0;
@@ -1356,6 +1395,8 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
createDb();
} catch (Exception e) {
throw new MetaStorageException(RESTORING_STORAGE_ERR, "Failed to
restore snapshot", e);
+ } finally {
+ rwLock.readLock().unlock();
}
}
diff --git
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index 7a098bde81..b241da74a1 100644
---
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
+++
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -29,6 +29,7 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
/**
* Wrapper for the column family that encapsulates {@link ColumnFamilyHandle}
and RocksDB's operations with it.
@@ -132,6 +133,19 @@ public class ColumnFamily {
db.put(cfHandle, key, value);
}
+ /**
+ * Puts a key-value pair into this column family with the given {@link
WriteOptions}.
+ *
+ * @param writeOptions Write options to use.
+ * @param key Key.
+ * @param value Value.
+ * @throws RocksDBException If failed.
+ * @see RocksDB#put(ColumnFamilyHandle, byte[], byte[])
+ */
+ public void put(WriteOptions writeOptions, byte[] key, byte[] value)
throws RocksDBException {
+ db.put(cfHandle, writeOptions, key, value);
+ }
+
/**
* Puts a key-value pair into this column family within the write batch.
*