This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 07fd38004c IGNITE-22668 Fix batch commit in StripeAwareLogManager 
(#4047)
07fd38004c is described below

commit 07fd38004c7de06ce773976fa3768876410524e9
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Jul 8 17:23:10 2024 +0300

    IGNITE-22668 Fix batch commit in StripeAwareLogManager (#4047)
---
 .../apache/ignite/internal/raft/ItLozaTest.java    | 237 +++++++++++++++------
 .../storage/impl/DefaultLogStorageFactory.java     |  25 ++-
 .../raft/storage/impl/RocksDbSharedLogStorage.java |  12 +-
 .../raft/storage/impl/StripeAwareLogManager.java   |  29 ++-
 4 files changed, 216 insertions(+), 87 deletions(-)

diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 91382ff48f..78938cec9e 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -17,9 +17,15 @@
 
 package org.apache.ignite.internal.raft;
 
+import static java.util.concurrent.CompletableFuture.allOf;
+import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.raft.TestWriteCommand.testWriteCommand;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doReturn;
@@ -31,28 +37,44 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.StaticNodeFinder;
-import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.service.CommandClosure;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ReverseIterator;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.storage.LogManager;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -60,24 +82,49 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Tests for {@link Loza} functionality.
  */
-@ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
-public class ItLozaTest extends BaseIgniteAbstractTest {
+public class ItLozaTest extends IgniteAbstractTest {
     /** Server port offset. */
     private static final int PORT = 20010;
 
-    @WorkDirectory
-    private Path dataPath;
+    private final ComponentContext componentContext = new ComponentContext();
 
-    @InjectConfiguration
-    private static RaftConfiguration raftConfiguration;
+    private ClusterService clusterService;
+
+    private Loza loza;
+
+    private final List<IgniteComponent> allComponents = new ArrayList<>();
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) {
+        var addr = new NetworkAddress("localhost", PORT);
+
+        clusterService = clusterService(testInfo, PORT, new 
StaticNodeFinder(List.of(addr)));
+
+        assertThat(clusterService.startAsync(componentContext), 
willCompleteSuccessfully());
+
+        allComponents.add(clusterService);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (loza != null) {
+            IgniteUtils.closeAll(loza.localNodes().stream().map(nodeId -> () 
-> loza.stopRaftNode(nodeId)));
+
+            allComponents.add(loza);
+        }
+
+        new ReverseIterator<>(allComponents).forEachRemaining(c -> {
+            assertThat(c.stopAsync(componentContext), 
willCompleteSuccessfully());
+        });
+    }
 
     /**
      * Starts a raft group service with a provided group id on a provided Loza 
instance.
      *
      * @return Raft group service.
      */
-    private RaftGroupService startClient(TestReplicationGroupId groupId, 
ClusterNode node, Loza loza) throws Exception {
+    private RaftGroupService startClient(TestReplicationGroupId groupId, 
ClusterNode node) throws Exception {
         RaftGroupListener raftGroupListener = mock(RaftGroupListener.class);
 
         when(raftGroupListener.onSnapshotLoad(any())).thenReturn(true);
@@ -91,77 +138,147 @@ public class ItLozaTest extends BaseIgniteAbstractTest {
     }
 
     /**
-     * Returns the client cluster view.
-     *
-     * @param testInfo Test info.
-     * @param port     Local port.
-     * @param srvs     Server nodes of the cluster.
-     * @return The client cluster view.
+     * Tests that RaftGroupServiceImpl uses shared executor for retrying 
RaftGroupServiceImpl#sendWithRetry().
      */
-    private static ClusterService clusterService(TestInfo testInfo, int port, 
List<NetworkAddress> srvs) {
-        var network = ClusterServiceTestUtils.clusterService(testInfo, port, 
new StaticNodeFinder(srvs));
+    @Test
+    public void testRaftServiceUsingSharedExecutor(@InjectConfiguration 
RaftConfiguration raftConfiguration) throws Exception {
+        ClusterService spyService = spy(clusterService);
 
-        assertThat(network.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+        MessagingService messagingServiceMock = 
spy(spyService.messagingService());
 
-        return network;
-    }
+        when(spyService.messagingService()).thenReturn(messagingServiceMock);
 
-    /**
-     * Tests that RaftGroupServiceImpl uses shared executor for retrying 
RaftGroupServiceImpl#sendWithRetry().
-     */
-    @Test
-    public void testRaftServiceUsingSharedExecutor(TestInfo testInfo) throws 
Exception {
-        ClusterService service = null;
+        CompletableFuture<NetworkMessage> exception = 
CompletableFuture.failedFuture(new IOException());
+
+        loza = TestLozaFactory.create(spyService, raftConfiguration, workDir, 
new HybridClockImpl());
+
+        assertThat(loza.startAsync(componentContext), 
willCompleteSuccessfully());
+
+        for (int i = 0; i < 5; i++) {
+            // return an error on first invocation
+            doReturn(exception)
+                    // assert that a retry has been issued on the executor
+                    .doAnswer(invocation -> {
+                        assertThat(Thread.currentThread().getName(), 
containsString(Loza.CLIENT_POOL_NAME));
+
+                        return exception;
+                    })
+                    // finally call the real method
+                    .doCallRealMethod()
+                    .when(messagingServiceMock).invoke(any(ClusterNode.class), 
any(), anyLong());
 
-        Loza loza = null;
+            startClient(new TestReplicationGroupId(Integer.toString(i)), 
spyService.topologyService().localMember());
 
-        RaftGroupService[] grpSrvcs = new RaftGroupService[5];
+            verify(messagingServiceMock, times(3 * (i + 1)))
+                    .invoke(any(ClusterNode.class), any(), anyLong());
+        }
+    }
 
-        try {
-            service = spy(clusterService(testInfo, PORT, List.of()));
+    /**
+     * Tests a scenario when two types of Raft Log Storages are bound to the 
same stripe and validates that data gets correctly saved
+     * to both of them during writes.
+     */
+    @RepeatedTest(100)
+    void testVolatileAndPersistentGroupsOnSameStripe(
+            @InjectConfiguration("mock.logStripesCount=1")
+            RaftConfiguration raftConfiguration
+    ) throws Exception {
+        loza = TestLozaFactory.create(clusterService, raftConfiguration, 
workDir, new HybridClockImpl());
 
-            MessagingService messagingServiceMock = 
spy(service.messagingService());
+        assertThat(loza.startAsync(componentContext), 
willCompleteSuccessfully());
 
-            when(service.messagingService()).thenReturn(messagingServiceMock);
+        String nodeName = clusterService.nodeName();
 
-            CompletableFuture<NetworkMessage> exception = 
CompletableFuture.failedFuture(new IOException());
+        var volatileLogStorageFactoryCreator = new 
VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("spill"));
 
-            loza = TestLozaFactory.create(service, raftConfiguration, 
dataPath, new HybridClockImpl());
+        
assertThat(volatileLogStorageFactoryCreator.startAsync(componentContext), 
willCompleteSuccessfully());
 
-            assertThat(loza.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
+        allComponents.add(volatileLogStorageFactoryCreator);
 
-            for (int i = 0; i < grpSrvcs.length; i++) {
-                // return an error on first invocation
-                doReturn(exception)
-                        // assert that a retry has been issued on the executor
-                        .doAnswer(invocation -> {
-                            assertThat(Thread.currentThread().getName(), 
containsString(Loza.CLIENT_POOL_NAME));
+        PeersAndLearners configuration = 
PeersAndLearners.fromConsistentIds(Set.of(nodeName));
 
-                            return exception;
-                        })
-                        // finally call the real method
-                        .doCallRealMethod()
-                        
.when(messagingServiceMock).invoke(any(ClusterNode.class), any(), anyLong());
+        Peer peer = configuration.peer(nodeName);
 
-                grpSrvcs[i] = startClient(new 
TestReplicationGroupId(Integer.toString(i)), 
service.topologyService().localMember(), loza);
+        // Raft listener that simply drains the given iterators.
+        RaftGroupListener raftGroupListener = new RaftGroupListener() {
+            @Override
+            public void onWrite(Iterator<CommandClosure<WriteCommand>> 
iterator) {
+                iterator.forEachRemaining(c -> c.result(null));
+            }
 
-                verify(messagingServiceMock, times(3 * (i + 1)))
-                        .invoke(any(ClusterNode.class), any(), anyLong());
+            @Override
+            public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) 
{
             }
-        } finally {
-            for (RaftGroupService srvc : grpSrvcs) {
-                srvc.shutdown();
 
-                loza.stopRaftNodes(srvc.groupId());
+            @Override
+            public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) 
{
             }
 
-            if (loza != null) {
-                assertThat(loza.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+            @Override
+            public boolean onSnapshotLoad(Path path) {
+                return true;
             }
 
-            if (service != null) {
-                assertThat(service.stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+            @Override
+            public void onShutdown() {
             }
-        }
+        };
+
+        LogStorageBudgetView volatileCfg = 
raftConfiguration.volatileRaft().logStorage().value();
+
+        LogStorageFactory volatileLogStorageFactory = 
volatileLogStorageFactoryCreator.factory(volatileCfg);
+
+        // Start two Raft nodes: one backed by a volatile storage and the 
other - by "shared" persistent storage.
+        var volatileRaftNodeId = new RaftNodeId(new 
TestReplicationGroupId("volatile"), peer);
+
+        CompletableFuture<RaftGroupService> volatileServiceFuture = 
loza.startRaftGroupNode(
+                volatileRaftNodeId,
+                configuration,
+                raftGroupListener,
+                RaftGroupEventsListener.noopLsnr,
+                RaftGroupOptions.forVolatileStores()
+                        .setLogStorageFactory(volatileLogStorageFactory)
+                        .raftMetaStorageFactory((groupId, raftOptions) -> new 
VolatileRaftMetaStorage())
+        );
+
+        var persistentNodeId = new RaftNodeId(new 
TestReplicationGroupId("persistent"), peer);
+
+        CompletableFuture<RaftGroupService> persistentServiceFuture = 
loza.startRaftGroupNode(
+                persistentNodeId,
+                configuration,
+                raftGroupListener,
+                RaftGroupEventsListener.noopLsnr,
+                RaftGroupOptions.forPersistentStores()
+        );
+
+        assertThat(allOf(volatileServiceFuture, persistentServiceFuture), 
willCompleteSuccessfully());
+
+        RaftGroupService volatileService = volatileServiceFuture.join();
+        RaftGroupService persistentService = persistentServiceFuture.join();
+
+        // Execute two write commands in parallel. We then hope that these 
commands will be batched together.
+        WriteCommand cmd = testWriteCommand("foo");
+
+        CompletableFuture<Void> f1 = volatileService.run(cmd);
+        CompletableFuture<Void> f2 = persistentService.run(cmd);
+
+        assertThat(f1, willCompleteSuccessfully());
+        assertThat(f2, willCompleteSuccessfully());
+
+        // Inspect the raft log and validate that both writes have been 
committed to the storage.
+        validateLastLogEntry(volatileRaftNodeId);
+        validateLastLogEntry(persistentNodeId);
+    }
+
+    private void validateLastLogEntry(RaftNodeId raftNodeId) {
+        Node raftNode = ((JraftServerImpl) 
loza.server()).raftGroupService(raftNodeId).getRaftNode();
+
+        LogManager logManager = getFieldValue(raftNode, "logManager");
+
+        long lastLogIndex = logManager.getLastLogIndex();
+
+        LogEntry entry = logManager.getEntry(lastLogIndex);
+
+        assertThat(entry, is(notNullValue()));
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index 89901affe1..763b1c93ef 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -41,6 +41,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.Platform;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 import org.rocksdb.AbstractNativeReference;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -205,7 +206,8 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     }
 
     /**
-     * Returns a thread-local {@link WriteBatch} instance, attached to current 
factory, append data from multiple storages at the same time.
+     * Returns or creates a thread-local {@link WriteBatch} instance, attached 
to current factory, for appending data
+     * from multiple storages at the same time.
      */
     WriteBatch getOrCreateThreadLocalWriteBatch() {
         WriteBatch writeBatch = threadLocalWriteBatch.get();
@@ -220,16 +222,23 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     }
 
     /**
-     * Clears {@link WriteBatch} returned by {@link 
#getOrCreateThreadLocalWriteBatch()}.
+     * Returns a thread-local {@link WriteBatch} instance, attached to current 
factory, for appending data from multiple storages
+     * at the same time.
+     *
+     * @return {@link WriteBatch} instance or {@code null} if it was never 
created.
      */
-    void clearThreadLocalWriteBatch() {
-        WriteBatch writeBatch = threadLocalWriteBatch.get();
+    @Nullable
+    WriteBatch getThreadLocalWriteBatch() {
+        return threadLocalWriteBatch.get();
+    }
 
-        if (writeBatch != null) {
-            writeBatch.close();
+    /**
+     * Clears {@link WriteBatch} returned by {@link 
#getOrCreateThreadLocalWriteBatch()}.
+     */
+    void clearThreadLocalWriteBatch(WriteBatch writeBatch) {
+        writeBatch.close();
 
-            threadLocalWriteBatch.set(null);
-        }
+        threadLocalWriteBatch.remove();
     }
 
     /**
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 2382f1aeb3..3c15dfc6a4 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -487,19 +487,23 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
 
     /**
      * Writes batch, previously filled by {@link #appendEntriesToBatch(List)} 
calls, into a rocksdb storage and clears the batch by calling
-     * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch()}.
+     * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch}.
      */
     void commitWriteBatch() {
-        try {
-            WriteBatch writeBatch = 
logStorageFactory.getOrCreateThreadLocalWriteBatch();
+        WriteBatch writeBatch = logStorageFactory.getThreadLocalWriteBatch();
 
+        if (writeBatch == null) {
+            return;
+        }
+
+        try {
             if (writeBatch.count() > 0) {
                 db.write(this.writeOptions, writeBatch);
             }
         } catch (RocksDBException e) {
             LOG.error("Execute batch failed with rocksdb exception.", e);
         } finally {
-            logStorageFactory.clearThreadLocalWriteBatch();
+            logStorageFactory.clearThreadLocalWriteBatch(writeBatch);
         }
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
index 66c77c3c71..9f9d423c1b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -227,23 +227,22 @@ public class StripeAwareLogManager extends LogManagerImpl 
{
                 appendBatcher.appendToStorage();
             }
 
-            if (!appendBatchers.isEmpty()) {
-                // Since the storage is shared, any batcher can flush it.
-                // This is a little confusing and hacky, but it doesn't 
require explicit access to the log storage factory,
-                // which makes it far easier to use in current jraft code.
-                // The reason why we don't call this method on log factory, 
for example, is because the factory doesn't have proper access
-                // to the RAFT configuration, and can't say, whether it should 
use "fsync" or not, for example.
-                try {
-                    appendBatchers.iterator().next().commitWriteBatch();
-                } catch (Exception e) {
-                    LOG.error("**Critical error**, failed to appendEntries.", 
e);
-
-                    for (StripeAwareAppendBatcher appendBatcher : 
appendBatchers) {
-                        appendBatcher.reportError(RaftError.EIO.getNumber(), 
"Fail to append log entries");
-                    }
+            // Calling "commitWriteBatch" on StripeAwareAppendBatcher is 
confusing and hacky, but it doesn't require explicit access
+            // to the log storage factory, which makes it far easier to use in 
current jraft code.
+            // The reason why we don't call this method on log factory, for 
example, is because the factory doesn't have proper access
+            // to the RAFT configuration, and can't say, whether it should use 
"fsync" or not, for example.
+            try {
+                for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+                    appendBatcher.commitWriteBatch();
+                }
+            } catch (Exception e) {
+                LOG.error("**Critical error**, failed to appendEntries.", e);
 
-                    return;
+                for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+                    appendBatcher.reportError(RaftError.EIO.getNumber(), "Fail 
to append log entries");
                 }
+
+                return;
             }
 
             // When data is committed, we can notify all stable closures and 
send response messages.

Reply via email to