This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-19777
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit e8dfda7d059da0aecd4e8b2ee8dcacc3e2f1c0dc
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Jun 22 18:29:34 2023 +0400

    IGNITE-19777 Initial
---
 .../internal/metastorage/MetaStorageManager.java   |   2 +
 ...MetaStorageSafeTimePropagationAbstractTest.java |   2 +-
 .../command/GetCurrentRevisionCommand.java         |  26 ++++
 .../command/MetastorageCommandsMessageGroup.java   |   8 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |  71 ++++++++-
 .../metastorage/impl/MetaStorageService.java       |   2 +
 .../metastorage/impl/MetaStorageServiceImpl.java   |   8 +
 .../metastorage/server/KeyValueStorage.java        |   3 +-
 .../server/persistence/RocksDbKeyValueStorage.java |  45 +++---
 .../server/raft/MetaStorageListener.java           |   5 +
 .../MetaStorageDeployWatchesCorrectnessTest.java   |   4 +-
 .../impl/MetaStorageManagerRecoveryTest.java       | 163 +++++++++++++++++++++
 .../server/BasicOperationsKeyValueStorageTest.java |   6 +-
 .../server/RocksDbKeyValueStorageTest.java         |   2 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |  15 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  70 ++++++++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  16 +-
 .../recovery/ConfigurationCatchUpListener.java     |   1 +
 18 files changed, 407 insertions(+), 42 deletions(-)

diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 5d1d411f70..2cf4470e56 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -181,4 +181,6 @@ public interface MetaStorageManager extends IgniteComponent 
{
      * @return Cluster time.
      */
     ClusterTime clusterTime();
+
+    CompletableFuture<Void> ready();
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 7192723bb6..c73c6ac301 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -48,7 +48,7 @@ public abstract class 
ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
     public void setUp() {
         super.setUp();
 
-        storage.startWatches((e, t) -> {
+        storage.startWatches(0, (e, t) -> {
             time.updateSafeTime(t);
 
             return CompletableFuture.completedFuture(null);
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
new file mode 100644
index 0000000000..14f72a5c8e
--- /dev/null
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/GetCurrentRevisionCommand.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.command;
+
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.network.annotations.Transferable;
+
+/** Get command for MetaStorageListener that retrieves the current 
revision. */
+@Transferable(MetastorageCommandsMessageGroup.GET_CURRENT_REVISION)
+public interface GetCurrentRevisionCommand extends ReadCommand {
+}
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
index e34c8acafd..b2915865df 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/MetastorageCommandsMessageGroup.java
@@ -50,6 +50,9 @@ public interface MetastorageCommandsMessageGroup {
     /** Message type for {@link GetAndRemoveAllCommand}. */
     short GET_AND_REMOVE_ALL = 32;
 
+    /** Message type for {@link GetCurrentRevisionCommand}. */
+    short GET_CURRENT_REVISION = 33;
+
     /** Message type for {@link PutCommand}. */
     short PUT = 40;
 
@@ -68,9 +71,6 @@ public interface MetastorageCommandsMessageGroup {
     /** Message type for {@link GetPrefixCommand}. */
     short GET_PREFIX = 61;
 
-    /** Message type for {@link HybridTimestampMessage}. */
-    short HYBRID_TS = 70;
-
     /** Message type for {@link SyncTimeCommand}. */
-    short SYNC_TIME = 71;
+    short SYNC_TIME = 70;
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 082249a8fa..bf3f0c8e0e 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -29,11 +29,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -113,6 +116,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
 
+    private final CompletableFuture<Void> readyFuture = new 
CompletableFuture<>();
+
     private final ClusterTimeImpl clusterTime;
 
     private volatile long appliedRevision;
@@ -146,6 +151,58 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         this.clusterTime = new ClusterTimeImpl(busyLock, clock);
     }
 
+    private CompletableFuture<MetaStorageServiceImpl> 
recover(MetaStorageServiceImpl service) {
+        if (!busyLock.enterBusy()) {
+            return CompletableFuture.failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            CompletableFuture<MetaStorageServiceImpl> res = new 
CompletableFuture<>();
+
+            ExecutorService recoveryExecutor = 
Executors.newSingleThreadExecutor();
+
+            service.currentRevisionAndTime().whenCompleteAsync((revision, 
throwable) -> {
+                if (throwable != null) {
+                    res.completeExceptionally(throwable);
+
+                    return;
+                }
+
+                assert revision != null;
+
+                while (storage.revision() < revision) {
+                    if (!busyLock.enterBusy()) {
+                        res.completeExceptionally(new NodeStoppingException());
+
+                        return;
+                    }
+
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException ignored) {
+                        // Ignore. If we are being shut down, then busy lock 
will stop us.
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+
+                startRevision.set(revision);
+
+                res.complete(service);
+            }, recoveryExecutor);
+
+            return res.whenComplete((s, t) -> {
+                recoveryExecutor.shutdown();
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * TODO: Restore state and remove everything that is not in the state.
+     */
+
     private CompletableFuture<MetaStorageServiceImpl> 
initializeMetaStorage(Set<String> metaStorageNodes) {
         String thisNodeName = clusterService.nodeName();
 
@@ -197,6 +254,8 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(thisNodeName, raftService, busyLock, clusterTime));
     }
 
+    private final AtomicLong startRevision = new AtomicLong();
+
     @Override
     public void start() {
         storage.start();
@@ -215,11 +274,14 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
                         busyLock.leaveBusy();
                     }
                 })
+                .thenCompose(this::recover)
                 .whenComplete((service, e) -> {
                     if (e != null) {
                         metaStorageSvcFut.completeExceptionally(e);
+                        readyFuture.completeExceptionally(e);
                     } else {
                         metaStorageSvcFut.complete(service);
+                        readyFuture.complete(null);
                     }
                 });
     }
@@ -285,8 +347,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
         try {
             return metaStorageSvcFut.thenRun(() -> inBusyLock(busyLock, () -> {
+                long revision = startRevision.get();
+
                 // Meta Storage contract states that all updated entries under 
a particular revision must be stored in the Vault.
-                storage.startWatches(this::onRevisionApplied);
+                storage.startWatches(revision + 1, this::onRevisionApplied);
             }));
         } finally {
             busyLock.leaveBusy();
@@ -641,6 +705,11 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         return clusterTime;
     }
 
+    @Override
+    public CompletableFuture<Void> ready() {
+        return readyFuture;
+    }
+
     @TestOnly
     CompletableFuture<MetaStorageServiceImpl> metaStorageServiceFuture() {
         return metaStorageSvcFut;
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
index 0ec2879fba..861cd150a0 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java
@@ -301,4 +301,6 @@ public interface MetaStorageService extends 
ManuallyCloseable {
      * @throws OperationTimeoutException If the operation is timed out. Will 
be thrown on getting future result.
      */
     CompletableFuture<Void> compact();
+
+    CompletableFuture<Long> currentRevisionAndTime();
 }
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index d0fc3ff04f..55b4dc67b6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.metastorage.command.GetAndPutCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetAndRemoveCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
 import org.apache.ignite.internal.metastorage.command.InvokeCommand;
 import 
org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
@@ -291,6 +292,13 @@ public class MetaStorageServiceImpl implements 
MetaStorageService {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<Long> currentRevisionAndTime() {
+        GetCurrentRevisionCommand cmd = 
context.commandsFactory().getCurrentRevisionCommand().build();
+
+        return context.raftService().run(cmd);
+    }
+
     @Override
     public void close() {
         context.close();
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 3da54cb036..f2ccd2eeaa 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -232,10 +232,11 @@ public interface KeyValueStorage extends 
ManuallyCloseable {
      *
      * <p>Before calling this method, watches will not receive any updates.
      *
+     * @param startRevision Revision to start processing updates from.
      * @param revisionCallback Callback that will be invoked after all watches 
of a particular revision are processed, with the
      *         revision and modified entries (processed by at least one watch) 
as its argument.
      */
-    void startWatches(OnRevisionAppliedCallback revisionCallback);
+    void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback);
 
     /**
      * Unregisters a watch listener.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 15ba8667af..2145a09077 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -363,11 +363,6 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         } finally {
             rwLock.writeLock().unlock();
         }
-
-        // Replay updates if startWatches() has already been called.
-        if (recoveryStatus.compareAndSet(RecoveryStatus.PENDING, 
RecoveryStatus.IN_PROGRESS)) {
-            replayUpdates(currentRevision);
-        }
     }
 
     @Override
@@ -961,7 +956,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void startWatches(OnRevisionAppliedCallback revisionCallback) {
+    public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
         long currentRevision;
 
         rwLock.readLock().lock();
@@ -986,7 +981,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         }
 
         if (currentRevision != 0) {
-            replayUpdates(currentRevision);
+            replayUpdates(startRevision, currentRevision);
         }
     }
 
@@ -1039,6 +1034,26 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         return incrementPrefix(key);
     }
 
+    private long timestamp(long revision) {
+        if (revision == 0) {
+            return 0;
+        }
+
+        long ts;
+
+        try {
+            byte[] tsBytes = revisionToTs.get(longToBytes(revision));
+
+            assert tsBytes != null;
+
+            ts = bytesToLong(tsBytes);
+        } catch (RocksDBException e) {
+            throw new MetaStorageException(OP_EXECUTION_ERR, e);
+        }
+
+        return ts;
+    }
+
     /**
      * Adds a key to a batch marking the value as a tombstone.
      *
@@ -1390,8 +1405,10 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
         watchProcessor.notifyWatches(copy.updatedEntries, copy.ts);
     }
 
-    private void replayUpdates(long upperRevision) {
-        long minWatchRevision = watchProcessor.minWatchRevision().orElse(-1);
+    private void replayUpdates(long lowerRevision, long upperRevision) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be 
Math.max, so we start from the revision that
+        // components restored their state to (lowerRevision).
+        long minWatchRevision = Math.min(lowerRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
         if (minWatchRevision == -1 || minWatchRevision > upperRevision) {
             // No events to replay, we can start processing more recent events 
from the event queue.
@@ -1453,15 +1470,7 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     private HybridTimestamp timestampByRevision(long revision) {
-        try {
-            byte[] tsBytes = revisionToTs.get(longToBytes(revision));
-
-            assert tsBytes != null;
-
-            return HybridTimestamp.hybridTimestamp(bytesToLong(tsBytes));
-        } catch (RocksDBException e) {
-            throw new MetaStorageException(OP_EXECUTION_ERR, e);
-        }
+        return HybridTimestamp.hybridTimestamp(timestamp(revision));
     }
 
     private void finishReplay() {
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index afb97bb2d7..ab7db8bfbc 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.command.GetAllCommand;
 import org.apache.ignite.internal.metastorage.command.GetCommand;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
 import org.apache.ignite.internal.metastorage.command.GetPrefixCommand;
 import org.apache.ignite.internal.metastorage.command.GetRangeCommand;
 import org.apache.ignite.internal.metastorage.command.PaginationCommand;
@@ -109,6 +110,10 @@ public class MetaStorageListener implements 
RaftGroupListener {
                     byte[] keyTo = storage.nextKey(prefixCmd.prefix());
 
                     clo.result(handlePaginationCommand(keyFrom, keyTo, 
prefixCmd));
+                } else if (command instanceof GetCurrentRevisionCommand) {
+                    long revision = storage.revision();
+
+                    clo.result(revision);
                 } else {
                     assert false : "Command was not found [cmd=" + command + 
']';
                 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
index 9f68f87de2..615c289ca6 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java
@@ -45,7 +45,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /**
- * Tests that check correctness of an invocation {@link 
MetaStorageManager#deployWatches()}.
+ * Tests that check correctness of an invocation {@link 
MetaStorageManager#deployWatches(long)}.
  */
 public class MetaStorageDeployWatchesCorrectnessTest extends 
IgniteAbstractTest {
     /** Vault manager. */
@@ -100,7 +100,7 @@ public class MetaStorageDeployWatchesCorrectnessTest 
extends IgniteAbstractTest
     }
 
     /**
-     * Invokes {@link MetaStorageManager#deployWatches()} and checks result.
+     * Invokes {@link MetaStorageManager#deployWatches(long)} and checks 
result.
      *
      * @param metastore Meta storage.
      */
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
new file mode 100644
index 0000000000..b021164e4f
--- /dev/null
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import 
org.apache.ignite.internal.metastorage.command.GetCurrentRevisionCommand;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests MetaStorage manager recovery basics. */
+public class MetaStorageManagerRecoveryTest {
+    private static final String NODE_NAME = "node";
+
+    private static final String LEADER_NAME = "ms-leader";
+
+    private static final long TARGET_REVISION = 10;
+
+    private MetaStorageManagerImpl metaStorageManager;
+
+    private KeyValueStorage kvs;
+
+    private HybridClock clock;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        VaultManager vault = new VaultManager(new InMemoryVaultService());
+        ClusterService clusterService = cs();
+        ClusterManagementGroupManager cmgManager = cmg();
+        LogicalTopologyService topologyService = 
mock(LogicalTopologyService.class);
+        RaftManager raftManager = raftManager();
+
+        clock = new HybridClockImpl();
+        kvs = spy(new SimpleInMemoryKeyValueStorage(NODE_NAME));
+
+        metaStorageManager = new MetaStorageManagerImpl(
+                vault,
+                clusterService,
+                cmgManager,
+                topologyService,
+                raftManager,
+                kvs,
+                clock
+        );
+    }
+
+    private RaftManager raftManager() throws Exception {
+        RaftManager raft = mock(RaftManager.class);
+
+        RaftGroupService service = mock(RaftGroupService.class);
+
+        
when(service.run(any(GetCurrentRevisionCommand.class))).thenAnswer(invocation 
-> {
+            return CompletableFuture.completedFuture(TARGET_REVISION);
+        });
+
+        when(raft.startRaftGroupNodeAndWaitNodeReadyFuture(any(), any(), 
any(), any(), any()))
+                .thenAnswer(invocation -> 
CompletableFuture.completedFuture(service));
+
+        return raft;
+    }
+
+    private ClusterService cs() {
+        return new ClusterService() {
+            @Override
+            public String nodeName() {
+                return "node";
+            }
+
+            @Override
+            public TopologyService topologyService() {
+                return null;
+            }
+
+            @Override
+            public MessagingService messagingService() {
+                return null;
+            }
+
+            @Override
+            public MessageSerializationRegistry serializationRegistry() {
+                return null;
+            }
+
+            @Override
+            public boolean isStopped() {
+                return false;
+            }
+
+            @Override
+            public void updateMetadata(NodeMetadata metadata) {
+
+            }
+
+            @Override
+            public void start() {
+
+            }
+        };
+    }
+
+    private ClusterManagementGroupManager cmg() {
+        ClusterManagementGroupManager mock = 
mock(ClusterManagementGroupManager.class);
+
+        when(mock.metaStorageNodes())
+                .thenAnswer(invocation -> 
CompletableFuture.completedFuture(Set.of(LEADER_NAME)));
+
+        return mock;
+    }
+
+    @Test
+    void test() {
+        metaStorageManager.start();
+
+        CompletableFuture<Void> msDeployFut = 
metaStorageManager.deployWatches();
+
+        for (int i = 0; i < TARGET_REVISION; i++) {
+            kvs.put(new byte[0], new byte[0], clock.now());
+        }
+
+        assertThat(msDeployFut, willSucceedFast());
+
+        verify(kvs).startWatches(eq(TARGET_REVISION + 1), any());
+    }
+}
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 029980fc28..0ed93fbf07 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1887,7 +1887,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         long appliedRevision = storage.revision();
 
-        storage.startWatches((event, ts) -> completedFuture(null));
+        storage.startWatches(0, (event, ts) -> completedFuture(null));
 
         CompletableFuture<byte[]> fut = new CompletableFuture<>();
 
@@ -2228,7 +2228,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
 
         when(mockCallback.onRevisionApplied(any(), 
any())).thenReturn(completedFuture(null));
 
-        storage.startWatches(mockCallback);
+        storage.startWatches(0, mockCallback);
 
         putToMs(key, value);
 
@@ -2423,7 +2423,7 @@ public abstract class BasicOperationsKeyValueStorageTest 
extends AbstractKeyValu
             }
         });
 
-        storage.startWatches((event, ts) -> completedFuture(null));
+        storage.startWatches(0, (event, ts) -> completedFuture(null));
 
         return resultFuture;
     }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index d60e46416b..d932fcec39 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -102,7 +102,7 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
             }
         });
 
-        storage.startWatches((event, ts) -> 
CompletableFuture.completedFuture(null));
+        storage.startWatches(0, (event, ts) -> 
CompletableFuture.completedFuture(null));
 
         storage.restoreSnapshot(snapshotPath);
 
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 07e9a913d7..297c1f9693 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -32,7 +32,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.OptionalLong;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -437,24 +436,26 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     @Override
-    public void startWatches(OnRevisionAppliedCallback revisionCallback) {
+    public void startWatches(long startRevision, OnRevisionAppliedCallback 
revisionCallback) {
         synchronized (mux) {
             areWatchesEnabled = true;
 
             watchProcessor.setRevisionCallback(revisionCallback);
 
-            replayUpdates();
+            replayUpdates(startRevision);
         }
     }
 
-    private void replayUpdates() {
-        OptionalLong minWatchRevision = watchProcessor.minWatchRevision();
+    private void replayUpdates(long startRevision) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19778 Should be 
Math.max, so we start from the revision that
+        // components restored their state to (lowerRevision).
+        long minWatchRevision = Math.min(startRevision, 
watchProcessor.minWatchRevision().orElse(-1));
 
-        if (minWatchRevision.isEmpty()) {
+        if (minWatchRevision <= 0) {
             return;
         }
 
-        revsIdx.tailMap(minWatchRevision.getAsLong())
+        revsIdx.tailMap(minWatchRevision)
                 .forEach((revision, entries) -> {
                     entries.forEach((key, value) -> {
                         var entry = new EntryImpl(key, value.bytes(), 
revision, value.updateCounter());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8bec4299f1..322ab1b5f6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -44,6 +44,9 @@ import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.function.LongFunction;
@@ -81,13 +84,18 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.index.IndexManager;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -111,10 +119,13 @@ import 
org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
@@ -128,6 +139,8 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * These tests check node restart scenarios.
@@ -712,11 +725,66 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         checkTableWithData(ignite, TABLE_NAME);
     }
 
+    /**
+     * Restarts a node after Meta Storage updates were made while it was down, optionally forcing Raft snapshots first.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void metastorageRecoveryTest(boolean useSnapshot) throws 
InterruptedException {
+        List<IgniteImpl> nodes = startNodes(2);
+        IgniteImpl main = nodes.get(0);
+
+        createTableWithData(List.of(main), TABLE_NAME, 1);
+
+        stopNode(1);
+
+        MetaStorageManager metaStorageManager = main.metaStorageManager();
+
+        CompletableFuture[] futs = new CompletableFuture[10];
+
+        for (int i = 0; i < 10; i++) {
+            ByteArray key = ByteArray.fromString("some-test-key-" + i);
+            futs[i] = metaStorageManager.put(key, new byte[]{(byte) 0});
+        }
+
+        CompletableFuture.allOf(futs).join();
+
+        if (useSnapshot) {
+            // Force snapshot installation
+            JraftServerImpl server = (JraftServerImpl) 
main.raftManager().server();
+            List<Peer> peers = server.localPeers(MetastorageGroupId.INSTANCE);
+
+            Peer learnerPeer = peers.stream().filter(peer -> peer.idx() == 
0).findFirst().orElseThrow(
+                    () -> new IllegalStateException(String.format("No leader 
peer"))
+            );
+
+            var nodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, 
learnerPeer);
+            RaftGroupService raftGroupService = 
server.raftGroupService(nodeId);
+
+            for (int i = 0; i < 2; i++) {
+                CountDownLatch snapshotLatch = new CountDownLatch(1);
+                AtomicReference<Status> snapshotStatus = new 
AtomicReference<>();
+
+                raftGroupService.getRaftNode().snapshot(status -> {
+                    snapshotStatus.set(status);
+                    snapshotLatch.countDown();
+                });
+
+                assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), 
"Snapshot was not finished in time");
+                assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + 
snapshotStatus.get());
+            }
+        }
+
+        IgniteImpl second = startNode(1);
+
+        checkTableWithData(second, TABLE_NAME);
+    }
+
     /**
      * Restarts the node which stores some data.
      */
     @Test
-    public void nodeWithDataAndIndexRebuildTest() throws InterruptedException {
+    public void nodeWithDataAndIndexRebuildTest() {
         IgniteImpl ignite = startNode(0);
 
         int partitions = 20;
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 322252bfd5..b6ab3d7230 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -690,14 +690,24 @@ public class IgniteImpl implements Ignite {
             LOG.info("Components started, joining the cluster");
 
             return cmgMgr.joinFuture()
-                    // using the default executor to avoid blocking the CMG 
Manager threads
+                    // Using the default executor to avoid blocking the CMG 
Manager threads.
+                    .thenComposeAsync(unused -> {
+                        LOG.info("Join complete, starting MetaStorage");
+
+                        try {
+                            lifecycleManager.startComponent(metaStorageMgr);
+                        } catch (NodeStoppingException e) {
+                            throw new CompletionException(e);
+                        }
+
+                        return metaStorageMgr.ready();
+                    })
                     .thenRunAsync(() -> {
-                        LOG.info("Join complete, starting the remaining 
components");
+                        LOG.info("MetaStorage started, starting the remaining 
components");
 
                         // Start all other components after the join request 
has completed and the node has been validated.
                         try {
                             lifecycleManager.startComponents(
-                                    metaStorageMgr,
                                     clusterCfgMgr,
                                     metricManager,
                                     distributionZoneManager,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
index ef5cc3c4f0..d1ff10ad1e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/recovery/ConfigurationCatchUpListener.java
@@ -84,6 +84,7 @@ public class ConfigurationCatchUpListener implements 
ConfigurationStorageRevisio
     private CompletableFuture<?> checkRevisionUpToDate(long appliedRevision) {
         return cfgStorage.lastRevision().thenAccept(rev -> {
             synchronized (targetRevisionUpdateMutex) {
+                // TODO: actual metastorage revision can be higher than 
configuration revision.
                 assert rev >= appliedRevision : IgniteStringFormatter.format(
                     "Configuration revision must be greater than local node 
applied revision [msRev={}, appliedRev={}",
                     rev, appliedRevision);


Reply via email to