This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch ignite-22900
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 5ef2b4f0371832c1f1472d077e006203bbcbbb35
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 6 18:13:10 2024 +0400

    IGNITE-22900 Implement ability to get MG index+term from node
---
 .../metastorage/impl/MetaStorageManagerImpl.java   | 55 +++++++++++---
 .../ignite/internal/raft/IndexWithTerm.java}       | 31 +++++---
 .../apache/ignite/internal/raft/RaftManager.java   | 10 +++
 .../java/org/apache/ignite/internal/raft/Loza.java | 17 +++++
 .../ignite/internal/raft/server/RaftServer.java    | 10 +++
 .../internal/raft/server/impl/JraftServerImpl.java | 15 ++++
 .../java/org/apache/ignite/raft/jraft/Node.java    |  8 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 11 +++
 .../org/apache/ignite/internal/app/IgniteImpl.java | 19 ++---
 ...ava => MetastorageIndexTermRequestMessage.java} | 13 ++--
 ...va => MetastorageIndexTermResponseMessage.java} | 18 +++--
 .../SystemDisasterRecoveryMessageGroup.java        | 10 +++
 modules/system-disaster-recovery/build.gradle      |  1 +
 .../system/ItMetastorageIndexGettingTest.java      | 87 ++++++++++++++++++++++
 .../internal/disaster/system/ClusterIdService.java |  2 +
 .../system/SystemDisasterRecoveryManagerImpl.java  | 24 +++++-
 .../SystemDisasterRecoveryManagerImplTest.java     | 24 +++++-
 17 files changed, 309 insertions(+), 46 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index f1db841f2d..00eeaeac47 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
@@ -58,6 +59,7 @@ import 
org.apache.ignite.internal.metastorage.server.time.ClusterTime;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -140,6 +142,9 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
     private final RaftGroupOptionsConfigurer raftGroupOptionsConfigurer;
 
+    /** Gets completed as soon as the local Raft node start call returns; does not wait for the Raft service future. */
+    private final CompletableFuture<Void> raftNodeStarted = new 
CompletableFuture<>();
+
     /**
      * The constructor.
      *
@@ -290,17 +295,20 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
 
             var disruptorConfig = new 
RaftNodeDisruptorConfiguration("metastorage", 1);
 
-            CompletableFuture<? extends RaftGroupService> raftServiceFuture = 
metaStorageNodes.contains(thisNodeName)
+            CompletableFuture<? extends RaftGroupService> 
raftNodeStartedFuture = metaStorageNodes.contains(thisNodeName)
                     ? startFollowerNode(metaStorageNodes, disruptorConfig)
                     : startLearnerNode(metaStorageNodes, disruptorConfig);
 
-            return raftServiceFuture.thenApply(raftService -> new 
MetaStorageServiceImpl(
-                    thisNodeName,
-                    raftService,
-                    busyLock,
-                    clusterTime,
-                    () -> clusterService.topologyService().localMember().id())
-            );
+            raftNodeStarted.complete(null);
+
+            return raftNodeStartedFuture
+                    .thenApply(raftService -> new MetaStorageServiceImpl(
+                            thisNodeName,
+                            raftService,
+                            busyLock,
+                            clusterTime,
+                            () -> 
clusterService.topologyService().localMember().id())
+                    );
         } catch (NodeStoppingException e) {
             return failedFuture(e);
         }
@@ -325,7 +333,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         followerListener = new MetaStorageListener(storage, clusterTime);
 
         CompletableFuture<TopologyAwareRaftGroupService> raftServiceFuture = 
raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                raftNodeId(localPeer),
                 configuration,
                 followerListener,
                 RaftGroupEventsListener.noopLsnr,
@@ -370,7 +378,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         learnerListener = new MetaStorageListener(storage, clusterTime);
 
         return raftMgr.startRaftGroupNodeAndWaitNodeReadyFuture(
-                new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
+                raftNodeId(localPeer),
                 configuration,
                 learnerListener,
                 RaftGroupEventsListener.noopLsnr,
@@ -379,6 +387,10 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         );
     }
 
+    private static RaftNodeId raftNodeId(Peer localPeer) {
+        return new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer);
+    }
+
     /**
      * Sets the Meta Storage configuration.
      *
@@ -837,6 +849,29 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
         return recoveryFinishedFuture;
     }
 
+    /**
+     * Returns a future that will be completed with information about index 
and term of the Metastorage Raft group.
+     *
+     * <p>This method is special in the following regard: it can be called 
before the component gets started. The returned
+     * future will be completed after the component start.
+     */
+    public CompletableFuture<IndexWithTerm> raftNodeIndex() {
+        return raftNodeStarted.thenCompose(unused -> inBusyLock(busyLock, () 
-> {
+            RaftNodeId nodeId = raftNodeId(new 
Peer(clusterService.nodeName()));
+
+            IndexWithTerm indexWithTerm;
+            try {
+                indexWithTerm = raftMgr.raftNodeIndex(nodeId, 
raftGroupOptionsConfigurer);
+            } catch (NodeStoppingException e) {
+                return failedFuture(e);
+            }
+
+            assert indexWithTerm != null : "Attempt to get index and term when 
Raft node is not started yet or already stopped";
+
+            return completedFuture(indexWithTerm);
+        }));
+    }
+
     @TestOnly
     public CompletableFuture<MetaStorageServiceImpl> metaStorageService() {
         return metaStorageSvcFut;
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
similarity index 61%
copy from 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
copy to 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
index c6f0b0fa07..b95be4417b 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
@@ -15,17 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.disaster.system.message;
-
-import org.apache.ignite.internal.network.annotations.MessageGroup;
+package org.apache.ignite.internal.raft;
 
 /**
- * Message Group for disaster recovery of system groups.
+ * Raft index with the corresponding term.
  */
-@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
-public class SystemDisasterRecoveryMessageGroup {
-    /**
-     * Message type for {@link ResetClusterMessage}.
-     */
-    public static final short RESET_CLUSTER = 1;
+public class IndexWithTerm {
+    private final long index;
+    private final long term;
+
+    /** Constructor. */
+    public IndexWithTerm(long index, long term) {
+        this.index = index;
+        this.term = term;
+    }
+
+    /** Returns the index. */
+    public long index() {
+        return index;
+    }
+
+    /** Returns the term corresponding to the index. */
+    public long term() {
+        return term;
+    }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
index f990cc0804..91b1c1fda0 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftManager.java
@@ -189,4 +189,14 @@ public interface RaftManager extends IgniteComponent {
      */
     void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptionsConfigurer 
raftGroupOptionsConfigurer)
             throws NodeStoppingException;
+
+    /**
+     * Returns information about index and term of the given node, or {@code 
null} if the group is not started.
+     *
+     * @param nodeId ID of the Raft node.
+     * @param raftGroupOptionsConfigurer Group options configurer.
+     * @throws NodeStoppingException If the node is already being stopped.
+     */
+    @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId, 
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer)
+            throws NodeStoppingException;
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 24d3a52ed8..54979dc990 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -414,6 +414,23 @@ public class Loza implements RaftManager {
         }
     }
 
+    @Override
+    public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId, 
RaftGroupOptionsConfigurer raftGroupOptionsConfigurer)
+            throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            RaftGroupOptions groupOptions = RaftGroupOptions.defaults();
+            raftGroupOptionsConfigurer.configure(groupOptions);
+
+            return raftServer.raftNodeIndex(nodeId, groupOptions);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     private <T extends RaftGroupService> CompletableFuture<T> 
startRaftGroupNodeInternal(
             RaftNodeId nodeId,
             PeersAndLearners configuration,
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 17004e53d9..861cecd8f2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -28,6 +29,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -108,6 +110,14 @@ public interface RaftServer extends IgniteComponent {
      */
     void destroyRaftNodeStorages(RaftNodeId nodeId, RaftGroupOptions 
groupOptions);
 
+    /**
+     * Returns information about index and term of the given node, or {@code 
null} if the group is not started.
+     *
+     * @param nodeId ID of the Raft node.
+     * @param groupOptions Options for this group.
+     */
+    @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId, RaftGroupOptions 
groupOptions);
+
     /**
      * Returns local nodes running the given Raft group.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index ce499affeb..5403d06af9 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.raft.Marshaller;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -85,6 +86,7 @@ import 
org.apache.ignite.raft.jraft.core.NodeImpl.LogEntryAndClosure;
 import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl.ReadIndexEvent;
 import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
+import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -577,6 +579,19 @@ public class JraftServerImpl implements RaftServer {
         }
     }
 
+    @Override
+    public @Nullable IndexWithTerm raftNodeIndex(RaftNodeId nodeId, 
RaftGroupOptions groupOptions) {
+        RaftGroupService service = nodes.get(nodeId);
+
+        if (service == null) {
+            return null;
+        }
+
+        LogId logId = service.getRaftNode().lastLogIndexAndTerm();
+
+        return new IndexWithTerm(logId.getIndex(), logId.getTerm());
+    }
+
     /**
      * Performs a {@code resetPeers} operation on raft node.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
index 3cc5e80dc4..7137292aaf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java
@@ -22,7 +22,7 @@ import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
 import org.apache.ignite.raft.jraft.core.Replicator;
 import org.apache.ignite.raft.jraft.core.State;
-import org.apache.ignite.raft.jraft.entity.NodeId;
+import org.apache.ignite.raft.jraft.entity.LogId;import 
org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.Task;
 import org.apache.ignite.raft.jraft.entity.UserLog;
@@ -328,4 +328,10 @@ public interface Node extends Lifecycle<NodeOptions>, 
Describer {
      * Returns the value of last replicated log index. Corresponding log entry 
might not yet be written to the log storage (no flush).
      */
     long lastLogIndex();
+
+    /**
+     * Returns the value of last replicated log index (with its term). 
Corresponding log entry might not yet be written to the log storage
+     * (no flush).
+     */
+    LogId lastLogIndexAndTerm();
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 0fcf832981..3bfbae526b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2787,6 +2787,17 @@ public class NodeImpl implements Node, RaftServerService 
{
         }
     }
 
+    @Override
+    public LogId lastLogIndexAndTerm() {
+        this.readLock.lock();
+        try {
+            return logManager.getLastLogId(false).copy();
+        }
+        finally {
+            this.readLock.unlock();
+        }
+    }
+
     @OnlyForTest
     ConfigurationEntry getConf() {
         this.readLock.lock();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d117637b69..5545170441 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -551,14 +551,6 @@ public class IgniteImpl implements Ignite {
                 failureProcessor
         );
 
-        systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
-                name,
-                clusterSvc.topologyService(),
-                clusterSvc.messagingService(),
-                vaultMgr,
-                restarter
-        );
-
         clock = new HybridClockImpl();
 
         clockWaiter = new ClockWaiter(name, clock);
@@ -701,6 +693,15 @@ public class IgniteImpl implements Ignite {
 
         
metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageExtensionConfiguration.KEY).metaStorage());
 
+        systemDisasterRecoveryManager = new SystemDisasterRecoveryManagerImpl(
+                name,
+                clusterSvc.topologyService(),
+                clusterSvc.messagingService(),
+                vaultMgr,
+                restarter,
+                metaStorageMgr::raftNodeIndex
+        );
+
         SchemaSynchronizationConfiguration schemaSyncConfig = 
clusterConfigRegistry
                 
.getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();
 
@@ -1228,11 +1229,11 @@ public class IgniteImpl implements Ignite {
                     failureProcessor,
                     clusterStateStorage,
                     clusterIdService,
+                    systemDisasterRecoveryManager,
                     criticalWorkerRegistry,
                     nettyBootstrapFactory,
                     nettyWorkersRegistrar,
                     clusterSvc,
-                    systemDisasterRecoveryManager,
                     restComponent,
                     partitionsLogStorageFactory,
                     msLogStorageFactory,
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java
similarity index 70%
copy from 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
copy to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java
index c6f0b0fa07..1b2ed51546 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermRequestMessage.java
@@ -17,15 +17,12 @@
 
 package org.apache.ignite.internal.disaster.system.message;
 
-import org.apache.ignite.internal.network.annotations.MessageGroup;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
- * Message Group for disaster recovery of system groups.
+ * Request to get the index and term of the Metastorage Raft group from the recipient node.
  */
-@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
-public class SystemDisasterRecoveryMessageGroup {
-    /**
-     * Message type for {@link ResetClusterMessage}.
-     */
-    public static final short RESET_CLUSTER = 1;
+@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_REQUEST)
+public interface MetastorageIndexTermRequestMessage extends NetworkMessage {
 }
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java
similarity index 59%
copy from 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
copy to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java
index c6f0b0fa07..85eecbfbb6 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/MetastorageIndexTermResponseMessage.java
@@ -17,15 +17,21 @@
 
 package org.apache.ignite.internal.disaster.system.message;
 
-import org.apache.ignite.internal.network.annotations.MessageGroup;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
 
 /**
- * Message Group for disaster recovery of system groups.
+ * Response to {@link MetastorageIndexTermRequestMessage}.
  */
-@MessageGroup(groupType = 15, groupName = "SystemDisasterRecoveryMessages")
-public class SystemDisasterRecoveryMessageGroup {
+@Transferable(SystemDisasterRecoveryMessageGroup.METASTORAGE_INDEX_TERM_RESPONSE)
+public interface MetastorageIndexTermResponseMessage extends NetworkMessage {
     /**
-     * Message type for {@link ResetClusterMessage}.
+     * Returns Raft index of the Metastorage group on the sender of this 
message.
      */
-    public static final short RESET_CLUSTER = 1;
+    long raftIndex();
+
+    /**
+     * Returns term corresponding to the Raft index of the Metastorage group 
on the sender of this message.
+     */
+    long raftTerm();
 }
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
index c6f0b0fa07..e94ea8bd9c 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/SystemDisasterRecoveryMessageGroup.java
@@ -28,4 +28,14 @@ public class SystemDisasterRecoveryMessageGroup {
      * Message type for {@link ResetClusterMessage}.
      */
     public static final short RESET_CLUSTER = 1;
+
+    /**
+     * Message type for {@link MetastorageIndexTermRequestMessage}.
+     */
+    public static final short METASTORAGE_INDEX_TERM_REQUEST = 2;
+
+    /**
+     * Message type for {@link MetastorageIndexTermResponseMessage}.
+     */
+    public static final short METASTORAGE_INDEX_TERM_RESPONSE = 3;
 }
diff --git a/modules/system-disaster-recovery/build.gradle 
b/modules/system-disaster-recovery/build.gradle
index e4b9ec8d05..6989f69f5b 100644
--- a/modules/system-disaster-recovery/build.gradle
+++ b/modules/system-disaster-recovery/build.gradle
@@ -28,6 +28,7 @@ dependencies {
     implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
     implementation project(':ignite-cluster-management')
+    implementation project(':ignite-raft-api')
     implementation libs.jetbrains.annotations
 
     testImplementation libs.hamcrest.core
diff --git 
a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java
 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java
new file mode 100644
index 0000000000..00b9e0173b
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItMetastorageIndexGettingTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.junit.jupiter.api.Test;
+
+class ItMetastorageIndexGettingTest extends ClusterPerTestIntegrationTest {
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    @Override
+    protected int initialNodes() {
+        return 0;
+    }
+
+    @Test
+    void indexAndTermAreReturnedFromRunningNode() throws Exception {
+        cluster.startAndInit(2, parametersBuilder -> {
+            parametersBuilder.cmgNodeNames(cluster.nodeName(0));
+            parametersBuilder.metaStorageNodeNames(cluster.nodeName(1));
+        });
+
+        MetastorageIndexTermResponseMessage response = 
getMetastorageIndexAndTermFrom(1);
+
+        assertThat(response.raftIndex(), is(greaterThanOrEqualTo(1L)));
+        assertThat(response.raftTerm(), is(greaterThanOrEqualTo(1L)));
+    }
+
+    private MetastorageIndexTermResponseMessage 
getMetastorageIndexAndTermFrom(int targetNodeIndex)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        IgniteImpl ignite0 = igniteImpl(0);
+
+        CompletableFuture<NetworkMessage> future = 
ignite0.clusterService().messagingService().invoke(
+                cluster.nodeName(targetNodeIndex),
+                messagesFactory.metastorageIndexTermRequestMessage().build(),
+                SECONDS.toMillis(10)
+        );
+        return (MetastorageIndexTermResponseMessage) future.get(10, SECONDS);
+    }
+
+    @Test
+    void indexAndTermAreReturnedFromNodeThatHangsOnStartingMetastorage() 
throws Exception {
+        cluster.startAndInit(3, parametersBuilder -> {
+            parametersBuilder.cmgNodeNames(cluster.nodeName(0));
+            parametersBuilder.metaStorageNodeNames(cluster.nodeName(1));
+        });
+
+        // Stopping node 1 will make Metastorage lose majority.
+        IntStream.of(1, 2).parallel().forEach(cluster::stopNode);
+
+        // This will not be able to finish its startup.
+        cluster.startEmbeddedNode(2);
+
+        MetastorageIndexTermResponseMessage response = 
getMetastorageIndexAndTermFrom(2);
+
+        assertThat(response.raftIndex(), is(greaterThanOrEqualTo(1L)));
+        assertThat(response.raftTerm(), is(greaterThanOrEqualTo(1L)));
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
index d8138db464..0887aa3764 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
@@ -32,6 +32,8 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Used to handle volatile information about cluster ID used to restrict which 
nodes can connect this one and vice versa.
+ *
+ * <p>This MUST be started after the Vault, but before networking.
  */
 public class ClusterIdService implements ClusterIdSupplier, ClusterIdStore, 
IgniteComponent {
     private final SystemDisasterRecoveryStorage storage;
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index a9c815c92a..bd638f715a 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -36,9 +36,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import 
org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
@@ -48,6 +51,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.ClusterNode;
@@ -61,6 +65,7 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
     private final TopologyService topologyService;
     private final MessagingService messagingService;
     private final ServerRestarter restarter;
+    private final Supplier<CompletableFuture<IndexWithTerm>> 
metastorageIndexWithTerm;
 
     private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
     private static final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
@@ -76,12 +81,14 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
             TopologyService topologyService,
             MessagingService messagingService,
             VaultManager vaultManager,
-            ServerRestarter restarter
+            ServerRestarter restarter,
+            Supplier<CompletableFuture<IndexWithTerm>> metastorageIndexWithTerm
     ) {
         this.thisNodeName = thisNodeName;
         this.topologyService = topologyService;
         this.messagingService = messagingService;
         this.restarter = restarter;
+        this.metastorageIndexWithTerm = metastorageIndexWithTerm;
 
         storage = new SystemDisasterRecoveryStorage(vaultManager);
         restartExecutor = new ThreadPerTaskExecutor(thisNodeName + 
"-restart-");
@@ -93,6 +100,9 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
             if (message instanceof ResetClusterMessage) {
                 assert correlationId != null;
                 handleResetClusterMessage((ResetClusterMessage) message, 
sender, correlationId);
+            } else if (message instanceof MetastorageIndexTermRequestMessage) {
+                assert correlationId != null;
+                handleMetastorageIndexTermRequest(sender, correlationId);
             }
         });
 
@@ -112,6 +122,18 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
         });
     }
 
+    // Replies to the sender with this node's Metastorage Raft index and term, as produced by the supplier.
+    // thenAccept() is skipped on exceptional completion, so nothing is sent if the supplied future fails.
+    private void handleMetastorageIndexTermRequest(ClusterNode sender, long correlationId) {
+        CompletableFuture<IndexWithTerm> indexWithTermFuture = metastorageIndexWithTerm.get();
+
+        indexWithTermFuture.thenAccept(indexWithTerm -> {
+            MetastorageIndexTermResponseMessage reply = messagesFactory.metastorageIndexTermResponseMessage()
+                    .raftIndex(indexWithTerm.index()).raftTerm(indexWithTerm.term()).build();
+            messagingService.respond(sender, reply, correlationId);
+        });
+    }
+
     private static SuccessResponseMessage successResponseMessage() {
         return cmgMessagesFactory.successResponseMessage().build();
     }
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index bfb06116e3..d00564ae6d 100644
--- 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -56,9 +56,11 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
 import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import 
org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
@@ -71,6 +73,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.raft.IndexWithTerm;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -122,6 +125,9 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     @Mock
     private ServerRestarter restarter;
 
+    @Mock
+    private Supplier<CompletableFuture<IndexWithTerm>> 
metastorageIndexWithTerm;
+
     private SystemDisasterRecoveryManagerImpl manager;
 
     private final ComponentContext componentContext = new ComponentContext();
@@ -159,7 +165,8 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
                 topologyService,
                 messagingService,
                 vaultManager,
-                restarter
+                restarter,
+                metastorageIndexWithTerm
         );
         assertThat(manager.startAsync(componentContext), 
willCompleteSuccessfully());
     }
@@ -587,6 +594,21 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         assertThat(ex.getMessage(), is("Metastorage replication factor cannot 
exceed size of current physical topology (1)."));
     }
 
+    // A MetastorageIndexTermRequestMessage must be answered with the index and term produced by the supplier.
+    @Test
+    void returnsIndexAndTerm() {
+        when(metastorageIndexWithTerm.get()).thenReturn(completedFuture(new IndexWithTerm(234, 2)));
+
+        extractMessageHandler().onReceived(messagesFactory.metastorageIndexTermRequestMessage().build(), thisNode, 123L);
+
+        ArgumentCaptor<MetastorageIndexTermResponseMessage> replies = ArgumentCaptor.forClass(MetastorageIndexTermResponseMessage.class);
+        verify(messagingService).respond(eq(thisNode), replies.capture(), eq(123L));
+
+        MetastorageIndexTermResponseMessage reply = replies.getValue();
+        assertThat(reply.raftIndex(), is(234L));
+        assertThat(reply.raftTerm(), is(2L));
+    }
+
     private enum ResetCluster {
         CMG_ONLY {
             @Override


Reply via email to