This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch ignite-25947 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 8ee588ddcf240ed3e781c914d0a0616896530a8c Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Jul 17 19:17:22 2025 +0400 IGNITE-25947 Add 'created' flag to RocksDB-based log storage --- .../storage/impl/DefaultLogStorageFactory.java | 52 +++- .../raft/storage/impl/MetadataMigration.java | 174 +++++++++++++ .../raft/storage/impl/RocksDbSharedLogStorage.java | 92 +++++-- .../storage/impl/DefaultLogStorageFactoryTest.java | 276 +++++++++++++++++++++ 4 files changed, 565 insertions(+), 29 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java index 045240c9793..25fa321929e 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java @@ -65,6 +65,10 @@ import org.rocksdb.util.SizeUnit; public class DefaultLogStorageFactory implements LogStorageFactory { private static final IgniteLogger LOG = Loggers.forClass(DefaultLogStorageFactory.class); + static final byte[] FINISHED_META_MIGRATION_META_KEY = {0}; + + static final byte[] STORAGE_CREATED_META_PREFIX = {1}; + /** Name of the log factory, will be used in logs. */ private final String factoryName; @@ -86,6 +90,8 @@ public class DefaultLogStorageFactory implements LogStorageFactory { /** Write options to use in writes to database. */ private WriteOptions writeOptions; + private ColumnFamilyHandle metaHandle; + /** Configuration column family handle. */ private ColumnFamilyHandle confHandle; @@ -166,6 +172,8 @@ public class DefaultLogStorageFactory implements LogStorageFactory { this.flushListener = new LoggingRocksDbFlushListener(factoryName, nodeName); List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of( + // Column family to store metadata. 
+ new ColumnFamilyDescriptor("Meta".getBytes(UTF_8), cfOption), // Column family to store configuration log entry. new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), cfOption), // Default column family to store user data log entry. @@ -185,9 +193,13 @@ public class DefaultLogStorageFactory implements LogStorageFactory { // Setup background compactions pool env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), Priority.LOW); - assert (columnFamilyHandles.size() == 2); - this.confHandle = columnFamilyHandles.get(0); - this.dataHandle = columnFamilyHandles.get(1); + assert (columnFamilyHandles.size() == 3); + this.metaHandle = columnFamilyHandles.get(0); + this.confHandle = columnFamilyHandles.get(1); + this.dataHandle = columnFamilyHandles.get(2); + + MetadataMigration metadataMigration = metadataMigration(); + metadataMigration.migrateIfNeeded(); } catch (Exception e) { closeRocksResources(); @@ -195,6 +207,10 @@ public class DefaultLogStorageFactory implements LogStorageFactory { } } + MetadataMigration metadataMigration() { + return new MetadataMigration(db, writeOptions, metaHandle, confHandle, dataHandle); + } + @Override public CompletableFuture<Void> stopAsync(ComponentContext componentContext) { ExecutorServiceHelper.shutdownAndAwaitTermination(executorService); @@ -226,19 +242,23 @@ public class DefaultLogStorageFactory implements LogStorageFactory { public LogStorage createLogStorage(String raftNodeStorageId, RaftOptions raftOptions) { // raftOptions is ignored as fsync status is passed via dbOptions. 
- return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, raftNodeStorageId, writeOptions, executorService); + return new RocksDbSharedLogStorage(this, db, metaHandle, confHandle, dataHandle, raftNodeStorageId, writeOptions, executorService); } @Override public void destroyLogStorage(String uri) { - try { + try (WriteBatch writeBatch = new WriteBatch()) { RocksDbSharedLogStorage.destroyAllEntriesBetween( - db, + writeBatch, confHandle, dataHandle, raftNodeStorageStartPrefix(uri), raftNodeStorageEndPrefix(uri) ); + + writeBatch.delete(metaHandle, RocksDbSharedLogStorage.storageCreatedKey(uri)); + + db.write(this.writeOptions, writeBatch); } catch (RocksDBException e) { throw new LogStorageException("Fail to destroy the log storage for " + uri, e); } @@ -339,4 +359,24 @@ public class DefaultLogStorageFactory implements LogStorageFactory { return opts; } + + @TestOnly + RocksDB db() { + return db; + } + + @TestOnly + ColumnFamilyHandle metaColumnFamilyHandle() { + return metaHandle; + } + + @TestOnly + ColumnFamilyHandle confColumnFamilyHandle() { + return confHandle; + } + + @TestOnly + ColumnFamilyHandle dataColumnFamilyHandle() { + return dataHandle; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java new file mode 100644 index 00000000000..ea8c51f7937 --- /dev/null +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.impl; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory.FINISHED_META_MIGRATION_META_KEY; +import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ArrayUtils; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +/** + * Implements migration of log storage metadata. Namely, it finds all log storages for which at least one key (in either Configuration + * or default column families) exists, and marks them as 'created', to support the invariant that for a log storage on which init() + * is called the corresponding 'created' key is in RocksDB. + */ +class MetadataMigration { + private static final IgniteLogger LOG = Loggers.forClass(MetadataMigration.class); + + private final RocksDB db; + + private final WriteOptions writeOptions; + + private final ColumnFamilyHandle metaHandle; + private final ColumnFamilyHandle confHandle; + private final ColumnFamilyHandle dataHandle; + + /** Constructor. 
*/ + MetadataMigration( + RocksDB db, + WriteOptions writeOptions, + ColumnFamilyHandle metaHandle, + ColumnFamilyHandle confHandle, + ColumnFamilyHandle dataHandle + ) { + this.db = db; + this.writeOptions = writeOptions; + this.metaHandle = metaHandle; + this.confHandle = confHandle; + this.dataHandle = dataHandle; + } + + void migrateIfNeeded() throws RocksDBException { + if (metaMigrationIsFinished()) { + return; + } + + try (WriteBatch writeBatch = new WriteBatch()) { + boolean migratingSomething = doMigration(writeBatch); + + markMetaMigrationAsFinished(writeBatch); + + db.write(writeOptions, writeBatch); + + if (migratingSomething) { + LOG.info("Metadata migration was performed and touched some log storages."); + } + } + } + + private boolean metaMigrationIsFinished() throws RocksDBException { + return db.get(metaHandle, FINISHED_META_MIGRATION_META_KEY) != null; + } + + private boolean doMigration(WriteBatch writeBatch) throws RocksDBException { + boolean migratingSomething = false; + + for (String groupIdForStorage : raftNodeStorageIdsOnDisk()) { + RocksDbSharedLogStorage.saveStorageStartedFlag(metaHandle, groupIdForStorage, writeBatch); + + migratingSomething = true; + } + + return migratingSomething; + } + + Set<String> raftNodeStorageIdsOnDisk() throws RocksDBException { + Set<String> groupIdsForStorage = new HashSet<>(); + + try ( + RocksIterator confIt = db.newIterator(confHandle); + RocksIterator dataIt = db.newIterator(dataHandle) + ) { + confIt.seekToFirst(); + dataIt.seekToFirst(); + + while (confIt.isValid() || dataIt.isValid()) { + if (confIt.isValid() && dataIt.isValid()) { + byte[] confKey = confIt.key(); + byte[] dataKey = dataIt.key(); + + int confToDataComparison = Arrays.compare(confKey, dataKey); + if (confToDataComparison <= 0) { + String idForStorage = handleGroupIdIteratorEntry(confIt, confKey, groupIdsForStorage); + + if (confToDataComparison == 0) { + skipToNextGroupKey(dataIt, idForStorage); + } + } else { + 
handleGroupIdIteratorEntry(dataIt, dataKey, groupIdsForStorage); + } + } else { + // Just one is valid. + RocksIterator it = confIt.isValid() ? confIt : dataIt; + assert it.isValid(); + + handleGroupIdIteratorEntry(it, it.key(), groupIdsForStorage); + } + } + + // Doing this so that an exception is thrown if the iteration was stopped due to an error and not due to exhausting + // the iteration space. + confIt.status(); + dataIt.status(); + } + + return Set.copyOf(groupIdsForStorage); + } + + private static String handleGroupIdIteratorEntry(RocksIterator it, byte[] currentKey, Set<String> groupIdsForStorage) { + int indexOfZero = indexOf((byte) 0, currentKey); + assert indexOfZero >= 0 : new String(currentKey, UTF_8) + " does not have a zero byte"; + + String idForStorage = new String(currentKey, 0, indexOfZero, UTF_8); + groupIdsForStorage.add(idForStorage); + + skipToNextGroupKey(it, idForStorage); + + return idForStorage; + } + + private static int indexOf(byte needle, byte[] haystack) { + for (int i = 0; i < haystack.length; i++) { + if (haystack[i] == needle) { + return i; + } + } + + return -1; + } + + private static void skipToNextGroupKey(RocksIterator it, String idForStorage) { + it.seek(raftNodeStorageEndPrefix(idForStorage)); + } + + private void markMetaMigrationAsFinished(WriteBatch writeBatch) throws RocksDBException { + writeBatch.put(metaHandle, FINISHED_META_MIGRATION_META_KEY, ArrayUtils.BYTE_EMPTY_ARRAY); + } +} diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java index 7ab86b6cc72..28e8794e5c6 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.raft.storage.impl; 
+import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.copyOfRange; import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix; import static org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageStartPrefix; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import java.io.IOException; import java.lang.invoke.MethodHandles; @@ -31,8 +33,10 @@ import java.util.concurrent.Executor; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.raft.jraft.conf.Configuration; import org.apache.ignite.raft.jraft.conf.ConfigurationEntry; import org.apache.ignite.raft.jraft.conf.ConfigurationManager; @@ -47,6 +51,7 @@ import org.apache.ignite.raft.jraft.util.BytesUtil; import org.apache.ignite.raft.jraft.util.Describer; import org.apache.ignite.raft.jraft.util.Requires; import org.apache.ignite.raft.jraft.util.Utils; +import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; @@ -77,6 +82,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { ByteOrder.BIG_ENDIAN ); + private static final long INITIAL_INDEX = 1L; + /** * First log index and last log index key in configuration column family. */ @@ -88,6 +95,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { /** Shared db instance. */ private final RocksDB db; + private final ColumnFamilyHandle metaHandle; + /** Shared configuration column family handle. 
*/ private final ColumnFamilyHandle confHandle; @@ -97,6 +106,8 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { /** Shared write options. */ private final WriteOptions writeOptions; + private final String raftNodeStorageId; + /** Start prefix. */ private final byte[] startPrefix; @@ -131,7 +142,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { private LogEntryDecoder logEntryDecoder; /** First log index. */ - private volatile long firstLogIndex = 1; + private volatile long firstLogIndex = INITIAL_INDEX; /** First log index loaded flag. */ private volatile boolean hasLoadFirstLogIndex; @@ -140,6 +151,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { RocksDbSharedLogStorage( DefaultLogStorageFactory logStorageFactory, RocksDB db, + ColumnFamilyHandle metaHandle, ColumnFamilyHandle confHandle, ColumnFamilyHandle dataHandle, String raftNodeStorageId, @@ -162,9 +174,11 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { this.logStorageFactory = logStorageFactory; this.db = db; + this.metaHandle = metaHandle; this.confHandle = confHandle; this.dataHandle = dataHandle; this.executor = executor; + this.raftNodeStorageId = raftNodeStorageId; this.startPrefix = raftNodeStorageStartPrefix(raftNodeStorageId); this.endPrefix = raftNodeStorageEndPrefix(raftNodeStorageId); this.startBound = new Slice(startPrefix); @@ -172,11 +186,17 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { this.writeOptions = writeOptions; } - /** - * Returns the log factory instance, that created current log storage. 
- */ - DefaultLogStorageFactory getLogStorageFactory() { - return logStorageFactory; + static byte[] storageCreatedKey(String raftNodeStorageId) { + return concat(DefaultLogStorageFactory.STORAGE_CREATED_META_PREFIX, raftNodeStorageId.getBytes(UTF_8)); + } + + private static byte[] concat(byte[] a, byte[] b) { + byte[] result = new byte[a.length + b.length]; + + System.arraycopy(a, 0, result, 0, a.length); + System.arraycopy(b, 0, result, a.length, b.length); + + return result; } /** {@inheritDoc} */ @@ -191,15 +211,33 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { Requires.requireNonNull(this.logEntryDecoder, "Null log entry decoder"); Requires.requireNonNull(this.logEntryEncoder, "Null log entry encoder"); + saveStorageCreatedFlag(); + return initAndLoad(opts.getConfigurationManager()); } finally { this.manageLock.unlock(); } } + private void saveStorageCreatedFlag() { + try (WriteBatch writeBatch = new WriteBatch()) { + saveStorageStartedFlag(metaHandle, raftNodeStorageId, writeBatch); + + db.write(writeOptions, writeBatch); + } catch (RocksDBException e) { + throw new IgniteInternalException(INTERNAL_ERR, e); + } + } + + static void saveStorageStartedFlag(ColumnFamilyHandle metaHandle, String raftNodeStorageId, WriteBatch writeBatch) + throws RocksDBException { + byte[] storageCreatedKey = storageCreatedKey(raftNodeStorageId); + writeBatch.put(metaHandle, storageCreatedKey, ArrayUtils.BYTE_EMPTY_ARRAY); + } + private boolean initAndLoad(ConfigurationManager configurationManager) { this.hasLoadFirstLogIndex = false; - this.firstLogIndex = 1; + this.firstLogIndex = INITIAL_INDEX; load(configurationManager); return onInitLoaded(); } @@ -212,7 +250,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { it.seek(startPrefix); while (it.isValid()) { byte[] keyWithPrefix = it.key(); - byte[] ks = getKey(keyWithPrefix); + byte[] ks = extractKey(keyWithPrefix); byte[] bs = it.value(); // LogEntry index @@ -251,7 +289,11 
@@ public class RocksDbSharedLogStorage implements LogStorage, Describer { } } - private byte[] getKey(byte[] ks) { + private byte[] extractKey(byte[] ks) { + return extractKey(ks, startPrefix); + } + + private static byte[] extractKey(byte[] ks, byte[] startPrefix) { return copyOfRange(ks, startPrefix.length, ks.length); } @@ -266,7 +308,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { private boolean saveFirstLogIndex(long firstLogIndex) { this.useLock.lock(); try { - byte[] vs = new byte[8]; + byte[] vs = new byte[Long.BYTES]; LONG_ARRAY_HANDLE.set(vs, 0, firstLogIndex); this.db.put(this.confHandle, this.writeOptions, createKey(FIRST_LOG_IDX_KEY), vs); return true; @@ -313,14 +355,14 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { it.seek(startPrefix); if (it.isValid()) { - byte[] key = getKey(it.key()); + byte[] key = extractKey(it.key()); long ret = (long) LONG_ARRAY_HANDLE.get(key, 0); saveFirstLogIndex(ret); setFirstLogIndex(ret); return ret; } - return 1L; + return INITIAL_INDEX; } } finally { this.useLock.unlock(); @@ -339,7 +381,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { it.seekForPrev(endPrefix); if (it.isValid()) { - byte[] key = getKey(it.key()); + byte[] key = extractKey(it.key()); return (long) LONG_ARRAY_HANDLE.get(key, 0); } @@ -351,7 +393,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { /** {@inheritDoc} */ @Override - public LogEntry getEntry(long index) { + public @Nullable LogEntry getEntry(long index) { this.useLock.lock(); try { if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) { @@ -552,7 +594,12 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { try { LogEntry entry = getEntry(nextLogIndex); - destroyAllEntriesBetween(db, confHandle, dataHandle, startPrefix, endPrefix); + + try (WriteBatch writeBatch = new WriteBatch()) { + destroyAllEntriesBetween(writeBatch, confHandle, dataHandle, 
startPrefix, endPrefix); + + db.write(this.writeOptions, writeBatch); + } onReset(nextLogIndex); @@ -576,14 +623,14 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { } static void destroyAllEntriesBetween( - RocksDB db, + WriteBatch writeBatch, ColumnFamilyHandle confHandle, ColumnFamilyHandle dataHandle, byte[] startPrefix, byte[] endPrefix ) throws RocksDBException { - db.deleteRange(dataHandle, startPrefix, endPrefix); - db.deleteRange(confHandle, startPrefix, endPrefix); + writeBatch.deleteRange(dataHandle, startPrefix, endPrefix); + writeBatch.deleteRange(confHandle, startPrefix, endPrefix); } /** {@inheritDoc} */ @@ -683,12 +730,11 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { @SuppressWarnings("SameParameterValue") private byte[] createKey(byte[] key) { - var buffer = new byte[startPrefix.length + key.length]; - - System.arraycopy(startPrefix, 0, buffer, 0, startPrefix.length); - System.arraycopy(key, 0, buffer, startPrefix.length, key.length); + return createKey(startPrefix, key); + } - return buffer; + static byte[] createKey(byte[] startPrefix, byte[] key) { + return concat(startPrefix, key); } private byte[] createKey(long index) { diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java new file mode 100644 index 00000000000..0cb4da55f14 --- /dev/null +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.impl; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.List; +import java.util.Set; +import org.apache.ignite.internal.manager.ComponentContext; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.testframework.WorkDirectory; +import org.apache.ignite.internal.testframework.WorkDirectoryExtension; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.raft.jraft.conf.ConfigurationManager; +import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.LogId; +import org.apache.ignite.raft.jraft.entity.PeerId; +import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory; +import 
org.apache.ignite.raft.jraft.option.LogStorageOptions; +import org.apache.ignite.raft.jraft.option.RaftOptions; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(WorkDirectoryExtension.class) +class DefaultLogStorageFactoryTest { + @WorkDirectory + private Path workDir; + + private DefaultLogStorageFactory logStorageFactory; + + private final LogStorageOptions logStorageOptions = new LogStorageOptions(); + + private final Peer peer = new Peer("127.0.0.1"); + + @BeforeEach + void setUp() { + logStorageOptions.setConfigurationManager(new ConfigurationManager()); + logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance()); + + logStorageFactory = new DefaultLogStorageFactory(workDir); + + startFactory(); + } + + private void startFactory() { + assertThat(logStorageFactory.startAsync(new ComponentContext()), willCompleteSuccessfully()); + } + + @AfterEach + void tearDown() { + if (logStorageFactory != null) { + stopFactory(); + } + } + + private void stopFactory() { + assertThat(logStorageFactory.stopAsync(), willCompleteSuccessfully()); + } + + @Test + void metadataMigrationFindsGroupsHavingOnlyConfigurationEntries() throws Exception { + ZonePartitionId groupId1 = new ZonePartitionId(1, 0); + ZonePartitionId groupId3 = new ZonePartitionId(3, 2); + LogStorage logStorage1 = createAndInitLogStorage(groupId1); + LogStorage logStorage3 = createAndInitLogStorage(groupId3); + + logStorage1.appendEntry(configLogEntry(1)); + logStorage3.appendEntry(configLogEntry(10)); + + Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk(); + + assertThat( + ids, + containsInAnyOrder( + nodeIdStringForStorage(groupId1), + nodeIdStringForStorage(groupId3) + ) + ); + } + + private LogStorage createAndInitLogStorage(ZonePartitionId groupId) { + 
LogStorage logStorage = logStorageFactory.createLogStorage(nodeIdStringForStorage(groupId), new RaftOptions()); + logStorage.init(logStorageOptions); + return logStorage; + } + + private String nodeIdStringForStorage(ZonePartitionId groupId) { + return new RaftNodeId(groupId, peer).nodeIdStringForStorage(); + } + + private static LogEntry configLogEntry(int index) { + LogEntry logEntry = new LogEntry(); + + logEntry.setId(new LogId(index, 1)); + logEntry.setType(EntryType.ENTRY_TYPE_CONFIGURATION); + logEntry.setPeers(List.of(new PeerId("a"))); + + return logEntry; + } + + @Test + void metadataMigrationFindsGroupsHavingOnlyDataEntries() throws Exception { + ZonePartitionId groupId1 = new ZonePartitionId(1, 0); + ZonePartitionId groupId3 = new ZonePartitionId(3, 2); + LogStorage logStorage1 = createAndInitLogStorage(groupId1); + LogStorage logStorage3 = createAndInitLogStorage(groupId3); + + logStorage1.appendEntry(dataLogEntry(1)); + logStorage3.appendEntry(dataLogEntry(10)); + + Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk(); + + assertThat( + ids, + containsInAnyOrder( + nodeIdStringForStorage(groupId1), + nodeIdStringForStorage(groupId3) + ) + ); + } + + private static LogEntry dataLogEntry(int index) { + LogEntry logEntry = new LogEntry(); + + logEntry.setId(new LogId(index, 1)); + logEntry.setType(EntryType.ENTRY_TYPE_DATA); + logEntry.setData(ByteBuffer.wrap(new byte[0])); + + return logEntry; + } + + @Test + void metadataMigrationFindsGroupsHavingBothConfigurationAndDataEntries() throws Exception { + ZonePartitionId groupId1 = new ZonePartitionId(1, 0); + ZonePartitionId groupId3 = new ZonePartitionId(3, 2); + LogStorage logStorage1 = createAndInitLogStorage(groupId1); + LogStorage logStorage3 = createAndInitLogStorage(groupId3); + + logStorage1.appendEntry(configLogEntry(1)); + logStorage1.appendEntry(dataLogEntry(2)); + logStorage3.appendEntry(configLogEntry(10)); + logStorage3.appendEntry(dataLogEntry(11)); + + 
Set<String> ids = logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk(); + + assertThat( + ids, + containsInAnyOrder( + nodeIdStringForStorage(groupId1), + nodeIdStringForStorage(groupId3) + ) + ); + } + + @Test + void metadataMigrationSavesStorageCreatedFlagInMetaColumnFamily() throws Exception { + ZonePartitionId groupId1 = new ZonePartitionId(1, 0); + ZonePartitionId groupId3 = new ZonePartitionId(3, 2); + LogStorage logStorage1 = createAndInitLogStorage(groupId1); + LogStorage logStorage3 = createAndInitLogStorage(groupId3); + + logStorage1.appendEntry(configLogEntry(10)); + logStorage1.appendEntry(dataLogEntry(11)); + logStorage3.appendEntry(configLogEntry(100)); + logStorage3.appendEntry(dataLogEntry(101)); + + logStorageFactory.db().dropColumnFamily(logStorageFactory.metaColumnFamilyHandle()); + logStorageFactory.db().destroyColumnFamilyHandle(logStorageFactory.metaColumnFamilyHandle()); + + // Restart causes a migration. + stopFactory(); + startFactory(); + + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId1)), + is(new byte[0]) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId3)), + is(new byte[0]) + ); + } + + private byte[] storageCreatedKey(ZonePartitionId groupId) { + return ("\u0001" + nodeIdStringForStorage(groupId)).getBytes(UTF_8); + } + + private byte[] entryKey(ZonePartitionId groupId, long index) { + return RocksDbSharedLogStorage.createKey( + (nodeIdStringForStorage(groupId) + "\0").getBytes(UTF_8), + ByteUtils.longToBytes(index) + ); + } + + @Test + void storageInitSavesStorageCreatedFlagInMetaColumnFamily() throws Exception { + ZonePartitionId groupId1 = new ZonePartitionId(1, 0); + ZonePartitionId groupId3 = new ZonePartitionId(3, 2); + createAndInitLogStorage(groupId1); + createAndInitLogStorage(groupId3); + + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId1)), + is(new byte[0]) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId3)), + is(new byte[0]) + ); + } + + @Test + void storageDestructionRemovesAllItsKes() throws Exception { + ZonePartitionId groupId = new ZonePartitionId(3, 2); + LogStorage logStorage = createAndInitLogStorage(groupId); + + logStorage.appendEntry(configLogEntry(100)); + + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId)), + is(new byte[0]) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), entryKey(groupId, 100)), + is(notNullValue()) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), entryKey(groupId, 100)), + is(notNullValue()) + ); + + logStorageFactory.destroyLogStorage(nodeIdStringForStorage(groupId)); + + assertThat( + logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), storageCreatedKey(groupId)), + is(nullValue()) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), entryKey(groupId, 100)), + is(nullValue()) + ); + assertThat( + logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), entryKey(groupId, 100)), + is(nullValue()) + ); + } +}