This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-19777 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit e8dfda7d059da0aecd4e8b2ee8dcacc3e2f1c0dc Author: Semyon Danilov <[email protected]> AuthorDate: Thu Jun 22 18:29:34 2023 +0400 IGNITE-19777 Initial --- .../internal/metastorage/MetaStorageManager.java | 2 + ...MetaStorageSafeTimePropagationAbstractTest.java | 2 +- .../command/GetCurrentRevisionCommand.java | 26 ++++ .../command/MetastorageCommandsMessageGroup.java | 8 +- .../metastorage/impl/MetaStorageManagerImpl.java | 71 ++++++++- .../metastorage/impl/MetaStorageService.java | 2 + .../metastorage/impl/MetaStorageServiceImpl.java | 8 + .../metastorage/server/KeyValueStorage.java | 3 +- .../server/persistence/RocksDbKeyValueStorage.java | 45 +++--- .../server/raft/MetaStorageListener.java | 5 + .../MetaStorageDeployWatchesCorrectnessTest.java | 4 +- .../impl/MetaStorageManagerRecoveryTest.java | 163 +++++++++++++++++++++ .../server/BasicOperationsKeyValueStorageTest.java | 6 +- .../server/RocksDbKeyValueStorageTest.java | 2 +- .../server/SimpleInMemoryKeyValueStorage.java | 15 +- .../runner/app/ItIgniteNodeRestartTest.java | 70 ++++++++- .../org/apache/ignite/internal/app/IgniteImpl.java | 16 +- .../recovery/ConfigurationCatchUpListener.java | 1 + 18 files changed, 407 insertions(+), 42 deletions(-) diff --git a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 5d1d411f70..2cf4470e56 100644 --- a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -181,4 +181,6 @@ public interface MetaStorageManager extends IgniteComponent { * @return Cluster time. 
*/ ClusterTime clusterTime(); + + CompletableFuture<Void> ready(); } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java index 7192723bb6..c73c6ac301 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java @@ -48,7 +48,7 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr public void setUp() { super.setUp(); - storage.startWatches((e, t) -> { + storage.startWatches(0, (e, t) -> { time.updateSafeTime(t); return CompletableFuture.completedFuture(null); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java new file mode 100644 index 0000000000..14f72a5c8e --- /dev/null +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.command; + +import org.apache.ignite.internal.raft.ReadCommand; +import org.apache.ignite.network.annotations.Transferable; + +/** Get command for MetaStorageCommandListener that retrieves current revision. */ +@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION) +public interface GetCurrentRevisionCommand extends ReadCommand { +} diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java index e34c8acafd..b2915865df 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java @@ -50,6 +50,9 @@ public interface MetastorageCommandsMessageGroup { /** Message type for {@link GetAndRemoveAllCommand}. */ short GET_AND_REMOVE_ALL = 32; + /** Message type for {@link GetCurrentRevisionCommand}. */ + short GET_CURRENT_REVISION = 33; + /** Message type for {@link PutCommand}. */ short PUT = 40; @@ -68,9 +71,6 @@ public interface MetastorageCommandsMessageGroup { /** Message type for {@link GetPrefixCommand}. */ short GET_PREFIX = 61; - /** Message type for {@link HybridTimestampMessage}. */ - short HYBRID_TS = 70; - /** Message type for {@link SyncTimeCommand}. 
*/ - short SYNC_TIME = 71; + short SYNC_TIME = 70; } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 082249a8fa..bf3f0c8e0e 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -29,11 +29,14 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.hlc.HybridClock; @@ -113,6 +116,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { /** Prevents double stopping of the component. 
*/ private final AtomicBoolean isStopped = new AtomicBoolean(); + private final CompletableFuture<Void> readyFuture = new CompletableFuture<>(); + private final ClusterTimeImpl clusterTime; private volatile long appliedRevision; @@ -146,6 +151,58 @@ public class MetaStorageManagerImpl implements MetaStorageManager { this.clusterTime = new ClusterTimeImpl(busyLock, clock); } + private CompletableFuture<MetaStorageServiceImpl> recover(MetaStorageServiceImpl service) { + if (!busyLock.enterBusy()) { + return CompletableFuture.failedFuture(new NodeStoppingException()); + } + + try { + CompletableFuture<MetaStorageServiceImpl> res = new CompletableFuture<>(); + + ExecutorService recoveryExecutor = Executors.newSingleThreadExecutor(); + + service.currentRevisionAndTime().whenCompleteAsync((revision, throwable) -> { + if (throwable != null) { + res.completeExceptionally(throwable); + + return; + } + + assert revision != null; + + while (storage.revision() < revision) { + if (!busyLock.enterBusy()) { + res.completeExceptionally(new NodeStoppingException()); + + return; + } + + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + // Ignore. If we are being shut down, then busy lock will stop us. + } finally { + busyLock.leaveBusy(); + } + } + + startRevision.set(revision); + + res.complete(service); + }, recoveryExecutor); + + return res.whenComplete((s, t) -> { + recoveryExecutor.shutdown(); + }); + } finally { + busyLock.leaveBusy(); + } + } + + /** + * TODO: Restore state and remove everything that is not in the state. 
+ */ + private CompletableFuture<MetaStorageServiceImpl> initializeMetaStorage(Set<String> metaStorageNodes) { String thisNodeName = clusterService.nodeName(); @@ -197,6 +254,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager { return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime)); } + private final AtomicLong startRevision = new AtomicLong(); + @Override public void start() { storage.start(); @@ -215,11 +274,14 @@ public class MetaStorageManagerImpl implements MetaStorageManager { busyLock.leaveBusy(); } }) + .thenCompose(this::recover) .whenComplete((service, e) -> { if (e != null) { metaStorageSvcFut.completeExceptionally(e); + readyFuture.completeExceptionally(e); } else { metaStorageSvcFut.complete(service); + readyFuture.complete(null); } }); } @@ -285,8 +347,10 @@ public class MetaStorageManagerImpl implements MetaStorageManager { try { return metaStorageSvcFut.thenRun(() -> inBusyLock(busyLock, () -> { + long revision = startRevision.get(); + // Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault. 
- storage.startWatches(this::onRevisionApplied); + storage.startWatches(revision + 1, this::onRevisionApplied); })); } finally { busyLock.leaveBusy(); @@ -641,6 +705,11 @@ public class MetaStorageManagerImpl implements MetaStorageManager { return clusterTime; } + @Override + public CompletableFuture<Void> ready() { + return readyFuture; + } + @TestOnly CompletableFuture<MetaStorageServiceImpl> metaStorageServiceFuture() { return metaStorageSvcFut; diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java index 0ec2879fba..861cd150a0 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java @@ -301,4 +301,6 @@ public interface MetaStorageService extends ManuallyCloseable { * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. 
*/ CompletableFuture<Void> compact(); + + CompletableFuture<Long> currentRevisionAndTime(); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index d0fc3ff04f..55b4dc67b6 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.metastorage.command.GetAndPutCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand; import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand; import org.apache.ignite.internal.metastorage.command.GetCommand; +import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; import org.apache.ignite.internal.metastorage.command.InvokeCommand; import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand; @@ -291,6 +292,13 @@ public class MetaStorageServiceImpl implements MetaStorageService { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture<Long> currentRevisionAndTime() { + GetCurrentRevisionCommand cmd = context.commandsFactory().getCurrentRevisionCommand().build(); + + return context.raftService().run(cmd); + } + @Override public void close() { context.close(); diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 3da54cb036..f2ccd2eeaa 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -232,10 +232,11 @@ public interface KeyValueStorage extends ManuallyCloseable { * * <p>Before calling this method, watches will not receive any updates. * + * @param startRevision Revision to start processing updates from. * @param revisionCallback Callback that will be invoked after all watches of a particular revision are processed, with the * revision and modified entries (processed by at least one watch) as its argument. */ - void startWatches(OnRevisionAppliedCallback revisionCallback); + void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback); /** * Unregisters a watch listener. diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java index 15ba8667af..2145a09077 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java @@ -363,11 +363,6 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } finally { rwLock.writeLock().unlock(); } - - // Replay updates if startWatches() has already been called. 
- if (recoveryStatus.compareAndSet(RecoveryStatus.PENDING, RecoveryStatus.IN_PROGRESS)) { - replayUpdates(currentRevision); - } } @Override @@ -961,7 +956,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } @Override - public void startWatches(OnRevisionAppliedCallback revisionCallback) { + public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) { long currentRevision; rwLock.readLock().lock(); @@ -986,7 +981,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } if (currentRevision != 0) { - replayUpdates(currentRevision); + replayUpdates(startRevision, currentRevision); } } @@ -1039,6 +1034,26 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { return incrementPrefix(key); } + private long timestamp(long revision) { + if (revision == 0) { + return 0; + } + + long ts; + + try { + byte[] tsBytes = revisionToTs.get(longToBytes(revision)); + + assert tsBytes != null; + + ts = bytesToLong(tsBytes); + } catch (RocksDBException e) { + throw new MetaStorageException(OP_EXECUTION_ERR, e); + } + + return ts; + } + /** * Adds a key to a batch marking the value as a tombstone. * @@ -1390,8 +1405,10 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { watchProcessor.notifyWatches(copy.updatedEntries, copy.ts); } - private void replayUpdates(long upperRevision) { - long minWatchRevision = watchProcessor.minWatchRevision().orElse(-1); + private void replayUpdates(long lowerRevision, long upperRevision) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be Math.max, so we start from the revision that + // components restored their state to (lowerRevision). + long minWatchRevision = Math.min(lowerRevision, watchProcessor.minWatchRevision().orElse(-1)); if (minWatchRevision == -1 || minWatchRevision > upperRevision) { // No events to replay, we can start processing more recent events from the event queue. 
@@ -1453,15 +1470,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage { } private HybridTimestamp timestampByRevision(long revision) { - try { - byte[] tsBytes = revisionToTs.get(longToBytes(revision)); - - assert tsBytes != null; - - return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes)); - } catch (RocksDBException e) { - throw new MetaStorageException(OP_EXECUTION_ERR, e); - } + return HybridTimestamp.hybridTimestamp(timestamp(revision)); } private void finishReplay() { diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java index afb97bb2d7..ab7db8bfbc 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.metastorage.Entry; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.command.GetAllCommand; import org.apache.ignite.internal.metastorage.command.GetCommand; +import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; import org.apache.ignite.internal.metastorage.command.GetPrefixCommand; import org.apache.ignite.internal.metastorage.command.GetRangeCommand; import org.apache.ignite.internal.metastorage.command.PaginationCommand; @@ -109,6 +110,10 @@ public class MetaStorageListener implements RaftGroupListener { byte[] keyTo = storage.nextKey(prefixCmd.prefix()); clo.result(handlePaginationCommand(keyFrom, keyTo, prefixCmd)); + } else if (command instanceof GetCurrentRevisionCommand) { + long revision = storage.revision(); + + clo.result(revision); } else { assert false : "Command was not found [cmd=" + command + ']'; } diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java index 9f68f87de2..615c289ca6 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -45,7 +45,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; /** - * Tests that check correctness of an invocation {@link MetaStorageManager#deployWatches()}. + * Tests that check correctness of an invocation {@link MetaStorageManager#deployWatches(long)}. */ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest { /** Vault manager. */ @@ -100,7 +100,7 @@ public class MetaStorageDeployWatchesCorrectnessTest extends IgniteAbstractTest } /** - * Invokes {@link MetaStorageManager#deployWatches()} and checks result. + * Invokes {@link MetaStorageManager#deployWatches(long)} and checks result. * * @param metastore Meta storage. */ diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java new file mode 100644 index 0000000000..b021164e4f --- /dev/null +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.impl; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; +import org.apache.ignite.internal.raft.RaftManager; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService; +import org.apache.ignite.network.ClusterService; +import 
org.apache.ignite.network.MessagingService; +import org.apache.ignite.network.NodeMetadata; +import org.apache.ignite.network.TopologyService; +import org.apache.ignite.network.serialization.MessageSerializationRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Tests MetaStorage manager recovery basics. */ +public class MetaStorageManagerRecoveryTest { + private static final String NODE_NAME = "node"; + + private static final String LEADER_NAME = "ms-leader"; + + private static final long TARGET_REVISION = 10; + + private MetaStorageManagerImpl metaStorageManager; + + private KeyValueStorage kvs; + + private HybridClock clock; + + @BeforeEach + void setUp() throws Exception { + VaultManager vault = new VaultManager(new InMemoryVaultService()); + ClusterService clusterService = cs(); + ClusterManagementGroupManager cmgManager = cmg(); + LogicalTopologyService topologyService = mock(LogicalTopologyService.class); + RaftManager raftManager = raftManager(); + + clock = new HybridClockImpl(); + kvs = spy(new SimpleInMemoryKeyValueStorage(NODE_NAME)); + + metaStorageManager = new MetaStorageManagerImpl( + vault, + clusterService, + cmgManager, + topologyService, + raftManager, + kvs, + clock + ); + } + + private RaftManager raftManager() throws Exception { + RaftManager raft = mock(RaftManager.class); + + RaftGroupService service = mock(RaftGroupService.class); + + when(service.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation -> { + return CompletableFuture.completedFuture(TARGET_REVISION); + }); + + when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), any(), any(), any())) + .thenAnswer(invocation -> CompletableFuture.completedFuture(service)); + + return raft; + } + + private ClusterService cs() { + return new ClusterService() { + @Override + public String nodeName() { + return "node"; + } + + @Override + public TopologyService topologyService() { + return null; + } + + @Override + public 
MessagingService messagingService() { + return null; + } + + @Override + public MessageSerializationRegistry serializationRegistry() { + return null; + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void updateMetadata(NodeMetadata metadata) { + + } + + @Override + public void start() { + + } + }; + } + + private ClusterManagementGroupManager cmg() { + ClusterManagementGroupManager mock = mock(ClusterManagementGroupManager.class); + + when(mock.metaStorageNodes()) + .thenAnswer(invocation -> CompletableFuture.completedFuture(Set.of(LEADER_NAME))); + + return mock; + } + + @Test + void test() { + metaStorageManager.start(); + + CompletableFuture<Void> msDeployFut = metaStorageManager.deployWatches(); + + for (int i = 0; i < TARGET_REVISION; i++) { + kvs.put(new byte[0], new byte[0], clock.now()); + } + + assertThat(msDeployFut, willSucceedFast()); + + verify(kvs).startWatches(eq(TARGET_REVISION + 1), any()); + } +} diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java index 029980fc28..0ed93fbf07 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java @@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu long appliedRevision = storage.revision(); - storage.startWatches((event, ts) -> completedFuture(null)); + storage.startWatches(0, (event, ts) -> completedFuture(null)); CompletableFuture<byte[]> fut = new CompletableFuture<>(); @@ -2228,7 +2228,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null)); - storage.startWatches(mockCallback); + storage.startWatches(0, mockCallback); putToMs(key, value); @@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu } }); - storage.startWatches((event, ts) -> completedFuture(null)); + storage.startWatches(0, (event, ts) -> completedFuture(null)); return resultFuture; } diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java index d60e46416b..d932fcec39 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java @@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends BasicOperationsKeyValueStorageTe } }); - storage.startWatches((event, ts) -> CompletableFuture.completedFuture(null)); + storage.startWatches(0, (event, ts) -> CompletableFuture.completedFuture(null)); storage.restoreSnapshot(snapshotPath); diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index 07e9a913d7..297c1f9693 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -32,7 +32,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.OptionalLong; import java.util.SortedMap; import java.util.TreeMap; import 
java.util.TreeSet; @@ -437,24 +436,26 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { } @Override - public void startWatches(OnRevisionAppliedCallback revisionCallback) { + public void startWatches(long startRevision, OnRevisionAppliedCallback revisionCallback) { synchronized (mux) { areWatchesEnabled = true; watchProcessor.setRevisionCallback(revisionCallback); - replayUpdates(); + replayUpdates(startRevision); } } - private void replayUpdates() { - OptionalLong minWatchRevision = watchProcessor.minWatchRevision(); + private void replayUpdates(long startRevision) { + // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be Math.max, so we start from the revision that + // components restored their state to (lowerRevision). + long minWatchRevision = Math.min(startRevision, watchProcessor.minWatchRevision().orElse(-1)); - if (minWatchRevision.isEmpty()) { + if (minWatchRevision <= 0) { return; } - revsIdx.tailMap(minWatchRevision.getAsLong()) + revsIdx.tailMap(minWatchRevision) .forEach((revision, entries) -> { entries.forEach((key, value) -> { var entry = new EntryImpl(key, value.bytes(), revision, value.updateCounter()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 8bec4299f1..322ab1b5f6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -44,6 +44,9 @@ import java.util.Objects; import java.util.ServiceLoader; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import 
java.util.function.IntFunction; import java.util.function.LongFunction; @@ -81,13 +84,18 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; +import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; import org.apache.ignite.internal.network.configuration.NetworkConfiguration; import org.apache.ignite.internal.network.recovery.VaultStateIds; import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaService; @@ -111,10 +119,13 @@ import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.vault.VaultManager; +import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteStringFormatter; import org.apache.ignite.network.NettyBootstrapFactory; import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; +import org.apache.ignite.raft.jraft.RaftGroupService; +import org.apache.ignite.raft.jraft.Status; import 
org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.Session; @@ -128,6 +139,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; /** * These tests check node restart scenarios. @@ -712,11 +725,66 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { checkTableWithData(ignite, TABLE_NAME); } + /** + * Restarts a node that missed Meta Storage updates while stopped (optionally after a forced snapshot) and checks its data. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void metastorageRecoveryTest(boolean useSnapshot) throws InterruptedException { + List<IgniteImpl> nodes = startNodes(2); + IgniteImpl main = nodes.get(0); + + createTableWithData(List.of(main), TABLE_NAME, 1); + + stopNode(1); + + MetaStorageManager metaStorageManager = main.metaStorageManager(); + + CompletableFuture[] futs = new CompletableFuture[10]; + + for (int i = 0; i < 10; i++) { + ByteArray key = ByteArray.fromString("some-test-key-" + i); + futs[i] = metaStorageManager.put(key, new byte[]{(byte) 0}); + } + + CompletableFuture.allOf(futs).join(); + + if (useSnapshot) { + // Force snapshot installation + JraftServerImpl server = (JraftServerImpl) main.raftManager().server(); + List<Peer> peers = server.localPeers(MetastorageGroupId.INSTANCE); + + Peer learnerPeer = peers.stream().filter(peer -> peer.idx() == 0).findFirst().orElseThrow( + () -> new IllegalStateException(String.format("No leader peer")) + ); + + var nodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, learnerPeer); + RaftGroupService raftGroupService = server.raftGroupService(nodeId); + + for (int i = 0; i < 2; i++) { + CountDownLatch snapshotLatch = new CountDownLatch(1); + AtomicReference<Status> snapshotStatus = new AtomicReference<>(); + + 
raftGroupService.getRaftNode().snapshot(status -> { + snapshotStatus.set(status); + snapshotLatch.countDown(); + }); + + assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time"); + assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get()); + } + } + + IgniteImpl second = startNode(1); + + checkTableWithData(second, TABLE_NAME); + } + /** * Restarts the node which stores some data. */ @Test - public void nodeWithDataAndIndexRebuildTest() throws InterruptedException { + public void nodeWithDataAndIndexRebuildTest() { IgniteImpl ignite = startNode(0); int partitions = 20; diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 322252bfd5..b6ab3d7230 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -690,14 +690,24 @@ public class IgniteImpl implements Ignite { LOG.info("Components started, joining the cluster"); return cmgMgr.joinFuture() - // using the default executor to avoid blocking the CMG Manager threads + // Using the default executor to avoid blocking the CMG Manager threads. + .thenComposeAsync(unused -> { + LOG.info("Join complete, starting MetaStorage"); + + try { + lifecycleManager.startComponent(metaStorageMgr); + } catch (NodeStoppingException e) { + throw new CompletionException(e); + } + + return metaStorageMgr.ready(); + }) .thenRunAsync(() -> { - LOG.info("Join complete, starting the remaining components"); + LOG.info("MetaStorage started, starting the remaining components"); // Start all other components after the join request has completed and the node has been validated. 
try { lifecycleManager.startComponents( - metaStorageMgr, clusterCfgMgr, metricManager, distributionZoneManager, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java index ef5cc3c4f0..d1ff10ad1e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java @@ -84,6 +84,7 @@ public class ConfigurationCatchUpListener implements ConfigurationStorageRevisio private CompletableFuture<?> checkRevisionUpToDate(long appliedRevision) { return cfgStorage.lastRevision().thenAccept(rev -> { synchronized (targetRevisionUpdateMutex) { + // TODO: The actual Meta Storage revision can be higher than the configuration revision. assert rev >= appliedRevision : IgniteStringFormatter.format( "Configuration revision must be greater than local node applied revision [msRev={}, appliedRev={}", rev, appliedRevision);
