This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5bb0974edf IGNITE-26299 Port hybrid log storage from jraft (#7892)
a5bb0974edf is described below
commit a5bb0974edfdfc2c1636a78a103d5805886140f8
Author: Phillippko <[email protected]>
AuthorDate: Mon Mar 30 13:41:37 2026 +0700
IGNITE-26299 Port hybrid log storage from jraft (#7892)
---
.../internal/raft/storage/LogStorageManager.java | 8 +-
.../jraft/core/HybridLogJRaftServiceFactory.java | 67 ++++++
.../ignite/raft/jraft/option/RaftOptions.java | 18 +-
.../storage/logit/storage/HybridLogStorage.java | 225 +++++++++++++++++++++
.../file/assit/HybridStorageStatusCheckpoint.java | 50 +++++
.../jraft/storage/logit/HybridLogStorageTest.java | 176 ++++++++++++++++
6 files changed, 539 insertions(+), 5 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageManager.java
index 2d4d88f3a36..49a5f334ad5 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/LogStorageManager.java
@@ -32,18 +32,18 @@ public interface LogStorageManager extends IgniteComponent {
/**
* Creates a log storage.
*
- * @param uri Log storage URI.
+ * @param groupId Group ID to create storage for.
* @param raftOptions Raft options.
* @return Log storage.
*/
- LogStorage createLogStorage(String uri, RaftOptions raftOptions);
+ LogStorage createLogStorage(String groupId, RaftOptions raftOptions);
/**
* Destroys a log storage (that is, removes it from the disk).
*
- * @param uri Log storage URI.
+ * @param groupId Groupd ID to destroy log storage of.
*/
- void destroyLogStorage(String uri);
+ void destroyLogStorage(String groupId);
/**
* Obtains group IDs for storage of all Raft groups existing on disk.
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/HybridLogJRaftServiceFactory.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/HybridLogJRaftServiceFactory.java
new file mode 100644
index 00000000000..52a2340c769
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/HybridLogJRaftServiceFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.core;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.storage.HybridLogStorage;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.StringUtils;
+
+/** Creates hybrid log storage using passed managers, for smooth migration
from old log storage type. */
+public class HybridLogJRaftServiceFactory extends IgniteJraftServiceFactory {
+ private final LogStorageManager newStorageManager;
+
+ private final Path newStoragePath;
+
+ /**
+ * Creates a new instance of HybridLogJRaftServiceFactory for migrating
from old storage type to new one.
+ * @param oldStorageManager Factory to create old log storages.
+ * @param newStorageManager Factory to create new log storage.
+ * @param newStoragePath Path used by {@param newStorageManager}. Should be
different from old storage path to avoid conflicts.
+ */
+ public HybridLogJRaftServiceFactory(
+ LogStorageManager oldStorageManager,
+ LogStorageManager newStorageManager,
+ Path newStoragePath
+ ) {
+ super(oldStorageManager);
+
+ this.newStorageManager = newStorageManager;
+ this.newStoragePath = newStoragePath;
+ }
+
+ @Override
+ public LogStorage createLogStorage(String groupId, RaftOptions
raftOptions) {
+ Requires.requireTrue(StringUtils.isNotBlank(groupId), "Blank
groupId.");
+
+ // Create old storage if needed.
+ LogStorage oldStorage = null;
+ if (raftOptions.isStartupOldStorage()) {
+ oldStorage = super.createLogStorage(groupId, raftOptions);
+ }
+
+ LogStorage newLogStorage = newStorageManager.createLogStorage(groupId,
raftOptions);
+
+ return new HybridLogStorage(newStoragePath, raftOptions, oldStorage,
newLogStorage);
+ }
+}
+
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index f7d35ddd751..979e0e1fe54 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -139,6 +139,12 @@ public class RaftOptions implements Copiable<RaftOptions> {
*/
private boolean stepDownWhenVoteTimedout = true;
+ /**
+ * Check whether start up old storage (RocksdbLogStorage) when use
newLogStorage
+ * This option needs to be set to true if logs still exists in old storage.
+ */
+ private boolean startupOldStorage = false;
+
/**
* Maximum total byte size of tasks in the apply queue.
*
@@ -312,6 +318,14 @@ public class RaftOptions implements Copiable<RaftOptions> {
this.openStatistics = openStatistics;
}
+ public boolean isStartupOldStorage() {
+ return startupOldStorage;
+ }
+
+ public void setStartupOldStorage(final boolean startupOldStorage) {
+ this.startupOldStorage = startupOldStorage;
+ }
+
/**
* @return Raft message factory.
*/
@@ -357,6 +371,7 @@ public class RaftOptions implements Copiable<RaftOptions> {
raftOptions.setReadOnlyOptions(this.readOnlyOptions);
raftOptions.setRaftMessagesFactory(this.raftMessagesFactory);
raftOptions.setMaxApplyQueueByteSize(this.maxApplyQueueByteSize);
+ raftOptions.setStartupOldStorage(this.startupOldStorage);
return raftOptions;
}
@@ -372,6 +387,7 @@ public class RaftOptions implements Copiable<RaftOptions> {
+ ", maxReplicatorInflightMsgs=" + this.maxReplicatorInflightMsgs
+ ", disruptorBufferSize="
+ this.disruptorBufferSize + ",
disruptorPublishEventWaitTimeoutSecs="
+ this.disruptorPublishEventWaitTimeoutSecs + ",
enableLogEntryChecksum=" + this.enableLogEntryChecksum
- + ", readOnlyOptions=" + this.readOnlyOptions + ",
maxApplyQueueByteSize=" + this.maxApplyQueueByteSize + '}';
+ + ", readOnlyOptions=" + this.readOnlyOptions + ",
maxApplyQueueByteSize=" + this.maxApplyQueueByteSize
+ + ", startUpOldStorage=" + this.startupOldStorage + '}';
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/HybridLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/HybridLogStorage.java
new file mode 100644
index 00000000000..122d6df6da6
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/HybridLogStorage.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import
org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.HybridStorageStatusCheckpoint;
+import org.apache.ignite.raft.jraft.util.OnlyForTest;
+
+/**
+ * HybridLogStorage is used to be compatible with new and old logStorage.
+ */
+public class HybridLogStorage implements LogStorage {
+ /** Path to save the checkpoint of hybrid log storage status, which is
used to indicate whether the old log storage exists. */
+ public static final String STATUS_CHECKPOINT_PATH =
"HybridStatusCheckpoint";
+
+ private static final IgniteLogger LOG =
Loggers.forClass(HybridLogStorage.class);
+
+ private final HybridStorageStatusCheckpoint statusCheckpoint;
+ private volatile boolean isOldStorageExist;
+ private final LogStorage oldLogStorage;
+ private final LogStorage newLogStorage;
+
+ // The index which separates the oldStorage and newStorage.
+ private long thresholdIndex;
+
+ public HybridLogStorage(Path newStoragePath, RaftOptions raftOptions,
LogStorage oldStorage, LogStorage newStorage) {
+ this.newLogStorage = newStorage;
+ this.oldLogStorage = oldStorage;
+ String statusCheckpointPath =
newStoragePath.resolve(STATUS_CHECKPOINT_PATH).toString();
+
+ this.statusCheckpoint = new
HybridStorageStatusCheckpoint(statusCheckpointPath, raftOptions);
+ }
+
+ @Override
+ public boolean init( LogStorageOptions opts) {
+ try {
+ this.statusCheckpoint.load();
+ this.isOldStorageExist = this.statusCheckpoint.isOldStorageExist;
+ if (this.isOldStorageExist) {
+ if (this.oldLogStorage != null) {
+ if (!this.oldLogStorage.init(opts)) {
+ LOG.warn("Init old log storage failed when startup
hybridLogStorage");
+ return false;
+ }
+ long lastLogIndex = this.oldLogStorage.getLastLogIndex();
+ if (lastLogIndex == 0) {
+ shutdownOldLogStorage();
+ } else if (lastLogIndex > 0) {
+ // Still exists logs in oldLogStorage, need to wait
snapshot
+ this.thresholdIndex = lastLogIndex + 1;
+ this.isOldStorageExist = true;
+ LOG.info(
+ "Still exists logs in oldLogStorage, lastIndex:
{}, need to wait snapshot to truncate logs",
+ lastLogIndex);
+ }
+ } else {
+ this.isOldStorageExist = false;
+ }
+ }
+
+ if (!this.newLogStorage.init(opts)) {
+ LOG.warn("Init new log storage failed when startup
hybridLogStorage");
+ return false;
+ }
+
+ saveStatusCheckpoint();
+
+ return true;
+ } catch (IOException e) {
+ LOG.error("Error happen when load hybrid status checkpoint", e);
+ return false;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (isOldStorageExist()) {
+ this.oldLogStorage.shutdown();
+ }
+ this.newLogStorage.shutdown();
+ }
+
+ @Override
+ public long getFirstLogIndex() {
+ if (isOldStorageExist()) {
+ return this.oldLogStorage.getFirstLogIndex();
+ }
+ return this.newLogStorage.getFirstLogIndex();
+ }
+
+ @Override
+ public long getLastLogIndex() {
+ long newLastLogIndex = this.newLogStorage.getLastLogIndex();
+ if (newLastLogIndex > 0) {
+ return newLastLogIndex;
+ }
+ if (isOldStorageExist()) {
+ return this.oldLogStorage.getLastLogIndex();
+ }
+ return 0;
+ }
+
+ @Override
+ public LogEntry getEntry( long index) {
+ if (index >= this.thresholdIndex) {
+ return this.newLogStorage.getEntry(index);
+ }
+ if (isOldStorageExist()) {
+ return this.oldLogStorage.getEntry(index);
+ }
+ return null;
+ }
+
+ @Override
+ public long getTerm( long index) {
+ if (index >= this.thresholdIndex) {
+ return this.newLogStorage.getTerm(index);
+ }
+ if (isOldStorageExist()) {
+ return this.oldLogStorage.getTerm(index);
+ }
+ return 0;
+ }
+
+ @Override
+ public boolean appendEntry( LogEntry entry) {
+ return this.newLogStorage.appendEntry(entry);
+ }
+
+ @Override
+ public int appendEntries( List<LogEntry> entries) {
+ return this.newLogStorage.appendEntries(entries);
+ }
+
+ @Override
+ public boolean truncatePrefix( long firstIndexKept) {
+ if (!isOldStorageExist()) {
+ return this.newLogStorage.truncatePrefix(firstIndexKept);
+ }
+
+ if (firstIndexKept < this.thresholdIndex) {
+ return this.oldLogStorage.truncatePrefix(firstIndexKept);
+ }
+
+ // When firstIndex >= thresholdIndex, we can reset the old storage the
shutdown it.
+ if (isOldStorageExist()) {
+ this.oldLogStorage.reset(1);
+ shutdownOldLogStorage();
+ LOG.info("Truncate prefix at logIndex : {}, the thresholdIndex is
: {}, shutdown oldLogStorage success!",
+ firstIndexKept, this.thresholdIndex);
+ }
+ return this.newLogStorage.truncatePrefix(firstIndexKept);
+ }
+
+ @Override
+ public boolean truncateSuffix( long lastIndexKept) {
+ if (isOldStorageExist()) {
+ if (!this.oldLogStorage.truncateSuffix(lastIndexKept)) {
+ return false;
+ }
+ }
+ return this.newLogStorage.truncateSuffix(lastIndexKept);
+ }
+
+ @Override
+ public boolean reset( long nextLogIndex) {
+ if (isOldStorageExist()) {
+ if (!this.oldLogStorage.reset(nextLogIndex)) {
+ return false;
+ }
+ shutdownOldLogStorage();
+ }
+ return this.newLogStorage.reset(nextLogIndex);
+ }
+
+ private void shutdownOldLogStorage() {
+ this.oldLogStorage.shutdown();
+ this.isOldStorageExist = false;
+ this.thresholdIndex = 0;
+ saveStatusCheckpoint();
+ }
+
+ private void saveStatusCheckpoint() {
+ this.statusCheckpoint.isOldStorageExist = this.isOldStorageExist;
+ try {
+ // Save status
+ this.statusCheckpoint.save();
+ } catch ( IOException e) {
+ LOG.error("Error happen when save hybrid status checkpoint", e);
+ }
+ }
+
+ public boolean isOldStorageExist() {
+ return this.isOldStorageExist;
+ }
+
+ @OnlyForTest
+ public long getThresholdIndex() {
+ return thresholdIndex;
+ }
+}
+
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/HybridStorageStatusCheckpoint.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/HybridStorageStatusCheckpoint.java
new file mode 100644
index 00000000000..712202d0504
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/HybridStorageStatusCheckpoint.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;import
org.apache.ignite.raft.jraft.util.Bits;
+
+/**
+ * This checkpoint is used for saving whether the oldStorage in HybridStorage
is shutdown.
+ */
+public class HybridStorageStatusCheckpoint extends Checkpoint {
+ public boolean isOldStorageExist = true;
+
+ public HybridStorageStatusCheckpoint(String path, RaftOptions raftOptions)
{
+ super(path, raftOptions);
+ }
+
+ @Override
+ public byte[] encode() {
+ byte[] bs = new byte[2];
+ short val = (short) (this.isOldStorageExist ? 1 : 0);
+ Bits.putShort(bs, 0, val);
+ return bs;
+ }
+
+ @Override
+ public boolean decode(byte[] bs) {
+ if (bs.length < 1) {
+ return false;
+ }
+ short val = Bits.getShort(bs, 0);
+ this.isOldStorageExist = val == 1;
+ return true;
+ }
+}
+
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/HybridLogStorageTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/HybridLogStorageTest.java
new file mode 100644
index 00000000000..7be47dc1532
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/HybridLogStorageTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageManager;
+import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageManager;
+import org.apache.ignite.raft.jraft.JRaftServiceFactory;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.core.HybridLogJRaftServiceFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.HybridLogStorage;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+class HybridLogStorageTest extends BaseStorageTest {
+ private static final String STORAGE_RELATIVE_PATH = "log";
+
+ private static final String NEW_STORAGE_RELATIVE_PATH = "new";
+
+ @Test
+ public void testTransferLogStorage() {
+ RaftOptions raftOptions = new RaftOptions();
+
+ raftOptions.setStartupOldStorage(true);
+
+ LogStorage oldStorage = new LocalLogStorage(raftOptions);
+
+ LogStorageManager oldStorageFactory = new LocalLogStorageManager() {
+ @Override
+ public LogStorage createLogStorage(String uri, RaftOptions
raftOptions) {
+ return oldStorage;
+ }
+ };
+
+ assertTrue(oldStorage.init(logStorageOptions()));
+
+ long valueCount = 10;
+
+ for (int i = 1; i <= valueCount; i++) {
+ oldStorage.appendEntry(TestUtils.mockEntry(i, 1));
+ }
+
+ oldStorage.shutdown();
+
+ HybridLogStorage hybridLogStorage =
createHybridLogStorage(raftOptions, oldStorageFactory);
+
+ assertTrue(hybridLogStorage.init(logStorageOptions()));
+
+ assertTrue(hybridLogStorage.isOldStorageExist());
+
+ // Checkpoint saved to disk when storage is started.
+ assertTrue(Files.exists(statusCheckpointPath()));
+
+ long expectedThresholdIndex = oldStorage.getLastLogIndex() + 1;
+
+ assertEquals(expectedThresholdIndex,
hybridLogStorage.getThresholdIndex());
+
+ for (int i = 0; i < valueCount; i++) {
+ hybridLogStorage.appendEntry(TestUtils.mockEntry((int)
(expectedThresholdIndex + i), 1));
+ }
+
+ assertEquals(expectedThresholdIndex + valueCount - 1,
hybridLogStorage.getLastLogIndex());
+
+ hybridLogStorage.truncatePrefix(expectedThresholdIndex);
+ assertEquals(expectedThresholdIndex,
hybridLogStorage.getFirstLogIndex());
+ assertFalse(hybridLogStorage.isOldStorageExist());
+
+ hybridLogStorage.shutdown();
+
+ assertTrue(hybridLogStorage.init(logStorageOptions()));
+ assertFalse(hybridLogStorage.isOldStorageExist());
+ assertEquals(0, hybridLogStorage.getThresholdIndex());
+ assertEquals(expectedThresholdIndex,
hybridLogStorage.getFirstLogIndex());
+ assertEquals(expectedThresholdIndex + valueCount - 1,
hybridLogStorage.getLastLogIndex());
+
+ // Entries written to new storage must be readable after restart.
+ for (int i = 0; i < valueCount; i++) {
+ long index = expectedThresholdIndex + i;
+ assertNotNull(hybridLogStorage.getEntry(index), "Entry missing at
index " + index);
+ }
+ }
+
+ @Test
+ public void testHybridStorageWithoutOldStorage() {
+ RaftOptions raftOptions = new RaftOptions();
+
+ raftOptions.setStartupOldStorage(false);
+
+ HybridLogStorage hybridLogStorage =
createHybridLogStorage(raftOptions, null);
+
+ assertTrue(hybridLogStorage.init(logStorageOptions()));
+
+ assertFalse(hybridLogStorage.isOldStorageExist());
+ assertEquals(0, hybridLogStorage.getThresholdIndex());
+
+ // Checkpoint saved to disk when storage is started.
+ assertTrue(Files.exists(statusCheckpointPath()));
+
+ long valueCount = 10;
+
+ for (int i = 1; i <= valueCount; i++) {
+ hybridLogStorage.appendEntry(TestUtils.mockEntry(i, 1));
+ }
+
+ assertEquals(1, hybridLogStorage.getFirstLogIndex());
+ assertEquals(valueCount, hybridLogStorage.getLastLogIndex());
+
+ hybridLogStorage.shutdown();
+
+ hybridLogStorage.init(logStorageOptions());
+ assertFalse(hybridLogStorage.isOldStorageExist());
+ assertEquals(0, hybridLogStorage.getThresholdIndex());
+ assertEquals(1, hybridLogStorage.getFirstLogIndex());
+ assertEquals(valueCount, hybridLogStorage.getLastLogIndex());
+
+ hybridLogStorage.shutdown();
+ }
+
+ private HybridLogStorage createHybridLogStorage(RaftOptions raftOptions,
LogStorageManager oldStorageFactory) {
+ Path storagePath =
path.resolve(STORAGE_RELATIVE_PATH).resolve(NEW_STORAGE_RELATIVE_PATH);
+
+ LogStorageManager newStorageFactory = new
LogitLogStorageManager("test", storeOptions(), storagePath);
+
+ JRaftServiceFactory factory = new
HybridLogJRaftServiceFactory(oldStorageFactory, newStorageFactory, storagePath);
+
+ return (HybridLogStorage) factory.createLogStorage("test-group-id",
raftOptions);
+ }
+
+ private Path statusCheckpointPath() {
+ return
path.resolve(STORAGE_RELATIVE_PATH).resolve(NEW_STORAGE_RELATIVE_PATH).resolve(HybridLogStorage.STATUS_CHECKPOINT_PATH);
+ }
+
+ private static StoreOptions storeOptions() {
+ StoreOptions storeOptions = new StoreOptions();
+ storeOptions.setSegmentFileSize(512 * 1024);
+ storeOptions.setConfFileSize(512 * 1024);
+ storeOptions.setEnableWarmUpFile(false);
+ return storeOptions;
+ }
+
+ private static LogStorageOptions logStorageOptions() {
+ LogStorageOptions opts = new LogStorageOptions();
+ opts.setConfigurationManager(new ConfigurationManager());
+ opts.setLogEntryCodecFactory(LogEntryV1CodecFactory.getInstance());
+ return opts;
+ }
+}