This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 753dcde009 IGNITE-20496 Enable Logit log storage by default (#2758)
753dcde009 is described below
commit 753dcde0095e0061efddc15a61c0ae2dc8c80239
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Oct 31 14:10:06 2023 +0300
IGNITE-20496 Enable Logit log storage by default (#2758)
---
.../server/persistence/RocksDbKeyValueStorage.java | 19 +++---
.../internal/raft/server/impl/JraftServerImpl.java | 17 ++++-
.../storage/logit/LogitLogStorageFactory.java | 45 ++++++++-----
.../jraft/storage/logit/option/StoreOptions.java | 8 +--
.../storage/logit/storage/LogitLogStorage.java | 35 +++++-----
.../jraft/storage/logit/storage/db/AbstractDB.java | 24 ++++---
.../jraft/storage/logit/storage/db/ConfDB.java | 5 +-
.../jraft/storage/logit/storage/db/IndexDB.java | 5 +-
.../storage/logit/storage/db/SegmentLogDB.java | 5 +-
.../logit/storage/factory/LogStoreFactory.java | 19 ++++--
.../storage/logit/storage/file/AbstractFile.java | 28 ++++----
.../storage/logit/storage/file/FileManager.java | 2 +-
.../logit/storage/file/index/IndexFile.java | 9 ++-
.../logit/storage/file/segment/SegmentFile.java | 15 +++--
.../jraft/storage/impl/LogStorageBenchmark.java | 76 +++++++++++++++-------
.../jraft/storage/logit/LogitLogStorageTest.java | 7 +-
.../raft/jraft/storage/logit/db/ConfDBTest.java | 8 ++-
.../raft/jraft/storage/logit/db/IndexDBTest.java | 9 ++-
.../jraft/storage/logit/db/SegmentLogDBTest.java | 9 ++-
.../storage/logit/file/index/IndexFileTest.java | 3 +-
.../logit/file/segment/SegmentFileTest.java | 3 +-
.../inmemory/ItRaftStorageVolatilityTest.java | 3 +
22 files changed, 232 insertions(+), 122 deletions(-)
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index e08fccc7f8..b298b6c8e8 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -41,7 +41,9 @@ import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_E
import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.STARTING_STORAGE_ERR;
import static org.rocksdb.util.SizeUnit.MB;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@@ -246,7 +248,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
destroyRocksDb();
createDb();
- } catch (RocksDBException e) {
+ } catch (IOException | RocksDBException e) {
throw new MetaStorageException(STARTING_STORAGE_ERR, "Failed to
start the storage", e);
}
}
@@ -331,16 +333,15 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
/**
- * Clear the RocksDB instance. The major difference with directly deleting
the DB directory manually is that destroyDB() will take care
- * of the case where the RocksDB database is stored in multiple
directories. For instance, a single DB can be configured to store its
- * data in multiple directories by specifying different paths to
DBOptions::db_paths, DBOptions::db_log_dir, and DBOptions::wal_dir.
+ * Clear the RocksDB instance.
*
- * @throws RocksDBException If failed.
+ * @throws IOException If failed.
*/
- protected void destroyRocksDb() throws RocksDBException {
- try (Options opt = new Options()) {
- RocksDB.destroyDB(dbPath.toString(), opt);
- }
+ protected void destroyRocksDb() throws IOException {
+ // For unknown reasons, RocksDB#destroyDB(String, Options) throws
RocksDBException with ".../LOCK: No such file or directory".
+ IgniteUtils.deleteIfExists(dbPath);
+
+ Files.createDirectories(dbPath);
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index c3f7a1934d..d5aadc784d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -42,6 +42,7 @@ import java.util.function.BiPredicate;
import java.util.stream.IntStream;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -56,6 +57,7 @@ import
org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
import
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
+import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -84,6 +86,7 @@ import
org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestInterceptor;
import
org.apache.ignite.raft.jraft.rpc.impl.core.NullAppendEntriesRequestInterceptor;
import
org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl.StableClosureEvent;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
@@ -97,6 +100,12 @@ import org.jetbrains.annotations.TestOnly;
* Raft server implementation on top of forked JRaft library.
*/
public class JraftServerImpl implements RaftServer {
+ /**
+ * Enables logit log storage. {@code true} by default.
+ * This is a temporary property that should only be used for testing and
comparing the two storages.
+ */
+ public static final String LOGIT_STORAGE_ENABLED_PROPERTY =
"LOGIT_STORAGE_ENABLED";
+
/** Cluster service. */
private final ClusterService service;
@@ -168,7 +177,9 @@ public class JraftServerImpl implements RaftServer {
this.service = service;
this.dataPath = dataPath;
this.nodeManager = new NodeManager();
- this.logStorageFactory = new
DefaultLogStorageFactory(dataPath.resolve("log"));
+ this.logStorageFactory =
IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY, true)
+ ? new LogitLogStorageFactory(dataPath.resolve("log"),
getLogOptions())
+ : new DefaultLogStorageFactory(dataPath.resolve("log"));
this.opts = opts;
this.raftGroupEventsClientListener = raftGroupEventsClientListener;
@@ -205,6 +216,10 @@ public class JraftServerImpl implements RaftServer {
serviceEventInterceptor = new RaftServiceEventInterceptor();
}
+ private StoreOptions getLogOptions() {
+ return new StoreOptions();
+ }
+
/**
* Sets {@link AppendEntriesRequestInterceptor} to use. Should only be
called from the same thread that is used
* to {@link #start()} the component.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
similarity index 58%
rename from
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
rename to
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
index bc66237992..2bcb4cf1ec 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.storage.logit;
+package org.apache.ignite.internal.raft.storage.logit;
-import java.nio.file.Paths;
-import java.util.concurrent.ExecutorService;
+import java.nio.file.Path;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
@@ -32,39 +32,54 @@ import
org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StringUtils;
+/**
+ * Log storage factory for {@link LogitLogStorage} instances.
+ */
public class LogitLogStorageFactory implements LogStorageFactory {
private static final IgniteLogger LOG =
Loggers.forClass(LogitLogStorageFactory.class);
- public static final String NEW_STORAGE_PATH = "LogitStorage";
+ private static final String LOG_DIR_PREFIX = "log-";
/** Executor for shared storages. */
- protected final ExecutorService executorService;
+ private final ScheduledExecutorService checkpointExecutor;
+
+ private final Path baseLogStoragesPath;
private final StoreOptions storeOptions;
- public LogitLogStorageFactory(StoreOptions storeOptions) {
+ /**
+ * Constructor.
+ *
+ * @param baseLogStoragesPath Location of all log storages, created by
this factory.
+ * @param storeOptions Logit log storage options.
+ */
+ public LogitLogStorageFactory(Path baseLogStoragesPath, StoreOptions
storeOptions) {
+ this.baseLogStoragesPath = baseLogStoragesPath;
this.storeOptions = storeOptions;
- executorService = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors() * 2,
- new NamedThreadFactory("raft-shared-log-storage-pool", LOG)
+ checkpointExecutor = Executors.newSingleThreadScheduledExecutor(
+ new NamedThreadFactory("logit-checkpoint-executor", LOG)
);
}
@Override
public void start() {
-
}
@Override
- public LogStorage createLogStorage(String uri, RaftOptions raftOptions) {
- Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage
uri.");
+ public LogStorage createLogStorage(String groupId, RaftOptions
raftOptions) {
+ Requires.requireTrue(StringUtils.isNotBlank(groupId), "Blank log
storage uri.");
- String newStoragePath = Paths.get(uri, NEW_STORAGE_PATH).toString();
- return new LogitLogStorage(newStoragePath, storeOptions, raftOptions,
executorService);
+ Path storagePath = resolveLogStoragePath(groupId);
+
+ return new LogitLogStorage(storagePath, storeOptions, raftOptions,
checkpointExecutor);
}
@Override
public void close() {
- ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
+ ExecutorServiceHelper.shutdownAndAwaitTermination(checkpointExecutor);
+ }
+
+ private Path resolveLogStoragePath(String groupId) {
+ return baseLogStoragesPath.resolve(LOG_DIR_PREFIX + groupId);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
index 855373a320..a4d3162955 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
@@ -27,13 +27,13 @@ public class StoreOptions {
private static final String storagePath = "localLog";
- // Default is 0.5G
- private int segmentFileSize = 1024 * 1024 *
512;
+ // Default is 64Mb, similar to Ignite 2.
+ private int segmentFileSize = 1024 * 1024 *
64;
private int indexFileSize =
FileHeader.HEADER_SIZE + 5000000
*
IndexEntry.INDEX_SIZE;
-
- private int confFileSize = 1024 * 1024 *
512;
+ // Default is 64Mb, similar to Ignite 2.
+ private int confFileSize = 1024 * 1024 *
64;
// Whether enable warm up file when pre allocate
private boolean enableWarmUpFile = true;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
index 69df2d73a6..0e55460932 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
@@ -18,10 +18,11 @@
package org.apache.ignite.raft.jraft.storage.logit.storage;
import java.io.IOException;
-import java.nio.file.Paths;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executor;import java.util.concurrent.locks.Lock;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -53,7 +54,6 @@ import
org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Requires;
-import org.apache.ignite.raft.jraft.util.Utils;
/**
* A logStorage implemented by java
@@ -83,16 +83,16 @@ public class LogitLogStorage implements LogStorage {
private LogStoreFactory logStoreFactory;
/** Executor that handles prefix truncation. */
- private final Executor executor;
+ private final ScheduledExecutorService checkpointExecutor;
- public LogitLogStorage(final String path, final StoreOptions storeOptions,
RaftOptions raftOptions, Executor executor) {
- this.indexStorePath = Paths.get(path, INDEX_STORE_PATH).toString();
- this.segmentStorePath = Paths.get(path, SEGMENT_STORE_PATH).toString();
- this.confStorePath = Paths.get(path, CONF_STORE_PATH).toString();
+ public LogitLogStorage(Path path, StoreOptions storeOptions, RaftOptions
raftOptions, ScheduledExecutorService checkpointExecutor) {
+ this.indexStorePath = path.resolve(INDEX_STORE_PATH).toString();
+ this.segmentStorePath = path.resolve(SEGMENT_STORE_PATH).toString();
+ this.confStorePath = path.resolve(CONF_STORE_PATH).toString();
this.storeOptions = storeOptions;
this.raftOptions = raftOptions;
- this.executor = executor;
- final String checkPointPath = Paths.get(path,
FIRST_INDEX_CHECKPOINT).toString();
+ this.checkpointExecutor = checkpointExecutor;
+ final String checkPointPath =
path.resolve(FIRST_INDEX_CHECKPOINT).toString();
this.firstLogIndexCheckpoint = new
FirstLogIndexCheckpoint(checkPointPath, raftOptions);
}
@@ -108,9 +108,9 @@ public class LogitLogStorage implements LogStorage {
// Create dbs and recover
this.logStoreFactory = new LogStoreFactory(this.storeOptions,
raftOptions);
- this.indexDB = new IndexDB(this.indexStorePath);
- this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
- this.confDB = new ConfDB(this.confStorePath);
+ this.indexDB = new IndexDB(this.indexStorePath,
checkpointExecutor);
+ this.segmentLogDB = new SegmentLogDB(this.segmentStorePath,
checkpointExecutor);
+ this.confDB = new ConfDB(this.confStorePath, checkpointExecutor);
if (!(this.indexDB.init(this.logStoreFactory) &&
this.segmentLogDB.init(this.logStoreFactory) && this.confDB
.init(this.logStoreFactory))) {
LOG.warn("Init dbs failed when startup logitLogStorage");
@@ -470,11 +470,10 @@ public class LogitLogStorage implements LogStorage {
try {
final boolean ret = saveFirstLogIndex(firstIndexKept);
if (ret) {
- Utils.runInThread(executor, () -> {
- this.indexDB.truncatePrefix(firstIndexKept);
- this.segmentLogDB.truncatePrefix(firstIndexKept);
- this.confDB.truncatePrefix(firstIndexKept);
- });
+ // TODO IGNITE-20754 Make async when possible.
+ this.indexDB.truncatePrefix(firstIndexKept);
+ this.segmentLogDB.truncatePrefix(firstIndexKept);
+ this.confDB.truncatePrefix(firstIndexKept);
}
return ret;
} finally {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
index 7477abb61d..7a097d5b2a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
@@ -21,13 +21,12 @@ import java.io.IOException;
import java.nio.file.Path;import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
@@ -61,10 +60,13 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
protected StoreOptions storeOptions;
protected AbortFile abortFile;
protected FlushStatusCheckpoint flushStatusCheckpoint;
- private ScheduledExecutorService checkpointExecutor;
- protected AbstractDB(final String storePath) {
+ private final ScheduledExecutorService checkpointExecutor;
+ private ScheduledFuture<?> checkpointScheduledFuture;
+
+ protected AbstractDB(String storePath, ScheduledExecutorService
checkpointExecutor) {
this.storePath = storePath;
+ this.checkpointExecutor = checkpointExecutor;
}
@Override
@@ -81,15 +83,18 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
}
this.fileManager = logStoreFactory.newFileManager(getDBFileType(),
this.storePath,
this.serviceManager.getAllocateService());
- this.checkpointExecutor = Executors
- .newSingleThreadScheduledExecutor(new
NamedThreadFactory(getDBName() + "-Checkpoint-Thread-", LOG));
final int interval =
this.storeOptions.getCheckpointFlushStatusInterval();
- this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint,
interval, interval, TimeUnit.MILLISECONDS);
+
+ checkpointScheduledFuture =
+
this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint, interval,
interval, TimeUnit.MILLISECONDS);
+
return true;
}
@Override
public void shutdown() {
+ checkpointScheduledFuture.cancel(false);
+
doCheckpoint();
if (this.serviceManager != null) {
this.serviceManager.shutdown();
@@ -100,7 +105,6 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
if (this.abortFile != null) {
this.abortFile.destroy();
}
- this.checkpointExecutor.shutdown();
}
/**
@@ -310,7 +314,7 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
this.flushStatusCheckpoint.save();
}
} catch (final IOException e) {
- LOG.error("Error when do checkpoint in db:{}", getDBName());
+ LOG.error("Error when do checkpoint in db:{}", e, getDBName());
}
}
@@ -382,6 +386,8 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
public boolean truncateSuffix(final long lastIndexKept, final int pos) {
if (this.fileManager.truncateSuffix(lastIndexKept, pos)) {
doCheckpoint();
+
+ return true; // This fix is missing in "jraft".
}
return false;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
index a79ee0ace3..2d4be73206 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
/**
@@ -24,8 +25,8 @@ import
org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
*/
public class ConfDB extends AbstractDB {
- public ConfDB(final String storePath) {
- super(storePath);
+ public ConfDB(String storePath, ScheduledExecutorService
checkpointExecutor) {
+ super(storePath, checkpointExecutor);
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
index 012e740bb1..7cbf6626d6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.storage.logit.storage.db;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
import
org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
@@ -30,8 +31,8 @@ import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
*/
public class IndexDB extends AbstractDB {
- public IndexDB(final String storePath) {
- super(storePath);
+ public IndexDB(String storePath, ScheduledExecutorService
checkpointExecutor) {
+ super(storePath, checkpointExecutor);
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
index 370194bd47..477aa5ede1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
/**
@@ -24,8 +25,8 @@ import
org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
*/
public class SegmentLogDB extends AbstractDB {
- public SegmentLogDB(final String storePath) {
- super(storePath);
+ public SegmentLogDB(String storePath, ScheduledExecutorService
checkpointExecutor) {
+ super(storePath, checkpointExecutor);
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
index 8b262ddfc3..8ec757b313 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
@@ -37,15 +37,26 @@ public class LogStoreFactory {
public LogStoreFactory(final StoreOptions opts, RaftOptions raftOptions) {
this.storeOptions = opts;
- this.raftOptions = raftOptions;}
+ this.raftOptions = raftOptions;
+ }
/**
* Create new file(index/segment/conf)
*/
public AbstractFile newFile(final FileType fileType, final String
filePath) {
- return isIndex(fileType) ? new IndexFile(filePath,
this.storeOptions.getIndexFileSize()) : //
- isConf(fileType) ? new SegmentFile(filePath,
this.storeOptions.getConfFileSize()) : //
- new SegmentFile(filePath,
this.storeOptions.getSegmentFileSize());
+ switch (fileType) {
+ case FILE_INDEX:
+ return new IndexFile(raftOptions, filePath,
this.storeOptions.getIndexFileSize());
+
+ case FILE_SEGMENT:
+ return new SegmentFile(raftOptions, filePath,
this.storeOptions.getSegmentFileSize());
+
+ case FILE_CONFIGURATION:
+ return new SegmentFile(raftOptions, filePath,
this.storeOptions.getConfFileSize());
+
+ default:
+ throw new AssertionError("Unidentified file type: " +
fileType);
+ }
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
index ac0e4d36a6..7c3972e470 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
@@ -31,7 +31,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.LibC;
import
org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ReferenceResource;
import org.apache.ignite.raft.jraft.util.Platform;
@@ -52,6 +54,8 @@ public abstract class AbstractFile extends ReferenceResource {
protected static final byte FILE_END_BYTE = 'x';
+ private final RaftOptions raftOptions;
+
protected String filePath;
// The size of this file
@@ -75,7 +79,8 @@ public abstract class AbstractFile extends ReferenceResource {
private volatile boolean isMapped = false;
private final ReentrantLock mapLock = new ReentrantLock();
- public AbstractFile(final String filePath, final int fileSize, final
boolean isMapped) {
+ public AbstractFile(RaftOptions raftOptions, final String filePath, final
int fileSize, final boolean isMapped) {
+ this.raftOptions = raftOptions;
initAndMap(filePath, fileSize, isMapped);
this.header = new FileHeader();
}
@@ -224,18 +229,13 @@ public abstract class AbstractFile extends
ReferenceResource {
return RecoverResult.newInstance(true, isRecoverTotal,
recoverPosition);
}
- public enum CheckDataResult {
- CHECK_SUCCESS(1), // If check success, return dataSize
- CHECK_FAIL(-1), // If check failed, return -1
- FILE_END(0); // If come to file end, return 0
+ public static class CheckDataResult {
+ public static final CheckDataResult CHECK_FAIL = new
CheckDataResult(-1); // If check failed, return -1
+ public static final CheckDataResult FILE_END = new CheckDataResult(0);
// If come to file end, return 0
private int size;
- CheckDataResult(final int pos) {
- this.size = pos;
- }
-
- public void setSize(final int pos) {
+ public CheckDataResult(final int pos) {
this.size = pos;
}
}
@@ -294,7 +294,9 @@ public abstract class AbstractFile extends
ReferenceResource {
if (hold()) {
final int value = getWrotePosition();
try {
- this.mappedByteBuffer.force();
+ if (raftOptions.isSync()) {
+ this.mappedByteBuffer.force();
+ }
} catch (final Throwable e) {
LOG.error("Error occurred when force data to disk.", e);
throw new RuntimeException(e);
@@ -395,9 +397,7 @@ public abstract class AbstractFile extends
ReferenceResource {
}
public void put(final ByteBuffer buffer, final int index, final byte[]
data) {
- for (int i = 0; i < data.length; i++) {
- buffer.put(index + i, data[i]);
- }
+ GridUnsafe.copyHeapOffheap(data, GridUnsafe.BYTE_ARR_OFF,
GridUnsafe.bufferAddress(buffer) + index, data.length);
}
public long getFirstLogIndex() {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
index ea784bda31..53b0e15099 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
@@ -435,7 +435,7 @@ public class FileManager {
for (final AbstractFile abstractFile : this.files) {
final long lastLogIndex = abstractFile.getLastLogIndex();
if (lastLogIndex < firstIndexKept) {
- willRemoveFiles.addAll(this.files);
+ willRemoveFiles.add(abstractFile);
}
}
return true;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
index d2399afbfc..a18f5dc289 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
/**
@@ -49,8 +50,8 @@ public class IndexFile extends AbstractFile {
public static final int RECORD_MAGIC_BYTES_SIZE =
RECORD_MAGIC_BYTES.length;
- public IndexFile(final String filePath, final int fileSize) {
- super(filePath, fileSize, true);
+ public IndexFile(RaftOptions raftOptions, final String filePath, final int
fileSize) {
+ super(raftOptions, filePath, fileSize, true);
}
/**
@@ -204,9 +205,7 @@ public class IndexFile extends AbstractFile {
if (buffer.remaining() < getIndexSize() - RECORD_MAGIC_BYTES_SIZE - 1)
{
return CheckDataResult.CHECK_FAIL;
}
- final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
- result.setSize(getIndexSize());
- return result;
+ return new CheckDataResult(getIndexSize());
}
@Override
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
index ebc675cc1a..01fe46f272 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
/**
@@ -53,8 +54,8 @@ public class SegmentFile extends AbstractFile {
// 4 Bytes for written data length
private static final int RECORD_DATA_LENGTH_SIZE = 4;
- public SegmentFile(final String filePath, final int fileSize) {
- super(filePath, fileSize, true);
+ public SegmentFile(RaftOptions raftOptions, final String filePath, final
int fileSize) {
+ super(raftOptions, filePath, fileSize, true);
}
/**
@@ -102,9 +103,11 @@ public class SegmentFile extends AbstractFile {
getFilePath(), logIndex, pos,
this.header.getFirstLogIndex(), getLastLogIndex());
return null;
}
- if (pos > getFlushedPosition()) {
+ // Original jraft code did the comparison with flushed position.
It didn't work in cases where leader would write log entry
+ // locally, wouldn't flush it, and then would try replicating it. I
don't know whether it's correct, but this is how it works.
+ if (pos > getWrotePosition()) {
LOG.warn(
- "Try to read data from segment file {} out of comitted
position, logIndex={}, readPos={}, wrotePos={}, flushPos={}.",
+ "Try to read data from segment file {} out of written
position, logIndex={}, readPos={}, wrotePos={}, flushPos={}.",
getFilePath(), logIndex, pos, getWrotePosition(),
getFlushedPosition());
return null;
}
@@ -165,9 +168,7 @@ public class SegmentFile extends AbstractFile {
if (buffer.remaining() < dataLen) {
return CheckDataResult.CHECK_FAIL;
}
- final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
- result.setSize(RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE +
dataLen);
- return result;
+ return new CheckDataResult(RECORD_MAGIC_BYTES_SIZE +
RECORD_DATA_LENGTH_SIZE + dataLen);
}
@Override
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java
index f2b107f315..c53dc213d1 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java
@@ -16,20 +16,29 @@
*/
package org.apache.ignite.raft.jraft.storage.impl;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
import org.apache.ignite.raft.jraft.util.Utils;
public class LogStorageBenchmark {
+ private static final int WARMUP_LOG_ENTRIES = 10000;
private final LogStorage logStorage;
@@ -39,6 +48,8 @@ public class LogStorageBenchmark {
private final int batchSize;
+ private final byte[] bytes;
+
public LogStorageBenchmark(final LogStorage logStorage, final int logSize,
final int totalLogs,
final int batchSize) {
super();
@@ -46,13 +57,19 @@ public class LogStorageBenchmark {
this.logSize = logSize;
this.totalLogs = totalLogs;
this.batchSize = batchSize;
+ this.bytes = new byte[logSize];
+ ThreadLocalRandom.current().nextBytes(bytes);
}
- private void write(final int batchSize, final int logSize, final int
totalLogs) {
+ private void write(final int batchSize, final int logSize, final int
totalLogs, int offset) {
List<LogEntry> entries = new ArrayList<>(batchSize);
- for (int i = 0; i < totalLogs; i += batchSize) {
+ for (int i = offset; i < totalLogs; i += batchSize) {
for (int j = i; j < i + batchSize; j++) {
- entries.add(TestUtils.mockEntry(j, j, logSize));
+ LogEntry entry = new
LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setId(new LogId(j, j));
+ entry.setData(ByteBuffer.wrap(bytes)); // Reuse the same data
for benchmark purity.
+
+ entries.add(entry);
}
int ret = this.logStorage.appendEntries(entries);
if (ret != batchSize) {
@@ -89,25 +106,27 @@ public class LogStorageBenchmark {
private void report(final String op, final long cost) {
System.out.println("Test " + op + ":");
- System.out.println(" Log number :" + this.totalLogs);
- System.out.println(" Log Size :" + this.logSize);
- System.out.println(" Batch Size :" + this.batchSize);
- System.out.println(" Cost time(s) :" + cost / 1000);
- System.out.println(" Total size :" + (long) this.totalLogs *
this.logSize);
+ System.out.println(" Log number : " + this.totalLogs);
+ System.out.println(" Log Size : " + this.logSize);
+ System.out.println(" Batch Size : " + this.batchSize);
+ System.out.println(" Cost time(s) : " + cost / 1000.0f);
+ System.out.println(" Total size : " + (long) this.totalLogs *
this.logSize);
+ System.out.println(" Throughput(bps) : " + 1000L * this.totalLogs *
this.logSize / cost);
+ System.out.println(" Throughput(rps) : " + 1000L * this.totalLogs /
cost);
}
private void doTest() {
System.out.println("Begin test...");
{
System.out.println("Warm up...");
- write(10, 64, 10000);
- read(64, 10000);
+ write(this.batchSize, this.logSize, WARMUP_LOG_ENTRIES, 0);
+ read(this.logSize, WARMUP_LOG_ENTRIES);
}
System.out.println("Start test...");
{
long start = Utils.monotonicMs();
- write(this.batchSize, this.logSize, this.totalLogs);
+ write(this.batchSize, this.logSize, this.totalLogs,
WARMUP_LOG_ENTRIES);
long cost = Utils.monotonicMs() - start;
report("write", cost);
}
@@ -121,22 +140,33 @@ public class LogStorageBenchmark {
System.out.println("Test done!");
}
- public static void main(final String[] args) {
- String testPath = Paths.get(SystemPropertyUtil.get("user.dir"),
"log_storage").toString();
+ public static void main(final String[] args) throws Exception {
+ Path testPath = Paths.get(SystemPropertyUtil.get("user.dir"),
"log_storage");
+ IgniteUtils.deleteIfExists(testPath);
+
System.out.println("Test log storage path: " + testPath);
int batchSize = 100;
int logSize = 16 * 1024;
- int totalLogs = 30 * 1024;
+ int totalLogs = 100 * 1024;
- LogStorage logStorage = new RocksDBLogStorage(testPath, new
RaftOptions());
- //LogStorage logStorage = new LocalLogStorage(testPath, new
RaftOptions());
+// LogStorageFactory logStorageFactory = new
DefaultLogStorageFactory(testPath);
+ LogStorageFactory logStorageFactory = new
LogitLogStorageFactory(testPath, new StoreOptions());
+ logStorageFactory.start();
- LogStorageOptions opts = new LogStorageOptions();
- opts.setConfigurationManager(new ConfigurationManager());
- opts.setLogEntryCodecFactory(LogEntryV1CodecFactory.getInstance());
- logStorage.init(opts);
+ try (AutoCloseable factory = logStorageFactory::close) {
+ RaftOptions raftOptions = new RaftOptions();
+ raftOptions.setSync(false);
- new LogStorageBenchmark(logStorage, logSize, totalLogs,
batchSize).doTest();
- }
+ LogStorage logStorage = logStorageFactory.createLogStorage("test",
raftOptions);
+ LogStorageOptions opts = new LogStorageOptions();
+ opts.setConfigurationManager(new ConfigurationManager());
+ opts.setLogEntryCodecFactory(LogEntryV1CodecFactory.getInstance());
+ logStorage.init(opts);
+
+ try (AutoCloseable log = logStorage::shutdown) {
+ new LogStorageBenchmark(logStorage, logSize, totalLogs,
batchSize).doTest();
+ }
+ }
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
index 2fa2fe0590..d256c25ba5 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.raft.jraft.entity.PeerId.emptyPeer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.List;
+import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.option.RaftOptions;
@@ -35,14 +37,17 @@ import org.apache.ignite.raft.jraft.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(WorkDirectoryExtension.class)
public class LogitLogStorageTest extends BaseLogStorageTest {
private LogitLogStorageFactory logStorageFactory;
+
@BeforeEach
@Override
public void setup() throws Exception {
- logStorageFactory = new LogitLogStorageFactory(testStoreOptions());
+ logStorageFactory = new LogitLogStorageFactory(path,
testStoreOptions());
logStorageFactory.start();
super.setup();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
index f58fa63a79..7d7303d753 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
@@ -24,6 +24,8 @@ import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -48,10 +50,13 @@ public class ConfDBTest extends BaseLogitStorageTest {
private LogEntryDecoder decoder;
private LogEntryEncoder encoder;
+ private ScheduledExecutorService checkpointExecutor;
+
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
+ checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
this.confStorePath = this.path + File.separator + "conf";
Files.createDirectories(Path.of(confStorePath));
this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
@@ -61,13 +66,14 @@ public class ConfDBTest extends BaseLogitStorageTest {
}
public void init() {
- this.confDB = new ConfDB(this.confStorePath);
+ this.confDB = new ConfDB(this.confStorePath, checkpointExecutor);
this.confDB.init(this.logStoreFactory);
}
@AfterEach
public void teardown() throws Exception {
this.confDB.shutdown();
+ checkpointExecutor.shutdown();
}
@Test
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
index 9e5930b896..4f8f54ca1e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
@@ -22,6 +22,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.AbortFile;
@@ -37,10 +39,14 @@ public class IndexDBTest extends BaseLogitStorageTest {
private String indexStorePath;
private AbortFile abortFile;
+ private ScheduledExecutorService checkpointExecutor;
+
+
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
+ checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
this.indexStorePath = this.path + File.separator + "index";
this.abortFile = new AbortFile(this.indexStorePath + File.separator +
"Abort");
Files.createDirectories(Path.of(indexStorePath));
@@ -48,13 +54,14 @@ public class IndexDBTest extends BaseLogitStorageTest {
}
public void init() {
- this.indexDB = new IndexDB(this.indexStorePath);
+ this.indexDB = new IndexDB(this.indexStorePath, checkpointExecutor);
this.indexDB.init(this.logStoreFactory);
}
@AfterEach
public void teardown() throws Exception {
this.indexDB.shutdown();
+ checkpointExecutor.shutdown();
}
/**
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
index 13542f0bca..38ad968959 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
@@ -23,6 +23,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
@@ -37,23 +39,28 @@ public class SegmentLogDBTest extends BaseLogitStorageTest {
private SegmentLogDB segmentLogDB;
private String segmentStorePath;
+ private ScheduledExecutorService checkpointExecutor;
+
@BeforeEach
@Override
public void setup() throws Exception {
super.setup();
+ checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
+
this.segmentStorePath = this.path + File.separator + "segment";
Files.createDirectories(Path.of(segmentStorePath));
this.init();
}
public void init() {
- this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
+ this.segmentLogDB = new SegmentLogDB(this.segmentStorePath,
checkpointExecutor);
this.segmentLogDB.init(this.logStoreFactory);
}
@AfterEach
public void teardown() throws Exception {
this.segmentLogDB.shutdown();
+ checkpointExecutor.shutdown();
}
@Test
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
index 0821aa78ea..c3b7397075 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
@@ -49,7 +50,7 @@ public class IndexFileTest extends BaseLogitStorageTest {
private void init() {
final String filePath = this.path + File.separator + "IndexFileTest";
- this.offsetIndex = new IndexFile(filePath, FILE_SIZE);
+ this.offsetIndex = new IndexFile(new RaftOptions(), filePath,
FILE_SIZE);
}
private final IndexEntry appendEntry0 = new IndexEntry(0, 1);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
index 4463927239..30ad39d79b 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
import
org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
@@ -52,7 +53,7 @@ public class SegmentFileTest extends BaseLogitStorageTest {
private void init() {
final String filePath = this.path + File.separator + "IndexFileTest";
- this.segmentFile = new SegmentFile(filePath, FILE_SIZE);
+ this.segmentFile = new SegmentFile(new RaftOptions(), filePath,
FILE_SIZE);
}
@Test
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index e865b7573f..b02548c48c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -40,7 +40,9 @@ import
org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.raft.configuration.EntryCountBudgetChange;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.junit.jupiter.api.Test;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -53,6 +55,7 @@ import org.rocksdb.Slice;
* Tests for making sure that RAFT groups corresponding to partition stores of
in-memory tables use volatile
* storages for storing RAFT meta and RAFT log, while they are persistent for
persistent storages.
*/
+@WithSystemProperty(key = JraftServerImpl.LOGIT_STORAGE_ENABLED_PROPERTY,
value = "false")
class ItRaftStorageVolatilityTest extends ClusterPerTestIntegrationTest {
private static final String TABLE_NAME = "test";