This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch ignite-25947
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 8ee588ddcf240ed3e781c914d0a0616896530a8c
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Thu Jul 17 19:17:22 2025 +0400

    IGNITE-25947 Add 'created' flag to RocksDB-based log storage
---
 .../storage/impl/DefaultLogStorageFactory.java     |  52 +++-
 .../raft/storage/impl/MetadataMigration.java       | 174 +++++++++++++
 .../raft/storage/impl/RocksDbSharedLogStorage.java |  92 +++++--
 .../storage/impl/DefaultLogStorageFactoryTest.java | 276 +++++++++++++++++++++
 4 files changed, 565 insertions(+), 29 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 045240c9793..25fa321929e 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -65,6 +65,10 @@ import org.rocksdb.util.SizeUnit;
 public class DefaultLogStorageFactory implements LogStorageFactory {
     private static final IgniteLogger LOG = 
Loggers.forClass(DefaultLogStorageFactory.class);
 
+    static final byte[] FINISHED_META_MIGRATION_META_KEY = {0};
+
+    static final byte[] STORAGE_CREATED_META_PREFIX = {1};
+
     /** Name of the log factory, will be used in logs. */
     private final String factoryName;
 
@@ -86,6 +90,8 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     /** Write options to use in writes to database. */
     private WriteOptions writeOptions;
 
+    private ColumnFamilyHandle metaHandle;
+
     /** Configuration column family handle. */
     private ColumnFamilyHandle confHandle;
 
@@ -166,6 +172,8 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
         this.flushListener = new LoggingRocksDbFlushListener(factoryName, 
nodeName);
 
         List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+                // Column family to store metadata.
+                new ColumnFamilyDescriptor("Meta".getBytes(UTF_8), cfOption),
                 // Column family to store configuration log entry.
                 new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), 
cfOption),
                 // Default column family to store user data log entry.
@@ -185,9 +193,13 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
             // Setup background compactions pool
             
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(), 
Priority.LOW);
 
-            assert (columnFamilyHandles.size() == 2);
-            this.confHandle = columnFamilyHandles.get(0);
-            this.dataHandle = columnFamilyHandles.get(1);
+            assert (columnFamilyHandles.size() == 3);
+            this.metaHandle = columnFamilyHandles.get(0);
+            this.confHandle = columnFamilyHandles.get(1);
+            this.dataHandle = columnFamilyHandles.get(2);
+
+            MetadataMigration metadataMigration = metadataMigration();
+            metadataMigration.migrateIfNeeded();
         } catch (Exception e) {
             closeRocksResources();
 
@@ -195,6 +207,10 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
         }
     }
 
+    MetadataMigration metadataMigration() {
+        return new MetadataMigration(db, writeOptions, metaHandle, confHandle, 
dataHandle);
+    }
+
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
         ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
@@ -226,19 +242,23 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     public LogStorage createLogStorage(String raftNodeStorageId, RaftOptions 
raftOptions) {
         // raftOptions is ignored as fsync status is passed via dbOptions.
 
-        return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, 
raftNodeStorageId, writeOptions, executorService);
+        return new RocksDbSharedLogStorage(this, db, metaHandle, confHandle, 
dataHandle, raftNodeStorageId, writeOptions, executorService);
     }
 
     @Override
     public void destroyLogStorage(String uri) {
-        try {
+        try (WriteBatch writeBatch = new WriteBatch()) {
             RocksDbSharedLogStorage.destroyAllEntriesBetween(
-                    db,
+                    writeBatch,
                     confHandle,
                     dataHandle,
                     raftNodeStorageStartPrefix(uri),
                     raftNodeStorageEndPrefix(uri)
             );
+
+            writeBatch.delete(metaHandle, 
RocksDbSharedLogStorage.storageCreatedKey(uri));
+
+            db.write(this.writeOptions, writeBatch);
         } catch (RocksDBException e) {
             throw new LogStorageException("Fail to destroy the log storage for 
" + uri, e);
         }
@@ -339,4 +359,24 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
 
         return opts;
     }
+
+    @TestOnly
+    RocksDB db() {
+        return db;
+    }
+
+    @TestOnly
+    ColumnFamilyHandle metaColumnFamilyHandle() {
+        return metaHandle;
+    }
+
+    @TestOnly
+    ColumnFamilyHandle confColumnFamilyHandle() {
+        return confHandle;
+    }
+
+    @TestOnly
+    ColumnFamilyHandle dataColumnFamilyHandle() {
+        return dataHandle;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java
new file mode 100644
index 00000000000..ea8c51f7937
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/MetadataMigration.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory.FINISHED_META_MIGRATION_META_KEY;
+import static 
org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Implements migration of log storage metadata. Namely, it finds all log 
storages for which at least one key (in either Configuration
+ * or default column families) exists, and marks them as 'created', so support 
the invariant that for a log storage on which init()
+ * is called the corresponding 'created' key is in RocksDB.
+ */
+class MetadataMigration {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetadataMigration.class);
+
+    private final RocksDB db;
+
+    private final WriteOptions writeOptions;
+
+    private final ColumnFamilyHandle metaHandle;
+    private final ColumnFamilyHandle confHandle;
+    private final ColumnFamilyHandle dataHandle;
+
+    /** Constructor. */
+    MetadataMigration(
+            RocksDB db,
+            WriteOptions writeOptions,
+            ColumnFamilyHandle metaHandle,
+            ColumnFamilyHandle confHandle,
+            ColumnFamilyHandle dataHandle
+    ) {
+        this.db = db;
+        this.writeOptions = writeOptions;
+        this.metaHandle = metaHandle;
+        this.confHandle = confHandle;
+        this.dataHandle = dataHandle;
+    }
+
+    void migrateIfNeeded() throws RocksDBException {
+        if (metaMigrationIsFinished()) {
+            return;
+        }
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            boolean migratingSomething = doMigration(writeBatch);
+
+            markMetaMigrationAsFinished(writeBatch);
+
+            db.write(writeOptions, writeBatch);
+
+            if (migratingSomething) {
+                LOG.info("Metadata migration was performed and touched some 
log storages.");
+            }
+        }
+    }
+
+    private boolean metaMigrationIsFinished() throws RocksDBException {
+        return db.get(metaHandle, FINISHED_META_MIGRATION_META_KEY) != null;
+    }
+
+    private boolean doMigration(WriteBatch writeBatch) throws RocksDBException 
{
+        boolean migratingSomething = false;
+
+        for (String groupIdForStorage : raftNodeStorageIdsOnDisk()) {
+            RocksDbSharedLogStorage.saveStorageStartedFlag(metaHandle, 
groupIdForStorage, writeBatch);
+
+            migratingSomething = true;
+        }
+
+        return migratingSomething;
+    }
+
+    Set<String> raftNodeStorageIdsOnDisk() throws RocksDBException {
+        Set<String> groupIdsForStorage = new HashSet<>();
+
+        try (
+                RocksIterator confIt = db.newIterator(confHandle);
+                RocksIterator dataIt = db.newIterator(dataHandle)
+        ) {
+            confIt.seekToFirst();
+            dataIt.seekToFirst();
+
+            while (confIt.isValid() || dataIt.isValid()) {
+                if (confIt.isValid() && dataIt.isValid()) {
+                    byte[] confKey = confIt.key();
+                    byte[] dataKey = dataIt.key();
+
+                    int confToDataComparison = Arrays.compare(confKey, 
dataKey);
+                    if (confToDataComparison <= 0) {
+                        String idForStorage = 
handleGroupIdIteratorEntry(confIt, confKey, groupIdsForStorage);
+
+                        if (confToDataComparison == 0) {
+                            skipToNextGroupKey(dataIt, idForStorage);
+                        }
+                    } else {
+                        handleGroupIdIteratorEntry(dataIt, dataKey, 
groupIdsForStorage);
+                    }
+                } else {
+                    // Just one is valid.
+                    RocksIterator it = confIt.isValid() ? confIt : dataIt;
+                    assert it.isValid();
+
+                    handleGroupIdIteratorEntry(it, it.key(), 
groupIdsForStorage);
+                }
+            }
+
+            // Doing this to make an exception thrown if the iteration was 
stopped due to an error and not due to exhausting
+            // the iteration space.
+            confIt.status();
+            dataIt.status();
+        }
+
+        return Set.copyOf(groupIdsForStorage);
+    }
+
+    private static String handleGroupIdIteratorEntry(RocksIterator it, byte[] 
currentKey, Set<String> groupIdsForStorage) {
+        int indexOfZero = indexOf((byte) 0, currentKey);
+        assert indexOfZero >= 0 : new String(currentKey, UTF_8) + " does not 
have a zero byte";
+
+        String idForStorage = new String(currentKey, 0, indexOfZero, UTF_8);
+        groupIdsForStorage.add(idForStorage);
+
+        skipToNextGroupKey(it, idForStorage);
+
+        return idForStorage;
+    }
+
+    private static int indexOf(byte needle, byte[] haystack) {
+        for (int i = 0; i < haystack.length; i++) {
+            if (haystack[i] == needle) {
+                return i;
+            }
+        }
+
+        return -1;
+    }
+
+    private static void skipToNextGroupKey(RocksIterator it, String 
idForStorage) {
+        it.seek(raftNodeStorageEndPrefix(idForStorage));
+    }
+
+    private void markMetaMigrationAsFinished(WriteBatch writeBatch) throws 
RocksDBException {
+        writeBatch.put(metaHandle, FINISHED_META_MIGRATION_META_KEY, 
ArrayUtils.BYTE_EMPTY_ARRAY);
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 7ab86b6cc72..28e8794e5c6 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.raft.storage.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Arrays.copyOfRange;
 import static 
org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix;
 import static 
org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageStartPrefix;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -31,8 +33,10 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
@@ -47,6 +51,7 @@ import org.apache.ignite.raft.jraft.util.BytesUtil;
 import org.apache.ignite.raft.jraft.util.Describer;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
@@ -77,6 +82,8 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
             ByteOrder.BIG_ENDIAN
     );
 
+    private static final long INITIAL_INDEX = 1L;
+
     /**
      * First log index and last log index key in configuration column family.
      */
@@ -88,6 +95,8 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
     /** Shared db instance. */
     private final RocksDB db;
 
+    private final ColumnFamilyHandle metaHandle;
+
     /** Shared configuration column family handle. */
     private final ColumnFamilyHandle confHandle;
 
@@ -97,6 +106,8 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
     /** Shared write options. */
     private final WriteOptions writeOptions;
 
+    private final String raftNodeStorageId;
+
     /** Start prefix. */
     private final byte[] startPrefix;
 
@@ -131,7 +142,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
     private LogEntryDecoder logEntryDecoder;
 
     /** First log index. */
-    private volatile long firstLogIndex = 1;
+    private volatile long firstLogIndex = INITIAL_INDEX;
 
     /** First log index loaded flag. */
     private volatile boolean hasLoadFirstLogIndex;
@@ -140,6 +151,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
     RocksDbSharedLogStorage(
             DefaultLogStorageFactory logStorageFactory,
             RocksDB db,
+            ColumnFamilyHandle metaHandle,
             ColumnFamilyHandle confHandle,
             ColumnFamilyHandle dataHandle,
             String raftNodeStorageId,
@@ -162,9 +174,11 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
 
         this.logStorageFactory = logStorageFactory;
         this.db = db;
+        this.metaHandle = metaHandle;
         this.confHandle = confHandle;
         this.dataHandle = dataHandle;
         this.executor = executor;
+        this.raftNodeStorageId = raftNodeStorageId;
         this.startPrefix = raftNodeStorageStartPrefix(raftNodeStorageId);
         this.endPrefix = raftNodeStorageEndPrefix(raftNodeStorageId);
         this.startBound = new Slice(startPrefix);
@@ -172,11 +186,17 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         this.writeOptions = writeOptions;
     }
 
-    /**
-     * Returns the log factory instance, that created current log storage.
-     */
-    DefaultLogStorageFactory getLogStorageFactory() {
-        return logStorageFactory;
+    static byte[] storageCreatedKey(String raftNodeStorageId) {
+        return concat(DefaultLogStorageFactory.STORAGE_CREATED_META_PREFIX, 
raftNodeStorageId.getBytes(UTF_8));
+    }
+
+    private static byte[] concat(byte[] a, byte[] b) {
+        byte[] result = new byte[a.length + b.length];
+
+        System.arraycopy(a, 0, result, 0, a.length);
+        System.arraycopy(b, 0, result, a.length, b.length);
+
+        return result;
     }
 
     /** {@inheritDoc} */
@@ -191,15 +211,33 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
             Requires.requireNonNull(this.logEntryDecoder, "Null log entry 
decoder");
             Requires.requireNonNull(this.logEntryEncoder, "Null log entry 
encoder");
 
+            saveStorageCreatedFlag();
+
             return initAndLoad(opts.getConfigurationManager());
         } finally {
             this.manageLock.unlock();
         }
     }
 
+    private void saveStorageCreatedFlag() {
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            saveStorageStartedFlag(metaHandle, raftNodeStorageId, writeBatch);
+
+            db.write(writeOptions, writeBatch);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException(INTERNAL_ERR, e);
+        }
+    }
+
+    static void saveStorageStartedFlag(ColumnFamilyHandle metaHandle, String 
raftNodeStorageId, WriteBatch writeBatch)
+            throws RocksDBException {
+        byte[] storageCreatedKey = storageCreatedKey(raftNodeStorageId);
+        writeBatch.put(metaHandle, storageCreatedKey, 
ArrayUtils.BYTE_EMPTY_ARRAY);
+    }
+
     private boolean initAndLoad(ConfigurationManager configurationManager) {
         this.hasLoadFirstLogIndex = false;
-        this.firstLogIndex = 1;
+        this.firstLogIndex = INITIAL_INDEX;
         load(configurationManager);
         return onInitLoaded();
     }
@@ -212,7 +250,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
             it.seek(startPrefix);
             while (it.isValid()) {
                 byte[] keyWithPrefix = it.key();
-                byte[] ks = getKey(keyWithPrefix);
+                byte[] ks = extractKey(keyWithPrefix);
                 byte[] bs = it.value();
 
                 // LogEntry index
@@ -251,7 +289,11 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         }
     }
 
-    private byte[] getKey(byte[] ks) {
+    private byte[] extractKey(byte[] ks) {
+        return extractKey(ks, startPrefix);
+    }
+
+    private static byte[] extractKey(byte[] ks, byte[] startPrefix) {
         return copyOfRange(ks, startPrefix.length, ks.length);
     }
 
@@ -266,7 +308,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
     private boolean saveFirstLogIndex(long firstLogIndex) {
         this.useLock.lock();
         try {
-            byte[] vs = new byte[8];
+            byte[] vs = new byte[Long.BYTES];
             LONG_ARRAY_HANDLE.set(vs, 0, firstLogIndex);
             this.db.put(this.confHandle, this.writeOptions, 
createKey(FIRST_LOG_IDX_KEY), vs);
             return true;
@@ -313,14 +355,14 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
                 it.seek(startPrefix);
 
                 if (it.isValid()) {
-                    byte[] key = getKey(it.key());
+                    byte[] key = extractKey(it.key());
                     long ret = (long) LONG_ARRAY_HANDLE.get(key, 0);
                     saveFirstLogIndex(ret);
                     setFirstLogIndex(ret);
                     return ret;
                 }
 
-                return 1L;
+                return INITIAL_INDEX;
             }
         } finally {
             this.useLock.unlock();
@@ -339,7 +381,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
             it.seekForPrev(endPrefix);
 
             if (it.isValid()) {
-                byte[] key = getKey(it.key());
+                byte[] key = extractKey(it.key());
                 return (long) LONG_ARRAY_HANDLE.get(key, 0);
             }
 
@@ -351,7 +393,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
 
     /** {@inheritDoc} */
     @Override
-    public LogEntry getEntry(long index) {
+    public @Nullable LogEntry getEntry(long index) {
         this.useLock.lock();
         try {
             if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) {
@@ -552,7 +594,12 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
 
         try {
             LogEntry entry = getEntry(nextLogIndex);
-            destroyAllEntriesBetween(db, confHandle, dataHandle, startPrefix, 
endPrefix);
+
+            try (WriteBatch writeBatch = new WriteBatch()) {
+                destroyAllEntriesBetween(writeBatch, confHandle, dataHandle, 
startPrefix, endPrefix);
+
+                db.write(this.writeOptions, writeBatch);
+            }
 
             onReset(nextLogIndex);
 
@@ -576,14 +623,14 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
     }
 
     static void destroyAllEntriesBetween(
-            RocksDB db,
+            WriteBatch writeBatch,
             ColumnFamilyHandle confHandle,
             ColumnFamilyHandle dataHandle,
             byte[] startPrefix,
             byte[] endPrefix
     ) throws RocksDBException {
-        db.deleteRange(dataHandle, startPrefix, endPrefix);
-        db.deleteRange(confHandle, startPrefix, endPrefix);
+        writeBatch.deleteRange(dataHandle, startPrefix, endPrefix);
+        writeBatch.deleteRange(confHandle, startPrefix, endPrefix);
     }
 
     /** {@inheritDoc} */
@@ -683,12 +730,11 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
 
     @SuppressWarnings("SameParameterValue")
     private byte[] createKey(byte[] key) {
-        var buffer = new byte[startPrefix.length + key.length];
-
-        System.arraycopy(startPrefix, 0, buffer, 0, startPrefix.length);
-        System.arraycopy(key, 0, buffer, startPrefix.length, key.length);
+        return createKey(startPrefix, key);
+    }
 
-        return buffer;
+    static byte[] createKey(byte[] startPrefix, byte[] key) {
+        return concat(startPrefix, key);
     }
 
     private byte[] createKey(long index) {
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
new file mode 100644
index 00000000000..0cb4da55f14
--- /dev/null
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class DefaultLogStorageFactoryTest {
+    @WorkDirectory
+    private Path workDir;
+
+    private DefaultLogStorageFactory logStorageFactory;
+
+    private final LogStorageOptions logStorageOptions = new 
LogStorageOptions();
+
+    private final Peer peer = new Peer("127.0.0.1");
+
+    @BeforeEach
+    void setUp() {
+        logStorageOptions.setConfigurationManager(new ConfigurationManager());
+        
logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
+
+        logStorageFactory = new DefaultLogStorageFactory(workDir);
+
+        startFactory();
+    }
+
+    private void startFactory() {
+        assertThat(logStorageFactory.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (logStorageFactory != null) {
+            stopFactory();
+        }
+    }
+
+    private void stopFactory() {
+        assertThat(logStorageFactory.stopAsync(), willCompleteSuccessfully());
+    }
+
+    @Test
+    void metadataMigrationFindsGroupsHavingOnlyConfigurationEntries() throws 
Exception {
+        ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+        ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+        LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+        LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+        logStorage1.appendEntry(configLogEntry(1));
+        logStorage3.appendEntry(configLogEntry(10));
+
+        Set<String> ids = 
logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+        assertThat(
+                ids,
+                containsInAnyOrder(
+                        nodeIdStringForStorage(groupId1),
+                        nodeIdStringForStorage(groupId3)
+                )
+        );
+    }
+
+    private LogStorage createAndInitLogStorage(ZonePartitionId groupId) {
+        LogStorage logStorage = 
logStorageFactory.createLogStorage(nodeIdStringForStorage(groupId), new 
RaftOptions());
+        logStorage.init(logStorageOptions);
+        return logStorage;
+    }
+
+    private String nodeIdStringForStorage(ZonePartitionId groupId) {
+        return new RaftNodeId(groupId, peer).nodeIdStringForStorage();
+    }
+
+    private static LogEntry configLogEntry(int index) {
+        LogEntry logEntry = new LogEntry();
+
+        logEntry.setId(new LogId(index, 1));
+        logEntry.setType(EntryType.ENTRY_TYPE_CONFIGURATION);
+        logEntry.setPeers(List.of(new PeerId("a")));
+
+        return logEntry;
+    }
+
+    @Test
+    void metadataMigrationFindsGroupsHavingOnlyDataEntries() throws Exception {
+        ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+        ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+        LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+        LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+        logStorage1.appendEntry(dataLogEntry(1));
+        logStorage3.appendEntry(dataLogEntry(10));
+
+        Set<String> ids = 
logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+        assertThat(
+                ids,
+                containsInAnyOrder(
+                        nodeIdStringForStorage(groupId1),
+                        nodeIdStringForStorage(groupId3)
+                )
+        );
+    }
+
+    private static LogEntry dataLogEntry(int index) {
+        LogEntry logEntry = new LogEntry();
+
+        logEntry.setId(new LogId(index, 1));
+        logEntry.setType(EntryType.ENTRY_TYPE_DATA);
+        logEntry.setData(ByteBuffer.wrap(new byte[0]));
+
+        return logEntry;
+    }
+
+    @Test
+    void metadataMigrationFindsGroupsHavingBothConfigurationAndDataEntries() 
throws Exception {
+        ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+        ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+        LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+        LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+        logStorage1.appendEntry(configLogEntry(1));
+        logStorage1.appendEntry(dataLogEntry(2));
+        logStorage3.appendEntry(configLogEntry(10));
+        logStorage3.appendEntry(dataLogEntry(11));
+
+        Set<String> ids = 
logStorageFactory.metadataMigration().raftNodeStorageIdsOnDisk();
+
+        assertThat(
+                ids,
+                containsInAnyOrder(
+                        nodeIdStringForStorage(groupId1),
+                        nodeIdStringForStorage(groupId3)
+                )
+        );
+    }
+
+    @Test
+    void metadataMigrationSavesStorageCreatedFlagInMetaColumnFamily() throws 
Exception {
+        ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+        ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+        LogStorage logStorage1 = createAndInitLogStorage(groupId1);
+        LogStorage logStorage3 = createAndInitLogStorage(groupId3);
+
+        logStorage1.appendEntry(configLogEntry(10));
+        logStorage1.appendEntry(dataLogEntry(11));
+        logStorage3.appendEntry(configLogEntry(100));
+        logStorage3.appendEntry(dataLogEntry(101));
+
+        
logStorageFactory.db().dropColumnFamily(logStorageFactory.metaColumnFamilyHandle());
+        
logStorageFactory.db().destroyColumnFamilyHandle(logStorageFactory.metaColumnFamilyHandle());
+
+        // Restart causes a migration.
+        stopFactory();
+        startFactory();
+
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId1)),
+                is(new byte[0])
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId3)),
+                is(new byte[0])
+        );
+    }
+
+    private byte[] storageCreatedKey(ZonePartitionId groupId) {
+        return ("\u0001" + nodeIdStringForStorage(groupId)).getBytes(UTF_8);
+    }
+
+    private byte[] entryKey(ZonePartitionId groupId, long index) {
+        return RocksDbSharedLogStorage.createKey(
+                (nodeIdStringForStorage(groupId) + "\0").getBytes(UTF_8),
+                ByteUtils.longToBytes(index)
+        );
+    }
+
+    @Test
+    void storageInitSavesStorageCreatedFlagInMetaColumnFamily() throws 
Exception {
+        ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+        ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+        createAndInitLogStorage(groupId1);
+        createAndInitLogStorage(groupId3);
+
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId1)),
+                is(new byte[0])
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId3)),
+                is(new byte[0])
+        );
+    }
+
+    @Test
+    void storageDestructionRemovesAllItsKes() throws Exception {
+        ZonePartitionId groupId = new ZonePartitionId(3, 2);
+        LogStorage logStorage = createAndInitLogStorage(groupId);
+
+        logStorage.appendEntry(configLogEntry(100));
+
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId)),
+                is(new byte[0])
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), 
entryKey(groupId, 100)),
+                is(notNullValue())
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), 
entryKey(groupId, 100)),
+                is(notNullValue())
+        );
+
+        logStorageFactory.destroyLogStorage(nodeIdStringForStorage(groupId));
+
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.metaColumnFamilyHandle(), 
storageCreatedKey(groupId)),
+                is(nullValue())
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.confColumnFamilyHandle(), 
entryKey(groupId, 100)),
+                is(nullValue())
+        );
+        assertThat(
+                
logStorageFactory.db().get(logStorageFactory.dataColumnFamilyHandle(), 
entryKey(groupId, 100)),
+                is(nullValue())
+        );
+    }
+}


Reply via email to