This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-18446 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit bdbea97b9a6a3a800c2c84826a99207956a8750c Author: Semyon Danilov <[email protected]> AuthorDate: Wed Jan 18 14:58:17 2023 +0200 IGNITE-18446 Add busylock to raft service. --- .../cluster/management/raft/CmgRaftService.java | 4 +- .../matchers/CompletableFutureMatcher.java | 35 ++++++++-- .../impl/ItMetaStorageManagerImplTest.java | 52 ++++++++++++-- .../metastorage/impl/MetaStorageManagerImpl.java | 21 ++++++ .../metastorage/impl/MetaStorageService.java | 6 +- .../metastorage/impl/MetaStorageServiceImpl.java | 5 ++ .../ignite/internal/raft/RaftGroupServiceImpl.java | 79 ++++++++++++++-------- 7 files changed, 159 insertions(+), 43 deletions(-) diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java index 9459c855d0..75d4525ac7 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java @@ -193,9 +193,9 @@ public class CmgRaftService { } /** - * Returns a list of consistent IDs of the voting nodes of the CMG. + * Returns a set of consistent IDs of the voting nodes of the CMG. * - * @return List of consistent IDs of the voting nodes of the CMG. + * @return Set of consistent IDs of the voting nodes of the CMG. */ public Set<String> nodeNames() { List<Peer> peers = raftService.peers(); diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java index bdbd3df5d9..18bb7661cd 100644 --- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -95,7 +96,7 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu } return matcher.matches(res); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException | ExecutionException | TimeoutException | CancellationException e) { if (causeOfFail != null) { assertTrue(hasCause(e, causeOfFail, null)); @@ -150,10 +151,9 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu } /** - * Creates a matcher that matches a future that completes successfully and decently fast. + * Creates a matcher that matches a future that completes exceptionally and decently fast. * - * @param cause If {@code null}, the future should be completed successfully, otherwise it specifies the class of cause - * throwable. + * @param cause The class of cause throwable. * @return matcher. */ public static CompletableFutureMatcher<Object> willFailFast(Class<? extends Throwable> cause) { @@ -161,18 +161,39 @@ public class CompletableFutureMatcher<T> extends TypeSafeMatcher<CompletableFutu } /** - * Creates a matcher that matches a future that completes successfully with any result within the given timeout. + * Creates a matcher that matches a future that completes exceptionally within the given timeout. * * @param time Timeout. * @param timeUnit Time unit for timeout. - * @param cause If {@code null}, the future should be completed successfully, otherwise it specifies the class of cause - * throwable. + * @param cause The class of cause throwable. * @return matcher. */ public static CompletableFutureMatcher<Object> willFailIn(int time, TimeUnit timeUnit, Class<? extends Throwable> cause) { + assert cause != null; + return new CompletableFutureMatcher<>(anything(), time, timeUnit, cause); } + /** + * Creates a matcher that matches a future that will be cancelled and decently fast. + * + * @return matcher. + */ + public static CompletableFutureMatcher<Object> willBeCancelledFast() { + return willBeCancelledIn(1, TimeUnit.SECONDS); + } + + /** + * Creates a matcher that matches a future that will be cancelled within the given timeout. + * + * @param time Timeout. + * @param timeUnit Time unit for timeout. + * @return matcher. + */ + public static CompletableFutureMatcher<Object> willBeCancelledIn(int time, TimeUnit timeUnit) { + return new CompletableFutureMatcher<>(anything(), time, timeUnit, CancellationException.class); + } + /** * A shorter version of {@link #willBe} to be used with some matchers for aesthetic reasons. */ diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index ba2481725b..ecf3406336 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -21,6 +21,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBeCancelledFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast; import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -41,14 +43,13 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.Entry; -import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; import org.apache.ignite.internal.metastorage.dsl.Conditions; import org.apache.ignite.internal.metastorage.dsl.Operations; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.raft.Loza; -import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -78,9 +79,11 @@ public class ItMetaStorageManagerImplTest { private ClusterService clusterService; - private RaftManager raftManager; + private Loza raftManager; - private MetaStorageManager metaStorageManager; + private KeyValueStorage storage; + + private MetaStorageManagerImpl metaStorageManager; @BeforeEach void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration) @@ -98,7 +101,7 @@ public class ItMetaStorageManagerImplTest { when(cmgManager.metaStorageNodes()) .thenReturn(completedFuture(Set.of(clusterService.localConfiguration().getName()))); - var storage = new RocksDbKeyValueStorage(clusterService.localConfiguration().getName(), workDir.resolve("metastorage")); + storage = new RocksDbKeyValueStorage(clusterService.localConfiguration().getName(), workDir.resolve("metastorage")); metaStorageManager = new MetaStorageManagerImpl( vaultManager, @@ -236,4 +239,43 @@ public class ItMetaStorageManagerImplTest { public void onError(Throwable e) {} }; } + + @Test + void testMetaStorageStopClosesRaftService() throws Exception { + MetaStorageServiceImpl svc = metaStorageManager.metaStorageServiceFuture().join(); + + metaStorageManager.stop(); + + CompletableFuture<Entry> fut = svc.get(ByteArray.fromString("ignored")); + + assertThat(fut, willFailFast(NodeStoppingException.class)); + } + + @Test + void testMetaStorageStopBeforeRaftServiceStarted() throws Exception { + metaStorageManager.stop(); // Close MetaStorage that is created in setUp. + + ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); + + Set<String> msNodes = Set.of(clusterService.localConfiguration().getName()); + CompletableFuture<Set<String>> cmgFut = new CompletableFuture<>(); + + when(cmgManager.metaStorageNodes()).thenReturn(cmgFut); + + metaStorageManager = new MetaStorageManagerImpl( + vaultManager, + clusterService, + cmgManager, + raftManager, + storage + ); + + metaStorageManager.stop(); + + // Unblock the future so raft service can be initialized. Although the future should be cancelled already by the + // stop method. + cmgFut.complete(msNodes); + + assertThat(metaStorageManager.metaStorageServiceFuture(), willBeCancelledFast()); + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index eec4461432..d40aacaf17 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -68,6 +68,7 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.TopologyEventHandler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * MetaStorage manager. @@ -320,10 +321,25 @@ public class MetaStorageManagerImpl implements MetaStorageManager { IgniteUtils.closeAll( () -> raftMgr.stopRaftNodes(MetastorageGroupId.INSTANCE), + this::closeMetaStorageService, storage::close ); } + private void closeMetaStorageService() { + if (metaStorageSvcFut.isCancelled() || metaStorageSvcFut.isCompletedExceptionally()) { + return; + } + + assert metaStorageSvcFut.isDone(); + + MetaStorageServiceImpl metaStorageService = metaStorageSvcFut.join(); + + assert metaStorageService != null; + + metaStorageService.close(); + } + @Override public long appliedRevision() { return appliedRevision; @@ -791,4 +807,9 @@ public class MetaStorageManagerImpl implements MetaStorageManager { } } } + + @TestOnly + CompletableFuture<MetaStorageServiceImpl> metaStorageServiceFuture() { + return metaStorageSvcFut; + } } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java index b774971013..b1f7f7a7f1 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.dsl.Condition; import org.apache.ignite.internal.metastorage.dsl.If; @@ -35,7 +36,7 @@ import org.jetbrains.annotations.Nullable; /** * Defines interface for access to a meta storage service. */ -public interface MetaStorageService { +public interface MetaStorageService extends ManuallyCloseable { /** * Retrieves an entry for the given key. * @@ -309,4 +310,7 @@ public interface MetaStorageService { * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. */ CompletableFuture<Void> closeCursors(String nodeId); + + @Override + void close(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index a1bbe42ca1..1d64da7871 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -314,6 +314,11 @@ public class MetaStorageServiceImpl implements MetaStorageService { return metaStorageRaftGrpSvc.run(commandsFactory.cursorsCloseCommand().nodeId(nodeId).build()); } + @Override + public void close() { + metaStorageRaftGrpSvc.shutdown(); + } + private static List<OperationInfo> toOperationInfos(Collection<Operation> ops, MetaStorageCommandsFactory commandsFactory) { List<OperationInfo> res = new ArrayList<>(ops.size()); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java index 5f41d131c1..af07da5175 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java @@ -58,7 +58,9 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NetworkMessage; @@ -108,6 +110,9 @@ public class RaftGroupServiceImpl implements RaftGroupService { /** Executor for scheduling retries of {@link RaftGroupServiceImpl#sendWithRetry} invocations. */ private final ScheduledExecutorService executor; + /** Busy lock. */ + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + /** * Constructor. * @@ -477,7 +482,7 @@ public class RaftGroupServiceImpl implements RaftGroupService { @Override public void shutdown() { - // No-op. + busyLock.block(); } @Override @@ -507,38 +512,48 @@ public class RaftGroupServiceImpl implements RaftGroupService { private <R extends NetworkMessage> void sendWithRetry( Peer peer, Function<Peer, ? extends NetworkMessage> requestFactory, long stopTime, CompletableFuture<R> fut ) { - if (currentTimeMillis() >= stopTime) { - fut.completeExceptionally(new TimeoutException()); + if (!busyLock.enterBusy()) { + fut.completeExceptionally(new NodeStoppingException()); return; } - NetworkMessage request = requestFactory.apply(peer); - - //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside - resolvePeer(peer) - .thenCompose(node -> cluster.messagingService().invoke(node, request, rpcTimeout)) - .whenCompleteAsync((resp, err) -> { - if (LOG.isTraceEnabled()) { - LOG.trace("sendWithRetry resp={} from={} to={} err={}", - S.toString(resp), - cluster.topologyService().localMember().address(), - peer.consistentId(), - err == null ? null : err.getMessage()); - } + try { + if (currentTimeMillis() >= stopTime) { + fut.completeExceptionally(new TimeoutException()); - if (err != null) { - handleThrowable(err, peer, request, requestFactory, stopTime, fut); - } else if (resp instanceof ErrorResponse) { - handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut); - } else if (resp instanceof SMErrorResponse) { - handleSmErrorResponse((SMErrorResponse) resp, fut); - } else { - leader = peer; // The OK response was received from a leader. + return; + } - fut.complete((R) resp); - } - }); + NetworkMessage request = requestFactory.apply(peer); + + //TODO: IGNITE-15389 org.apache.ignite.internal.metastorage.client.CursorImpl has potential deadlock inside + resolvePeer(peer) + .thenCompose(node -> cluster.messagingService().invoke(node, request, rpcTimeout)) + .whenCompleteAsync((resp, err) -> { + if (LOG.isTraceEnabled()) { + LOG.trace("sendWithRetry resp={} from={} to={} err={}", + S.toString(resp), + cluster.topologyService().localMember().address(), + peer.consistentId(), + err == null ? null : err.getMessage()); + } + + if (err != null) { + handleThrowable(err, peer, request, requestFactory, stopTime, fut); + } else if (resp instanceof ErrorResponse) { + handleErrorResponse((ErrorResponse) resp, peer, request, requestFactory, stopTime, fut); + } else if (resp instanceof SMErrorResponse) { + handleSmErrorResponse((SMErrorResponse) resp, fut); + } else { + leader = peer; // The OK response was received from a leader. + + fut.complete((R) resp); + } + }); + } finally { + busyLock.leaveBusy(); + } } private void handleThrowable( @@ -550,7 +565,7 @@ public class RaftGroupServiceImpl implements RaftGroupService { CompletableFuture<? extends NetworkMessage> fut ) { if (recoverable(err)) { - LOG.warn( + LOG.trace( "Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ", err, sentRequest.getClass().getSimpleName() ); @@ -679,6 +694,14 @@ public class RaftGroupServiceImpl implements RaftGroupService { int lastPeerIndex = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer); + if (peers0.size() == 1) { + if (lastPeerIndex != -1) { + return excludedPeer; + } else { + return peers0.get(0); + } + } + ThreadLocalRandom random = current(); int newIdx = 0;
