This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e63b957bd85 IGNITE-25947 Add 'created' flag to RocksDB-based log storage (#6273)
e63b957bd85 is described below
commit e63b957bd85215192a2e83bd6ab50d4d19adf819
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jul 18 15:45:31 2025 +0400
IGNITE-25947 Add 'created' flag to RocksDB-based log storage (#6273)
---
.../apache/ignite/internal/util/ArrayUtils.java | 8 +-
.../storage/impl/DefaultLogStorageFactory.java | 52 +++-
.../raft/storage/impl/MetadataMigration.java | 179 +++++++++++++
.../raft/storage/impl/RocksDbSharedLogStorage.java | 84 +++++--
.../storage/impl/DefaultLogStorageFactoryTest.java | 276 +++++++++++++++++++++
.../inmemory/ItRaftStorageVolatilityTest.java | 28 +--
6 files changed, 580 insertions(+), 47 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index 35e3f90cc99..afdbd778000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -200,7 +200,7 @@ public final class ArrayUtils {
* @param arr Array to check.
* @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
*/
- public static boolean nullOrEmpty(byte[] arr) {
+ public static boolean nullOrEmpty(byte @Nullable [] arr) {
return arr == null || arr.length == 0;
}
@@ -230,7 +230,7 @@ public final class ArrayUtils {
* @param arr Array to check.
* @return {@code true} if {@code null} or an empty array is provided, {@code false} otherwise.
*/
- public static boolean nullOrEmpty(long[] arr) {
+ public static boolean nullOrEmpty(long @Nullable [] arr) {
return arr == null || arr.length == 0;
}
@@ -329,7 +329,7 @@ public final class ArrayUtils {
* @param longs One or more elements.
* @return Concatenated array.
*/
- public static long[] concat(@Nullable long[] arr, long... longs) {
+ public static long[] concat(long @Nullable [] arr, long... longs) {
if (nullOrEmpty(arr)) {
return longs;
}
@@ -348,7 +348,7 @@ public final class ArrayUtils {
* @param bytes One or more elements.
* @return Concatenated array.
*/
- public static byte[] concat(@Nullable byte[] arr, byte... bytes) {
+ public static byte[] concat(byte @Nullable [] arr, byte... bytes) {
if (nullOrEmpty(arr)) {
return bytes;
}
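
The four ArrayUtils changes above move @Nullable from the declaration position to the type-use position. For type-use annotations such as org.jetbrains.annotations.Nullable, an annotation written before the element type applies to the component (meaningless for a primitive like byte), while one written before the brackets marks the array reference itself as nullable. A minimal sketch of the distinction (the demo class and its main method are illustrative, not part of the patch):

    import org.jetbrains.annotations.Nullable;

    class NullableArrayPlacementDemo {
        // 'byte @Nullable []' says the array reference may be null;
        // '@Nullable byte []' would annotate the primitive component type instead.
        static boolean nullOrEmpty(byte @Nullable [] arr) {
            return arr == null || arr.length == 0;
        }

        public static void main(String[] args) {
            System.out.println(nullOrEmpty(null));           // true
            System.out.println(nullOrEmpty(new byte[0]));    // true
            System.out.println(nullOrEmpty(new byte[] {1})); // false
        }
    }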
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 045240c9793..25fa321929e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -65,6 +65,10 @@ import org.rocksdb.util.SizeUnit;
public class DefaultLogStorageFactory implements LogStorageFactory {
private static final IgniteLogger LOG = Loggers.forClass(DefaultLogStorageFactory.class);
+ static final byte[] FINISHED_META_MIGRATION_META_KEY = {0};
+
+ static final byte[] STORAGE_CREATED_META_PREFIX = {1};
+
/** Name of the log factory, will be used in logs. */
private final String factoryName;
@@ -86,6 +90,8 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
/** Write options to use in writes to database. */
private WriteOptions writeOptions;
+ private ColumnFamilyHandle metaHandle;
+
/** Configuration column family handle. */
private ColumnFamilyHandle confHandle;
@@ -166,6 +172,8 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
this.flushListener = new LoggingRocksDbFlushListener(factoryName, nodeName);
List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+ // Column family to store metadata.
+ new ColumnFamilyDescriptor("Meta".getBytes(UTF_8), cfOption),
// Column family to store configuration log entry.
new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8),
cfOption),
// Default column family to store user data log entry.
@@ -185,9 +193,13 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
// Setup background compactions pool
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.LOW);
- assert (columnFamilyHandles.size() == 2);
- this.confHandle = columnFamilyHandles.get(0);
- this.dataHandle = columnFamilyHandles.get(1);
+ assert (columnFamilyHandles.size() == 3);
+ this.metaHandle = columnFamilyHandles.get(0);
+ this.confHandle = columnFamilyHandles.get(1);
+ this.dataHandle = columnFamilyHandles.get(2);
+
+ MetadataMigration metadataMigration = metadataMigration();
+ metadataMigration.migrateIfNeeded();
} catch (Exception e) {
closeRocksResources();
@@ -195,6 +207,10 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
}
}
+ MetadataMigration metadataMigration() {
+ return new MetadataMigration(db, writeOptions, metaHandle, confHandle, dataHandle);
+ }
+
@Override
public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
@@ -226,19 +242,23 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
public LogStorage createLogStorage(String raftNodeStorageId, RaftOptions raftOptions) {
// raftOptions is ignored as fsync status is passed via dbOptions.
- return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, raftNodeStorageId, writeOptions, executorService);
+ return new RocksDbSharedLogStorage(this, db, metaHandle, confHandle, dataHandle, raftNodeStorageId, writeOptions, executorService);
}
@Override
public void destroyLogStorage(String uri) {
- try {
+ try (WriteBatch writeBatch = new WriteBatch()) {
RocksDbSharedLogStorage.destroyAllEntriesBetween(
- db,
+ writeBatch,
confHandle,
dataHandle,
raftNodeStorageStartPrefix(uri),
raftNodeStorageEndPrefix(uri)
);
+
+ writeBatch.delete(metaHandle, RocksDbSharedLogStorage.storageCreatedKey(uri));
+
+ db.write(this.writeOptions, writeBatch);
} catch (RocksDBException e) {
throw new LogStorageException("Fail to destroy the log storage for " + uri, e);
}
@@ -339,4 +359,24 @@ public class DefaultLogStorageFactory implements LogStorageFactory {
return opts;
}
+
+ @TestOnly
+ RocksDB db() {
+ return db;
+ }
+
+ @TestOnly
+ ColumnFamilyHandle metaColumnFamilyHandle() {
+ return metaHandle;
+ }
+
+ @TestOnly
+ ColumnFamilyHandle confColumnFamilyHandle() {
+ return confHandle;
+ }
+
+ @TestOnly
+ ColumnFamilyHandle dataColumnFamilyHandle() {
+ return dataHandle;
+ }
}
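
The handle indexing above (meta = 0, conf = 1, data = 2, guarded by the assert on size 3) relies on RocksDB filling the output handle list in the same order as the descriptor list passed to open(). A standalone sketch of that contract (the path and options are placeholders, not the factory's real configuration):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    class ColumnFamilyOrderDemo {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            List<ColumnFamilyDescriptor> descriptors = List.of(
                    new ColumnFamilyDescriptor("Meta".getBytes(StandardCharsets.UTF_8)),
                    new ColumnFamilyDescriptor("Configuration".getBytes(StandardCharsets.UTF_8)),
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
            List<ColumnFamilyHandle> handles = new ArrayList<>();

            try (DBOptions options = new DBOptions()
                    .setCreateIfMissing(true)
                    .setCreateMissingColumnFamilies(true);
                    RocksDB db = RocksDB.open(options, "/tmp/log-storage-demo", descriptors, handles)) {
                // Handles come back in descriptor order: 0 = Meta, 1 = Configuration, 2 = default.
                ColumnFamilyHandle metaHandle = handles.get(0);
                ColumnFamilyHandle confHandle = handles.get(1);
                ColumnFamilyHandle dataHandle = handles.get(2);
                System.out.println("opened " + handles.size() + " column families");
            }
        }
    }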
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java
new file mode 100644
index 00000000000..5578c264ecb
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory.FINISHED_META_MIGRATION_META_KEY;
+import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Implements migration of log storage metadata. Namely, it finds all log storages for which at least one key (in either the Configuration
+ * or the default column family) exists, and marks them as 'created', to support the invariant that, for a log storage on which init()
+ * is called, the corresponding 'created' key is in RocksDB.
+ */
+class MetadataMigration {
+ private static final IgniteLogger LOG = Loggers.forClass(MetadataMigration.class);
+
+ private final RocksDB db;
+
+ private final WriteOptions writeOptions;
+
+ private final ColumnFamilyHandle metaHandle;
+ private final ColumnFamilyHandle confHandle;
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Constructor. */
+ MetadataMigration(
+ RocksDB db,
+ WriteOptions writeOptions,
+ ColumnFamilyHandle metaHandle,
+ ColumnFamilyHandle confHandle,
+ ColumnFamilyHandle dataHandle
+ ) {
+ this.db = db;
+ this.writeOptions = writeOptions;
+ this.metaHandle = metaHandle;
+ this.confHandle = confHandle;
+ this.dataHandle = dataHandle;
+ }
+
+ void migrateIfNeeded() throws RocksDBException {
+ if (metaMigrationIsFinished()) {
+ return;
+ }
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ boolean migratingSomething = doMigration(writeBatch);
+
+ markMetaMigrationAsFinished(writeBatch);
+
+ db.write(writeOptions, writeBatch);
+
+ if (migratingSomething) {
+ LOG.info("Metadata migration was performed and touched some log storages.");
+ }
+ }
+ }
+
+ private boolean metaMigrationIsFinished() throws RocksDBException {
+ return db.get(metaHandle, FINISHED_META_MIGRATION_META_KEY) != null;
+ }
+
+ private boolean doMigration(WriteBatch writeBatch) throws RocksDBException {
+ boolean migratingSomething = false;
+
+ for (String groupIdForStorage : raftNodeStorageIdsOnDisk()) {
+ RocksDbSharedLogStorage.saveStorageStartedFlag(metaHandle, groupIdForStorage, writeBatch);
+
+ migratingSomething = true;
+ }
+
+ return migratingSomething;
+ }
+
+ Set<String> raftNodeStorageIdsOnDisk() throws RocksDBException {
+ Set<String> groupIdsForStorage = new HashSet<>();
+
+ try (
+ RocksIterator confIt = db.newIterator(confHandle);
+ RocksIterator dataIt = db.newIterator(dataHandle)
+ ) {
+ confIt.seekToFirst();
+ dataIt.seekToFirst();
+
+ // Iterate both the configuration and data column families in parallel to find all groupIds.
+ // When we find a new groupId, we put it into the set and then seek to the next groupId to skip
+ // all keys that belong to the one we just found.
+ // For some groups, it is possible that only the configuration CF or only the data CF contains data for that
+ // group, so we handle this situation appropriately.
+ while (confIt.isValid() || dataIt.isValid()) {
+ if (confIt.isValid() && dataIt.isValid()) {
+ byte[] confKey = confIt.key();
+ byte[] dataKey = dataIt.key();
+
+ int confToDataComparison = Arrays.compare(confKey, dataKey);
+ if (confToDataComparison <= 0) {
+ String idForStorage = handleGroupIdIteratorEntry(confIt, confKey, groupIdsForStorage);
+
+ if (confToDataComparison == 0) {
+ skipToNextGroupKey(dataIt, idForStorage);
+ }
+ } else {
+ handleGroupIdIteratorEntry(dataIt, dataKey, groupIdsForStorage);
+ }
+ } else {
+ // Just one is valid.
+ RocksIterator it = confIt.isValid() ? confIt : dataIt;
+ assert it.isValid();
+
+ handleGroupIdIteratorEntry(it, it.key(), groupIdsForStorage);
+ }
+ }
+
+ // Call status() so that an exception is thrown if the iteration stopped due to an error
+ // rather than due to exhausting the iteration space.
+ confIt.status();
+ dataIt.status();
+ }
+
+ return Set.copyOf(groupIdsForStorage);
+ }
+
+ private static String handleGroupIdIteratorEntry(RocksIterator it, byte[] currentKey, Set<String> groupIdsForStorage) {
+ int indexOfZero = indexOf((byte) 0, currentKey);
+ assert indexOfZero >= 0 : new String(currentKey, UTF_8) + " does not have a zero byte";
+
+ String idForStorage = new String(currentKey, 0, indexOfZero, UTF_8);
+ groupIdsForStorage.add(idForStorage);
+
+ skipToNextGroupKey(it, idForStorage);
+
+ return idForStorage;
+ }
+
+ private static int indexOf(byte needle, byte[] haystack) {
+ for (int i = 0; i < haystack.length; i++) {
+ if (haystack[i] == needle) {
+ return i;
+ }
+ }
+
+ return -1;
+ }
+
+ private static void skipToNextGroupKey(RocksIterator it, String idForStorage) {
+ it.seek(raftNodeStorageEndPrefix(idForStorage));
+ }
+
+ private void markMetaMigrationAsFinished(WriteBatch writeBatch) throws RocksDBException {
+ writeBatch.put(metaHandle, FINISHED_META_MIGRATION_META_KEY, ArrayUtils.BYTE_EMPTY_ARRAY);
+ }
+}
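
The meta column family uses two single-byte key prefixes: {0} is the 'migration finished' marker, and {1} prefixes the per-storage 'created' keys, whose suffix is the UTF-8 raft node storage id and whose value is an empty byte array. A small sketch of that key layout (the sample storage id is hypothetical):

    import java.nio.charset.StandardCharsets;

    class MetaKeyLayoutDemo {
        static final byte[] FINISHED_META_MIGRATION_META_KEY = {0};
        static final byte[] STORAGE_CREATED_META_PREFIX = {1};

        // Mirrors RocksDbSharedLogStorage.storageCreatedKey: 0x01 followed by the id bytes.
        static byte[] storageCreatedKey(String raftNodeStorageId) {
            byte[] id = raftNodeStorageId.getBytes(StandardCharsets.UTF_8);
            byte[] key = new byte[1 + id.length];
            key[0] = STORAGE_CREATED_META_PREFIX[0];
            System.arraycopy(id, 0, key, 1, id.length);
            return key;
        }

        public static void main(String[] args) {
            byte[] key = storageCreatedKey("hypothetical_group_id");
            System.out.printf("prefix=0x%02x, length=%d%n", key[0], key.length);
        }
    }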
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 7ab86b6cc72..f067083e3e1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.raft.storage.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.copyOfRange;
import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix;
import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageStartPrefix;
+import static org.apache.ignite.internal.util.ArrayUtils.concat;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -31,8 +34,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
@@ -47,6 +52,7 @@ import org.apache.ignite.raft.jraft.util.BytesUtil;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
@@ -77,6 +83,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
ByteOrder.BIG_ENDIAN
);
+ private static final long INITIAL_INDEX = 1L;
+
/**
* First log index and last log index key in configuration column family.
*/
@@ -88,6 +96,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
/** Shared db instance. */
private final RocksDB db;
+ private final ColumnFamilyHandle metaHandle;
+
/** Shared configuration column family handle. */
private final ColumnFamilyHandle confHandle;
@@ -97,6 +107,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
/** Shared write options. */
private final WriteOptions writeOptions;
+ private final String raftNodeStorageId;
+
/** Start prefix. */
private final byte[] startPrefix;
@@ -131,7 +143,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
private LogEntryDecoder logEntryDecoder;
/** First log index. */
- private volatile long firstLogIndex = 1;
+ private volatile long firstLogIndex = INITIAL_INDEX;
/** First log index loaded flag. */
private volatile boolean hasLoadFirstLogIndex;
@@ -140,6 +152,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
RocksDbSharedLogStorage(
DefaultLogStorageFactory logStorageFactory,
RocksDB db,
+ ColumnFamilyHandle metaHandle,
ColumnFamilyHandle confHandle,
ColumnFamilyHandle dataHandle,
String raftNodeStorageId,
@@ -162,9 +175,11 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
this.logStorageFactory = logStorageFactory;
this.db = db;
+ this.metaHandle = metaHandle;
this.confHandle = confHandle;
this.dataHandle = dataHandle;
this.executor = executor;
+ this.raftNodeStorageId = raftNodeStorageId;
this.startPrefix = raftNodeStorageStartPrefix(raftNodeStorageId);
this.endPrefix = raftNodeStorageEndPrefix(raftNodeStorageId);
this.startBound = new Slice(startPrefix);
@@ -172,11 +187,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
this.writeOptions = writeOptions;
}
- /**
- * Returns the log factory instance, that created current log storage.
- */
- DefaultLogStorageFactory getLogStorageFactory() {
- return logStorageFactory;
+ static byte[] storageCreatedKey(String raftNodeStorageId) {
+ return concat(DefaultLogStorageFactory.STORAGE_CREATED_META_PREFIX, raftNodeStorageId.getBytes(UTF_8));
}
/** {@inheritDoc} */
@@ -191,15 +203,33 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
Requires.requireNonNull(this.logEntryDecoder, "Null log entry decoder");
Requires.requireNonNull(this.logEntryEncoder, "Null log entry encoder");
+ saveStorageCreatedFlag();
+
return initAndLoad(opts.getConfigurationManager());
} finally {
this.manageLock.unlock();
}
}
+ private void saveStorageCreatedFlag() {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ saveStorageStartedFlag(metaHandle, raftNodeStorageId, writeBatch);
+
+ db.write(writeOptions, writeBatch);
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, e);
+ }
+ }
+
+ static void saveStorageStartedFlag(ColumnFamilyHandle metaHandle, String raftNodeStorageId, WriteBatch writeBatch)
+ throws RocksDBException {
+ byte[] storageCreatedKey = storageCreatedKey(raftNodeStorageId);
+ writeBatch.put(metaHandle, storageCreatedKey, ArrayUtils.BYTE_EMPTY_ARRAY);
+ }
+
private boolean initAndLoad(ConfigurationManager configurationManager) {
this.hasLoadFirstLogIndex = false;
- this.firstLogIndex = 1;
+ this.firstLogIndex = INITIAL_INDEX;
load(configurationManager);
return onInitLoaded();
}
@@ -212,7 +242,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
it.seek(startPrefix);
while (it.isValid()) {
byte[] keyWithPrefix = it.key();
- byte[] ks = getKey(keyWithPrefix);
+ byte[] ks = extractKey(keyWithPrefix);
byte[] bs = it.value();
// LogEntry index
@@ -251,7 +281,11 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
}
}
- private byte[] getKey(byte[] ks) {
+ private byte[] extractKey(byte[] ks) {
+ return extractKey(ks, startPrefix);
+ }
+
+ private static byte[] extractKey(byte[] ks, byte[] startPrefix) {
return copyOfRange(ks, startPrefix.length, ks.length);
}
@@ -266,7 +300,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
private boolean saveFirstLogIndex(long firstLogIndex) {
this.useLock.lock();
try {
- byte[] vs = new byte[8];
+ byte[] vs = new byte[Long.BYTES];
LONG_ARRAY_HANDLE.set(vs, 0, firstLogIndex);
this.db.put(this.confHandle, this.writeOptions, createKey(FIRST_LOG_IDX_KEY), vs);
return true;
@@ -313,14 +347,14 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
it.seek(startPrefix);
if (it.isValid()) {
- byte[] key = getKey(it.key());
+ byte[] key = extractKey(it.key());
long ret = (long) LONG_ARRAY_HANDLE.get(key, 0);
saveFirstLogIndex(ret);
setFirstLogIndex(ret);
return ret;
}
- return 1L;
+ return INITIAL_INDEX;
}
} finally {
this.useLock.unlock();
@@ -339,7 +373,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
it.seekForPrev(endPrefix);
if (it.isValid()) {
- byte[] key = getKey(it.key());
+ byte[] key = extractKey(it.key());
return (long) LONG_ARRAY_HANDLE.get(key, 0);
}
@@ -351,7 +385,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
/** {@inheritDoc} */
@Override
- public LogEntry getEntry(long index) {
+ public @Nullable LogEntry getEntry(long index) {
this.useLock.lock();
try {
if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) {
@@ -552,7 +586,12 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
try {
LogEntry entry = getEntry(nextLogIndex);
- destroyAllEntriesBetween(db, confHandle, dataHandle, startPrefix, endPrefix);
+
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ destroyAllEntriesBetween(writeBatch, confHandle, dataHandle, startPrefix, endPrefix);
+
+ db.write(this.writeOptions, writeBatch);
+ }
onReset(nextLogIndex);
@@ -576,14 +615,14 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
}
static void destroyAllEntriesBetween(
- RocksDB db,
+ WriteBatch writeBatch,
ColumnFamilyHandle confHandle,
ColumnFamilyHandle dataHandle,
byte[] startPrefix,
byte[] endPrefix
) throws RocksDBException {
- db.deleteRange(dataHandle, startPrefix, endPrefix);
- db.deleteRange(confHandle, startPrefix, endPrefix);
+ writeBatch.deleteRange(dataHandle, startPrefix, endPrefix);
+ writeBatch.deleteRange(confHandle, startPrefix, endPrefix);
}
/** {@inheritDoc} */
@@ -683,12 +722,11 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer {
@SuppressWarnings("SameParameterValue")
private byte[] createKey(byte[] key) {
- var buffer = new byte[startPrefix.length + key.length];
-
- System.arraycopy(startPrefix, 0, buffer, 0, startPrefix.length);
- System.arraycopy(key, 0, buffer, startPrefix.length, key.length);
+ return createKey(startPrefix, key);
+ }
- return buffer;
+ static byte[] createKey(byte[] startPrefix, byte[] key) {
+ return concat(startPrefix, key);
}
private byte[] createKey(long index) {
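
A recurring change in this file is routing deletions through a WriteBatch instead of issuing independent deleteRange calls against the DB: the entries in both column families and the 'created' flag are then removed in one atomic write. A minimal sketch of the pattern (method and parameter names are illustrative, not the class's actual API):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    class AtomicDestroySketch {
        static void destroyStorage(
                RocksDB db,
                WriteOptions writeOptions,
                ColumnFamilyHandle metaHandle,
                ColumnFamilyHandle confHandle,
                ColumnFamilyHandle dataHandle,
                byte[] startPrefix,
                byte[] endPrefix,
                byte[] createdKey
        ) throws RocksDBException {
            try (WriteBatch batch = new WriteBatch()) {
                batch.deleteRange(dataHandle, startPrefix, endPrefix); // user log entries
                batch.deleteRange(confHandle, startPrefix, endPrefix); // configuration entries
                batch.delete(metaHandle, createdKey);                  // 'created' flag
                db.write(writeOptions, batch);                         // all-or-nothing
            }
        }
    }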
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
new file mode 100644
index 00000000000..0cb4da55f14
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class DefaultLogStorageFactoryTest {
+ @WorkDirectory
+ private Path workDir;
+
+ private DefaultLogStorageFactory logStorageFactory;
+
+ private final LogStorageOptions logStorageOptions = new LogStorageOptions();
+
+ private final Peer peer = new Peer("127.0.0.1");
+
+ @BeforeEach
+ void setUp() {
+ logStorageOptions.setConfigurationManager(new ConfigurationManager());
+ logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
+
+ logStorageFactory = new DefaultLogStorageFactory(workDir);
+
+ startFactory();
+ }
+
+ private void startFactory() {
+ assertThat(logStorageFactory.startAsync(new ComponentContext()), willCompleteSuccessfully());
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (logStorageFactory != null) {
+ stopFactory();
+ }
+ }
+
+ private void stopFactory() {
+ assertThat(logStorageFactory.stopAsync(), willCompleteSuccessfully());
+ }
+
+ @Test
+ void metadataMigrationFindsGroupsHavingOnlyConfigurationEntries() throws Exception {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+ LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+ logStorage1.appendEntry(configLogEntry(1));
+ logStorage3.appendEntry(configLogEntry(10));
+
+ Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+ assertThat(
+ ids,
+ containsInAnyOrder(
+ nodeIdStringForStorage(groupId1),
+ nodeIdStringForStorage(groupId3)
+ )
+ );
+ }
+
+ private LogStorage createAndInitLogStorage(ZonePartitionId groupId) {
+ LogStorage logStorage = logStorageFactory.createLogStorage(nodeIdStringForStorage(groupId), new RaftOptions());
+ logStorage.init(logStorageOptions);
+ return logStorage;
+ }
+
+ private String nodeIdStringForStorage(ZonePartitionId groupId) {
+ return new RaftNodeId(groupId, peer).nodeIdStringForStorage();
+ }
+
+ private static LogEntry configLogEntry(int index) {
+ LogEntry logEntry = new LogEntry();
+
+ logEntry.setId(new LogId(index, 1));
+ logEntry.setType(EntryType.ENTRY_TYPE_CONFIGURATION);
+ logEntry.setPeers(List.of(new PeerId("a")));
+
+ return logEntry;
+ }
+
+ @Test
+ void metadataMigrationFindsGroupsHavingOnlyDataEntries() throws Exception {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+ LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+ logStorage1.appendEntry(dataLogEntry(1));
+ logStorage3.appendEntry(dataLogEntry(10));
+
+ Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+ assertThat(
+ ids,
+ containsInAnyOrder(
+ nodeIdStringForStorage(groupId1),
+ nodeIdStringForStorage(groupId3)
+ )
+ );
+ }
+
+ private static LogEntry dataLogEntry(int index) {
+ LogEntry logEntry = new LogEntry();
+
+ logEntry.setId(new LogId(index, 1));
+ logEntry.setType(EntryType.ENTRY_TYPE_DATA);
+ logEntry.setData(ByteBuffer.wrap(new byte[0]));
+
+ return logEntry;
+ }
+
+ @Test
+ void metadataMigrationFindsGroupsHavingBothConfigurationAndDataEntries() throws Exception {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+ LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+ logStorage1.appendEntry(configLogEntry(1));
+ logStorage1.appendEntry(dataLogEntry(2));
+ logStorage3.appendEntry(configLogEntry(10));
+ logStorage3.appendEntry(dataLogEntry(11));
+
+ Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+ assertThat(
+ ids,
+ containsInAnyOrder(
+ nodeIdStringForStorage(groupId1),
+ nodeIdStringForStorage(groupId3)
+ )
+ );
+ }
+
+ @Test
+ void metadataMigrationSavesStorageCreatedFlagInMetaColumnFamily() throws Exception {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+ LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+ logStorage1.appendEntry(configLogEntry(10));
+ logStorage1.appendEntry(dataLogEntry(11));
+ logStorage3.appendEntry(configLogEntry(100));
+ logStorage3.appendEntry(dataLogEntry(101));
+
+ logStorageFactory.db().dropColumnFamily(logStorageFactory.metaColumnFamilyHandle());
+ logStorageFactory.db().destroyColumnFamilyHandle(logStorageFactory.metaColumnFamilyHandle());
+
+ // Restart causes a migration.
+ stopFactory();
+ startFactory();
+
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId1)),
+ is(new byte[0])
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId3)),
+ is(new byte[0])
+ );
+ }
+
+ private byte[] storageCreatedKey(ZonePartitionId groupId) {
+ return ("\u0001" + nodeIdStringForStorage(groupId)).getBytes(UTF_8);
+ }
+
+ private byte[] entryKey(ZonePartitionId groupId, long index) {
+ return RocksDbSharedLogStorage.createKey(
+ (nodeIdStringForStorage(groupId) + "\0").getBytes(UTF_8),
+ ByteUtils.longToBytes(index)
+ );
+ }
+
+ @Test
+ void storageInitSavesStorageCreatedFlagInMetaColumnFamily() throws Exception {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ createAndInitLogStorage(groupId1);
+ createAndInitLogStorage(groupId3);
+
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId1)),
+ is(new byte[0])
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId3)),
+ is(new byte[0])
+ );
+ }
+
+ @Test
+ void storageDestructionRemovesAllItsKeys() throws Exception {
+ ZonePartitionId groupId = new ZonePartitionId(3, 2);
+ LogStorage logStorage = createAndInitLogStorage(groupId);
+
+ logStorage.appendEntry(configLogEntry(100));
+
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId)),
+ is(new byte[0])
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), entryKey(groupId, 100)),
+ is(notNullValue())
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), entryKey(groupId, 100)),
+ is(notNullValue())
+ );
+
+ logStorageFactory.destroyLogStorage(nodeIdStringForStorage(groupId));
+
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId)),
+ is(nullValue())
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), entryKey(groupId, 100)),
+ is(nullValue())
+ );
+ assertThat(
+ logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), entryKey(groupId, 100)),
+ is(nullValue())
+ );
+ }
+}
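
The tests above depend on the log-entry key format <storageId>\0<8-byte index> (see entryKey) and on the migration recovering ids from raw keys, as handleGroupIdIteratorEntry does: everything before the first zero byte is the UTF-8 storage id. A small sketch of that extraction (the sample id and suffix are hypothetical):

    import java.nio.charset.StandardCharsets;

    class GroupIdFromKeyDemo {
        static String idForStorage(byte[] key) {
            for (int i = 0; i < key.length; i++) {
                if (key[i] == 0) {
                    // The id is everything before the first zero byte.
                    return new String(key, 0, i, StandardCharsets.UTF_8);
                }
            }
            throw new IllegalArgumentException("no zero byte in key");
        }

        public static void main(String[] args) {
            byte[] key = "hypothetical_group_id\0indexbyt".getBytes(StandardCharsets.UTF_8);
            System.out.println(idForStorage(key)); // hypothetical_group_id
        }
    }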
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index a609bc58019..9015f926751 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -158,18 +158,13 @@ class ItRaftStorageVolatilityTest extends ClusterPerTestIntegrationTest {
Path logRocksDbDir = ignite.partitionsWorkDir().raftLogPath();
- List<ColumnFamilyDescriptor> cfDescriptors = List.of(
- // Column family to store configuration log entry.
- new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8)),
- // Default column family to store user data log entry.
- new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY)
- );
+ List<ColumnFamilyDescriptor> cfDescriptors = cfDescriptors();
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
try (RocksDB db = RocksDB.open(logRocksDbDir.toString(), cfDescriptors, cfHandles)) {
- assertThatFamilyHasNoDataForPartition(db, tablePartitionPrefix, cfHandles.get(0));
assertThatFamilyHasNoDataForPartition(db, tablePartitionPrefix, cfHandles.get(1));
+ assertThatFamilyHasNoDataForPartition(db, tablePartitionPrefix, cfHandles.get(2));
}
}
@@ -219,21 +214,26 @@ class ItRaftStorageVolatilityTest extends ClusterPerTestIntegrationTest {
Path logRocksDbDir = ignite.partitionsWorkDir().raftLogPath();
- List<ColumnFamilyDescriptor> cfDescriptors = List.of(
- // Column family to store configuration log entry.
- new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8)),
- // Default column family to store user data log entry.
- new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY)
- );
+ List<ColumnFamilyDescriptor> cfDescriptors = cfDescriptors();
List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
try (RocksDB db = RocksDB.open(logRocksDbDir.toString(), cfDescriptors, cfHandles)) {
- assertThatFamilyHasDataForPartition(db, partitionPrefix, cfHandles.get(0));
assertThatFamilyHasDataForPartition(db, partitionPrefix, cfHandles.get(1));
+ assertThatFamilyHasDataForPartition(db, partitionPrefix, cfHandles.get(2));
}
}
+ private static List<ColumnFamilyDescriptor> cfDescriptors() {
+ return List.of(
+ new ColumnFamilyDescriptor("Meta".getBytes(UTF_8)),
+ // Column family to store configuration log entry.
+ new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8)),
+ // Default column family to store user data log entry.
+ new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY)
+ );
+ }
+
private static void assertThatFamilyHasDataForPartition(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle) {
try (
ReadOptions readOptions = new ReadOptions().setIterateLowerBound(new Slice(tablePartitionPrefix.getBytes(UTF_8)));