This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 07fd38004c IGNITE-22668 Fix batch commit in StripeAwareLogManager
(#4047)
07fd38004c is described below
commit 07fd38004c7de06ce773976fa3768876410524e9
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Jul 8 17:23:10 2024 +0300
IGNITE-22668 Fix batch commit in StripeAwareLogManager (#4047)
---
.../apache/ignite/internal/raft/ItLozaTest.java | 237 +++++++++++++++------
.../storage/impl/DefaultLogStorageFactory.java | 25 ++-
.../raft/storage/impl/RocksDbSharedLogStorage.java | 12 +-
.../raft/storage/impl/StripeAwareLogManager.java | 29 ++-
4 files changed, 216 insertions(+), 87 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 91382ff48f..78938cec9e 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -17,9 +17,15 @@
package org.apache.ignite.internal.raft;
+import static java.util.concurrent.CompletableFuture.allOf;
+import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
@@ -31,28 +37,44 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.CommandClosure;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ReverseIterator;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.storage.LogManager;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -60,24 +82,49 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for {@link Loza} functionality.
*/
-@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-public class ItLozaTest extends BaseIgniteAbstractTest {
+public class ItLozaTest extends IgniteAbstractTest {
/** Server port offset. */
private static final int PORT = 20010;
- @WorkDirectory
- private Path dataPath;
+ private final ComponentContext componentContext = new ComponentContext();
- @InjectConfiguration
- private static RaftConfiguration raftConfiguration;
+ private ClusterService clusterService;
+
+ private Loza loza;
+
+ private final List<IgniteComponent> allComponents = new ArrayList<>();
+
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ var addr = new NetworkAddress("localhost", PORT);
+
+ clusterService = clusterService(testInfo, PORT, new
StaticNodeFinder(List.of(addr)));
+
+ assertThat(clusterService.startAsync(componentContext),
willCompleteSuccessfully());
+
+ allComponents.add(clusterService);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (loza != null) {
+ IgniteUtils.closeAll(loza.localNodes().stream().map(nodeId -> ()
-> loza.stopRaftNode(nodeId)));
+
+ allComponents.add(loza);
+ }
+
+ new ReverseIterator<>(allComponents).forEachRemaining(c -> {
+ assertThat(c.stopAsync(componentContext),
willCompleteSuccessfully());
+ });
+ }
/**
* Starts a raft group service with a provided group id on a provided Loza
instance.
*
* @return Raft group service.
*/
- private RaftGroupService startClient(TestReplicationGroupId groupId,
ClusterNode node, Loza loza) throws Exception {
+ private RaftGroupService startClient(TestReplicationGroupId groupId,
ClusterNode node) throws Exception {
RaftGroupListener raftGroupListener = mock(RaftGroupListener.class);
when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true);
@@ -91,77 +138,147 @@ public class ItLozaTest extends BaseIgniteAbstractTest {
}
/**
- * Returns the client cluster view.
- *
- * @param testInfo Test info.
- * @param port Local port.
- * @param srvs Server nodes of the cluster.
- * @return The client cluster view.
+ * Tests that RaftGroupServiceImpl uses shared executor for retrying
RaftGroupServiceImpl#sendWithRetry().
*/
- private static ClusterService clusterService(TestInfo testInfo, int port,
List<NetworkAddress> srvs) {
- var network = ClusterServiceTestUtils.clusterService(testInfo, port,
new StaticNodeFinder(srvs));
+ @Test
+ public void testRaftServiceUsingSharedExecutor(@InjectConfiguration
RaftConfiguration raftConfiguration) throws Exception {
+ ClusterService spyService = spy(clusterService);
- assertThat(network.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ MessagingService messagingServiceMock =
spy(spyService.messagingService());
- return network;
- }
+ when(spyService.messagingService()).thenReturn(messagingServiceMock);
- /**
- * Tests that RaftGroupServiceImpl uses shared executor for retrying
RaftGroupServiceImpl#sendWithRetry().
- */
- @Test
- public void testRaftServiceUsingSharedExecutor(TestInfo testInfo) throws
Exception {
- ClusterService service = null;
+ CompletableFuture<NetworkMessage> exception =
CompletableFuture.failedFuture(new IOException());
+
+ loza = TestLozaFactory.create(spyService, raftConfiguration, workDir,
new HybridClockImpl());
+
+ assertThat(loza.startAsync(componentContext),
willCompleteSuccessfully());
+
+ for (int i = 0; i < 5; i++) {
+ // return an error on first invocation
+ doReturn(exception)
+ // assert that a retry has been issued on the executor
+ .doAnswer(invocation -> {
+ assertThat(Thread.currentThread().getName(),
containsString(Loza.CLIENT_POOL_NAME));
+
+ return exception;
+ })
+ // finally call the real method
+ .doCallRealMethod()
+ .when(messagingServiceMock).invoke(any(ClusterNode.class),
any(), anyLong());
- Loza loza = null;
+ startClient(new TestReplicationGroupId(Integer.toString(i)),
spyService.topologyService().localMember());
- RaftGroupService[] grpSrvcs = new RaftGroupService[5];
+ verify(messagingServiceMock, times(3 * (i + 1)))
+ .invoke(any(ClusterNode.class), any(), anyLong());
+ }
+ }
- try {
- service = spy(clusterService(testInfo, PORT, List.of()));
+ /**
+ * Tests a scenario where two types of Raft Log Storages are bound to the
same stripe and validates that data gets correctly saved
+ * to both of them during writes.
+ */
+ @RepeatedTest(100)
+ void testVolatileAndPersistentGroupsOnSameStripe(
+ @InjectConfiguration("mock.logStripesCount=1")
+ RaftConfiguration raftConfiguration
+ ) throws Exception {
+ loza = TestLozaFactory.create(clusterService, raftConfiguration,
workDir, new HybridClockImpl());
- MessagingService messagingServiceMock =
spy(service.messagingService());
+ assertThat(loza.startAsync(componentContext),
willCompleteSuccessfully());
- when(service.messagingService()).thenReturn(messagingServiceMock);
+ String nodeName = clusterService.nodeName();
- CompletableFuture<NetworkMessage> exception =
CompletableFuture.failedFuture(new IOException());
+ var volatileLogStorageFactoryCreator = new
VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("spill"));
- loza = TestLozaFactory.create(service, raftConfiguration,
dataPath, new HybridClockImpl());
+
assertThat(volatileLogStorageFactoryCreator.startAsync(componentContext),
willCompleteSuccessfully());
- assertThat(loza.startAsync(new ComponentContext()),
willCompleteSuccessfully());
+ allComponents.add(volatileLogStorageFactoryCreator);
- for (int i = 0; i < grpSrvcs.length; i++) {
- // return an error on first invocation
- doReturn(exception)
- // assert that a retry has been issued on the executor
- .doAnswer(invocation -> {
- assertThat(Thread.currentThread().getName(),
containsString(Loza.CLIENT_POOL_NAME));
+ PeersAndLearners configuration =
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
- return exception;
- })
- // finally call the real method
- .doCallRealMethod()
-
.when(messagingServiceMock).invoke(any(ClusterNode.class), any(), anyLong());
+ Peer peer = configuration.peer(nodeName);
- grpSrvcs[i] = startClient(new
TestReplicationGroupId(Integer.toString(i)),
service.topologyService().localMember(), loza);
+ // Raft listener that simply drains the given iterators.
+ RaftGroupListener raftGroupListener = new RaftGroupListener() {
+ @Override
+ public void onWrite(Iterator<CommandClosure<WriteCommand>>
iterator) {
+ iterator.forEachRemaining(c -> c.result(null));
+ }
- verify(messagingServiceMock, times(3 * (i + 1)))
- .invoke(any(ClusterNode.class), any(), anyLong());
+ @Override
+ public void onRead(Iterator<CommandClosure<ReadCommand>> iterator)
{
}
- } finally {
- for (RaftGroupService srvc : grpSrvcs) {
- srvc.shutdown();
- loza.stopRaftNodes(srvc.groupId());
+ @Override
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo)
{
}
- if (loza != null) {
- assertThat(loza.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+ @Override
+ public boolean onSnapshotLoad(Path path) {
+ return true;
}
- if (service != null) {
- assertThat(service.stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+ @Override
+ public void onShutdown() {
}
- }
+ };
+
+ LogStorageBudgetView volatileCfg =
raftConfiguration.volatileRaft().logStorage().value();
+
+ LogStorageFactory volatileLogStorageFactory =
volatileLogStorageFactoryCreator.factory(volatileCfg);
+
+ // Start two Raft nodes: one backed by a volatile storage and the
other - by "shared" persistent storage.
+ var volatileRaftNodeId = new RaftNodeId(new
TestReplicationGroupId("volatile"), peer);
+
+ CompletableFuture<RaftGroupService> volatileServiceFuture =
loza.startRaftGroupNode(
+ volatileRaftNodeId,
+ configuration,
+ raftGroupListener,
+ RaftGroupEventsListener.noopLsnr,
+ RaftGroupOptions.forVolatileStores()
+ .setLogStorageFactory(volatileLogStorageFactory)
+ .raftMetaStorageFactory((groupId, raftOptions) -> new
VolatileRaftMetaStorage())
+ );
+
+ var persistentNodeId = new RaftNodeId(new
TestReplicationGroupId("persistent"), peer);
+
+ CompletableFuture<RaftGroupService> persistentServiceFuture =
loza.startRaftGroupNode(
+ persistentNodeId,
+ configuration,
+ raftGroupListener,
+ RaftGroupEventsListener.noopLsnr,
+ RaftGroupOptions.forPersistentStores()
+ );
+
+ assertThat(allOf(volatileServiceFuture, persistentServiceFuture),
willCompleteSuccessfully());
+
+ RaftGroupService volatileService = volatileServiceFuture.join();
+ RaftGroupService persistentService = persistentServiceFuture.join();
+
+ // Execute two write commands in parallel. We then hope that these
commands will be batched together.
+ WriteCommand cmd = testWriteCommand("foo");
+
+ CompletableFuture<Void> f1 = volatileService.run(cmd);
+ CompletableFuture<Void> f2 = persistentService.run(cmd);
+
+ assertThat(f1, willCompleteSuccessfully());
+ assertThat(f2, willCompleteSuccessfully());
+
+ // Inspect the raft log and validate that both writes have been
committed to the storage.
+ validateLastLogEntry(volatileRaftNodeId);
+ validateLastLogEntry(persistentNodeId);
+ }
+
+ private void validateLastLogEntry(RaftNodeId raftNodeId) {
+ Node raftNode = ((JraftServerImpl)
loza.server()).raftGroupService(raftNodeId).getRaftNode();
+
+ LogManager logManager = getFieldValue(raftNode, "logManager");
+
+ long lastLogIndex = logManager.getLastLogIndex();
+
+ LogEntry entry = logManager.getEntry(lastLogIndex);
+
+ assertThat(entry, is(notNullValue()));
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 89901affe1..763b1c93ef 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -41,6 +41,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.Platform;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.rocksdb.AbstractNativeReference;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -205,7 +206,8 @@ public class DefaultLogStorageFactory implements
LogStorageFactory {
}
/**
- * Returns a thread-local {@link WriteBatch} instance, attached to current
factory, append data from multiple storages at the same time.
+ * Returns or creates a thread-local {@link WriteBatch} instance, attached
to current factory, for appending data
+ * from multiple storages at the same time.
*/
WriteBatch getOrCreateThreadLocalWriteBatch() {
WriteBatch writeBatch = threadLocalWriteBatch.get();
@@ -220,16 +222,23 @@ public class DefaultLogStorageFactory implements
LogStorageFactory {
}
/**
- * Clears {@link WriteBatch} returned by {@link
#getOrCreateThreadLocalWriteBatch()}.
+ * Returns a thread-local {@link WriteBatch} instance, attached to current
factory, for appending data from multiple storages
+ * at the same time.
+ *
+ * @return {@link WriteBatch} instance or {@code null} if it was never
created or has already been cleared.
*/
- void clearThreadLocalWriteBatch() {
- WriteBatch writeBatch = threadLocalWriteBatch.get();
+ @Nullable
+ WriteBatch getThreadLocalWriteBatch() {
+ return threadLocalWriteBatch.get();
+ }
- if (writeBatch != null) {
- writeBatch.close();
+ /**
+ * Closes the given {@link WriteBatch} (obtained via {@link
#getOrCreateThreadLocalWriteBatch()}) and detaches it from the current thread.
+ */
+ void clearThreadLocalWriteBatch(WriteBatch writeBatch) {
+ writeBatch.close();
- threadLocalWriteBatch.set(null);
- }
+ threadLocalWriteBatch.remove();
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 2382f1aeb3..3c15dfc6a4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -487,19 +487,23 @@ public class RocksDbSharedLogStorage implements
LogStorage, Describer {
/**
* Writes batch, previously filled by {@link #appendEntriesToBatch(List)}
calls, into a rocksdb storage and clears the batch by calling
- * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch()}.
+ * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch}.
*/
void commitWriteBatch() {
- try {
- WriteBatch writeBatch =
logStorageFactory.getOrCreateThreadLocalWriteBatch();
+ WriteBatch writeBatch = logStorageFactory.getThreadLocalWriteBatch();
+ if (writeBatch == null) {
+ return;
+ }
+
+ try {
if (writeBatch.count() > 0) {
db.write(this.writeOptions, writeBatch);
}
} catch (RocksDBException e) {
LOG.error("Execute batch failed with rocksdb exception.", e);
} finally {
- logStorageFactory.clearThreadLocalWriteBatch();
+ logStorageFactory.clearThreadLocalWriteBatch(writeBatch);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
index 66c77c3c71..9f9d423c1b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -227,23 +227,22 @@ public class StripeAwareLogManager extends LogManagerImpl
{
appendBatcher.appendToStorage();
}
- if (!appendBatchers.isEmpty()) {
- // Since the storage is shared, any batcher can flush it.
- // This is a little confusing and hacky, but it doesn't
require explicit access to the log storage factory,
- // which makes it far easier to use in current jraft code.
- // The reason why we don't call this method on log factory,
for example, is because the factory doesn't have proper access
- // to the RAFT configuration, and can't say, whether it should
use "fsync" or not, for example.
- try {
- appendBatchers.iterator().next().commitWriteBatch();
- } catch (Exception e) {
- LOG.error("**Critical error**, failed to appendEntries.",
e);
-
- for (StripeAwareAppendBatcher appendBatcher :
appendBatchers) {
- appendBatcher.reportError(RaftError.EIO.getNumber(),
"Fail to append log entries");
- }
+ // Calling "commitWriteBatch" on StripeAwareAppendBatcher is
confusing and hacky, but it doesn't require explicit access
+ // to the log storage factory, which makes it far easier to use in
current jraft code.
+ // The reason why we don't call this method on log factory, for
example, is because the factory doesn't have proper access
+ // to the RAFT configuration, and can't tell whether it should use
"fsync" or not, for example.
+ try {
+ for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+ appendBatcher.commitWriteBatch();
+ }
+ } catch (Exception e) {
+ LOG.error("**Critical error**, failed to appendEntries.", e);
- return;
+ for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+ appendBatcher.reportError(RaftError.EIO.getNumber(), "Fail
to append log entries");
}
+
+ return;
}
// When data is committed, we can notify all stable closures and
send response messages.