This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bcf909485a1 IGNITE-25895 Implement method to get on-disk group IDs in
Raft log storage (#6297)
bcf909485a1 is described below
commit bcf909485a1dbb4211c2ce92d299dc75b7a61891
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Jul 23 12:56:54 2025 +0400
IGNITE-25895 Implement method to get on-disk group IDs in Raft log storage
(#6297)
---
.../persistence/store/FilePageStoreManager.java | 3 +-
.../raft/ItTruncateSuffixAndRestartTest.java | 7 ++++
...StorageFactory.java => GroupIdFastForward.java} | 31 ++++++---------
.../internal/raft/storage/LogStorageFactory.java | 9 +++++
.../storage/impl/DefaultLogStorageFactory.java | 44 ++++++++++++++++++++++
.../raft/storage/impl/LocalLogStorageFactory.java | 7 ++++
.../storage/impl/VolatileLogStorageFactory.java | 7 ++++
.../raft/storage/logit/LogitLogStorageFactory.java | 7 ++++
.../storage/impl/DefaultLogStorageFactoryTest.java | 18 +++++++++
.../internal/storage/engine/StorageEngine.java | 3 ++
.../state/rocksdb/TxStateRocksDbSharedStorage.java | 6 ++-
11 files changed, 121 insertions(+), 21 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
index 1f46e4155e5..b11a0ee3b98 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
@@ -513,7 +513,8 @@ public class FilePageStoreManager implements
PageReadWriteManager {
try (Stream<Path> tableDirs = Files.list(dbDir)) {
return tableDirs
.filter(path -> Files.isDirectory(path) &&
path.getFileName().toString().startsWith(GROUP_DIR_PREFIX))
-
.map(FilePageStoreManager::extractTableId).collect(toUnmodifiableSet());
+ .map(FilePageStoreManager::extractTableId)
+ .collect(toUnmodifiableSet());
} catch (IOException e) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Cannot
scan for groupIDs", e);
}
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index 770c1fed515..fe56e5d93ae 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -401,6 +402,12 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
// No-op.
}
+ @Override
+ public Set<String> raftNodeStorageIdsOnDisk() {
+ // There is nothing on disk.
+ return Set.of();
+ }
+
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return nullCompletedFuture();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/GroupIdFastForward.java
similarity index 53%
copy from
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
copy to
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/GroupIdFastForward.java
index ce06260e810..3ef0c70001c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/GroupIdFastForward.java
@@ -17,27 +17,20 @@
package org.apache.ignite.internal.raft.storage;
-import org.apache.ignite.internal.components.LogSyncer;
-import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
-
-/** Log storage factory interface. */
-// TODO https://issues.apache.org/jira/browse/IGNITE-22766
-public interface LogStorageFactory extends LogSyncer, IgniteComponent {
- /**
- * Creates a log storage.
- *
- * @param uri Log storage URI.
- * @param raftOptions Raft options.
- * @return Log storage.
- */
- LogStorage createLogStorage(String uri, RaftOptions raftOptions);
+import org.jetbrains.annotations.Nullable;
+/**
+ * Used to calculate the next group ID for storage (in log storage) from the
previous one. Might be used to increase efficiency
+ * of a scan over log storage key ranges containing different groupIDs: if we
know we want to fast forward,
+ * this instance will tell where to fast forward.
+ */
+@FunctionalInterface
+public interface GroupIdFastForward {
/**
- * Destroys a log storage (that is, removes it from the disk).
+ * Returns ID of the group (for storage) to which to fast forward from the
given one. Returns {@code null} if no fast forward
+ * is needed (in this case, the iteration will simply proceed with the
next key).
*
- * @param uri Log storage URI.
+ * @param storageGroupId Current group ID from which we want to fast
forward.
*/
- void destroyLogStorage(String uri);
+ @Nullable String nextStorageGroupId(String storageGroupId);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
index ce06260e810..58721b98ba1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageFactory.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.storage;
+import java.util.Set;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.raft.jraft.option.RaftOptions;
@@ -40,4 +41,12 @@ public interface LogStorageFactory extends LogSyncer,
IgniteComponent {
* @param uri Log storage URI.
*/
void destroyLogStorage(String uri);
+
+ /**
+ * Obtains group IDs for storage of all Raft groups existing on disk.
+ *
+ * <p>This method should only be called when the log storage is not
accessed otherwise (so no Raft groups can appear or be destroyed
+ * in parallel with this call).
+ */
+ Set<String> raftNodeStorageIdsOnDisk();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 25fa321929e..7293c531964 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -18,20 +18,26 @@
package org.apache.ignite.internal.raft.storage.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageEndPrefix;
import static
org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorageUtils.raftNodeStorageStartPrefix;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -55,8 +61,11 @@ import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.Priority;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.rocksdb.util.SizeUnit;
@@ -360,6 +369,41 @@ public class DefaultLogStorageFactory implements
LogStorageFactory {
return opts;
}
+ @Override
+ public Set<String> raftNodeStorageIdsOnDisk() {
+ var groupIdsForStorage = new HashSet<String>();
+
+ try (
+ Slice upperBoundSlice = new
Slice(incrementPrefix(STORAGE_CREATED_META_PREFIX));
+ ReadOptions readOptions = new
ReadOptions().setIterateUpperBound(upperBoundSlice);
+ RocksIterator iterator = db.newIterator(metaHandle,
readOptions)
+ ) {
+ iterator.seek(STORAGE_CREATED_META_PREFIX);
+
+ while (iterator.isValid()) {
+ byte[] key = iterator.key();
+
+ String idForStorage = new String(
+ key,
+ STORAGE_CREATED_META_PREFIX.length,
+ key.length - STORAGE_CREATED_META_PREFIX.length,
+ UTF_8
+ );
+ groupIdsForStorage.add(idForStorage);
+
+ iterator.next();
+ }
+
+ // Doing this to make sure an exception is thrown if the iteration was
stopped due to an error and not due to exhausting
+ // the iteration space.
+ iterator.status();
+ } catch (RocksDBException e) {
+ throw new IgniteInternalException(INTERNAL_ERR, "Cannot get group
storage IDs", e);
+ }
+
+ return unmodifiableSet(groupIdsForStorage);
+ }
+
@TestOnly
RocksDB db() {
return db;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
index 6938fdb7a68..ab47e6321c4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/LocalLogStorageFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.storage.impl;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
@@ -50,6 +51,12 @@ public class LocalLogStorageFactory implements
LogStorageFactory {
// This creates on-heap storages, nothing to destroy.
}
+ @Override
+ public Set<String> raftNodeStorageIdsOnDisk() {
+ // This creates on-heap storages, there is nothing on disk.
+ return Set.of();
+ }
+
@Override
public void sync() {
// no-op
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
index 51d76fc3876..58838a3465d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java
@@ -126,6 +126,13 @@ public class VolatileLogStorageFactory implements
LogStorageFactory {
}
}
+ @Override
+ public Set<String> raftNodeStorageIdsOnDisk() {
+ // This is a volatile storage; the storage is destroyed as a whole on
startup, so nothing can remain on disk by the moment
+ // when this method is called.
+ return Set.of();
+ }
+
private LogStorageBudget createLogStorageBudget() {
return newBudget(logStorageBudgetConfig);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
index ebebe5bf02a..d4e28c6ac1b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.logit;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.nio.file.Path;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -112,6 +113,12 @@ public class LogitLogStorageFactory implements
LogStorageFactory {
}
}
+ @Override
+ public Set<String> raftNodeStorageIdsOnDisk() {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-25988 -
implement.
+ return Set.of();
+ }
+
@Override
public void sync() {
// TODO: https://issues.apache.org/jira/browse/IGNITE-21955
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
index 0cb4da55f14..84a2f54be30 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactoryTest.java
@@ -273,4 +273,22 @@ class DefaultLogStorageFactoryTest {
is(nullValue())
);
}
+
+ @Test
+ void groupsScanFindsGroups() {
+ ZonePartitionId groupId1 = new ZonePartitionId(1, 0);
+ ZonePartitionId groupId3 = new ZonePartitionId(3, 2);
+ createAndInitLogStorage(groupId1);
+ createAndInitLogStorage(groupId3);
+
+ Set<String> ids = logStorageFactory.raftNodeStorageIdsOnDisk();
+
+ assertThat(
+ ids,
+ containsInAnyOrder(
+ new RaftNodeId(groupId1,
peer).nodeIdStringForStorage(),
+ new RaftNodeId(groupId3, peer).nodeIdStringForStorage()
+ )
+ );
+ }
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
index d8b81334021..af931962128 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageEngine.java
@@ -86,6 +86,9 @@ public interface StorageEngine {
/**
* Returns IDs of tables for which there are MV partition storages on
disk. Those were created and flushed to disk; either
* destruction was not started for them, or it failed.
+ *
+ * <p>This method should only be called when the storage is not accessed
otherwise (so no storages in it can appear or
+ * be destroyed in parallel with this call).
*/
Set<Integer> tableIdsOnDisk();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
index 7c1fe17e946..2fc5f6e02ce 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbSharedStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx.storage.state.rocksdb;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Collections.unmodifiableSet;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -313,6 +314,9 @@ public class TxStateRocksDbSharedStorage implements
IgniteComponent {
/**
* Returns IDs of tables/zones for which there are tx state partition
storages on disk. Those were created and flushed to disk; either
* destruction was not started for them, or it failed.
+ *
+ * <p>This method should only be called when the tx state storage is not
accessed otherwise (so no storages in it can appear or
+ * be destroyed in parallel with this call).
*/
public Set<Integer> tableOrZoneIdsOnDisk() {
Set<Integer> ids = new HashSet<>();
@@ -341,6 +345,6 @@ public class TxStateRocksDbSharedStorage implements
IgniteComponent {
throw new TxStateStorageException(INTERNAL_ERR, "Cannot get
table/zone IDs", e);
}
- return Set.copyOf(ids);
+ return unmodifiableSet(ids);
}
}