This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bcd1f19cf9 IGNITE-18087 DistributionZoneManager populated with CMG 
listeners to logical topology events (#1436)
bcd1f19cf9 is described below

commit bcd1f19cf931b3b784101cc624942fefc2a4e6cb
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 19 09:30:58 2022 +0300

    IGNITE-18087 DistributionZoneManager populated with CMG listeners to 
logical topology events (#1436)
---
 .../management/topology/LogicalTopologyImpl.java   |   2 +-
 modules/distribution-zones/build.gradle            |   1 +
 modules/distribution-zones/pom.xml                 |   6 +
 .../distributionzones/DistributionZoneManager.java | 169 ++++++-
 .../distributionzones/DistributionZonesUtil.java   |  54 ++-
 ...ibutionZoneManagerConfigurationChangesTest.java |  14 +-
 ...butionZoneManagerLogicalTopologyEventsTest.java | 526 +++++++++++++++++++++
 .../DistributionZoneManagerTest.java               |   4 +-
 .../ItDistributedConfigurationStorageTest.java     |   2 +
 .../internal/runner/app/ItTablesApiTest.java       |   1 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../storage/DistributedConfigurationStorage.java   |   9 +-
 12 files changed, 767 insertions(+), 23 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 798fc41cea..5188a3b342 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -44,7 +44,7 @@ public class LogicalTopologyImpl implements LogicalTopology {
     private static final IgniteLogger LOG = 
Loggers.forClass(LogicalTopologyImpl.class);
 
     /** Storage key for the logical topology. */
-    private static final byte[] LOGICAL_TOPOLOGY_KEY = 
"logical".getBytes(UTF_8);
+    public static final byte[] LOGICAL_TOPOLOGY_KEY = 
"logical".getBytes(UTF_8);
 
     private final ClusterStateStorage storage;
 
diff --git a/modules/distribution-zones/build.gradle 
b/modules/distribution-zones/build.gradle
index 4135d200cf..c1e9ae2500 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -46,6 +46,7 @@ dependencies {
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-configuration')))
     testImplementation(testFixtures(project(':ignite-metastorage-server')))
+    testImplementation(testFixtures(project(':ignite-cluster-management')))
 }
 
 description = 'ignite-distribution-zones'
diff --git a/modules/distribution-zones/pom.xml 
b/modules/distribution-zones/pom.xml
index ebfacfa8f1..1e6aa16652 100644
--- a/modules/distribution-zones/pom.xml
+++ b/modules/distribution-zones/pom.xml
@@ -101,6 +101,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-cluster-management</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index c6d9adfe6b..87eb81edae 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -21,20 +21,28 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.metastorage.client.Conditions.value;
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
 
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedListChange;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
 import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -46,6 +54,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.client.CompoundCondition;
+import org.apache.ignite.internal.metastorage.client.Condition;
 import org.apache.ignite.internal.metastorage.client.If;
 import org.apache.ignite.internal.metastorage.client.Update;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -74,21 +83,48 @@ public class DistributionZoneManager implements 
IgniteComponent {
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
+    /** Prevents double stopping of the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    /** Logical topology service to track topology changes. */
+    private final LogicalTopologyService logicalTopologyService;
+
+    /** Listener for topology events. */
+    private final LogicalTopologyEventListener topologyEventListener = new 
LogicalTopologyEventListener() {
+        @Override
+        public void onAppeared(ClusterNode appearedNode, 
LogicalTopologySnapshot newTopology) {
+            updateLogicalTopologyInMetaStorage(newTopology, false);
+        }
+
+        @Override
+        public void onDisappeared(ClusterNode disappearedNode, 
LogicalTopologySnapshot newTopology) {
+            updateLogicalTopologyInMetaStorage(newTopology, false);
+        }
+
+        @Override
+        public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+            updateLogicalTopologyInMetaStorage(newTopology, true);
+        }
+    };
+
     /**
      * Creates a new distribution zone manager.
      *
      * @param zonesConfiguration Distribution zones configuration.
      * @param metaStorageManager Meta Storage manager.
      * @param cmgManager Cluster management group manager.
+     * @param logicalTopologyService Logical topology service.
      */
     public DistributionZoneManager(
             DistributionZonesConfiguration zonesConfiguration,
             MetaStorageManager metaStorageManager,
-            ClusterManagementGroupManager cmgManager
+            ClusterManagementGroupManager cmgManager,
+            LogicalTopologyService logicalTopologyService
     ) {
         this.zonesConfiguration = zonesConfiguration;
         this.metaStorageManager = metaStorageManager;
         this.cmgManager = cmgManager;
+        this.logicalTopologyService = logicalTopologyService;
     }
 
     /**
@@ -235,12 +271,22 @@ public class DistributionZoneManager implements 
IgniteComponent {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new 
ZonesConfigurationListener());
+
+        logicalTopologyService.addEventListener(topologyEventListener);
+
+        initMetaStorageKeysOnStart();
     }
 
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
 
+        logicalTopologyService.removeEventListener(topologyEventListener);
     }
 
     private class ZonesConfigurationListener implements 
ConfigurationNamedListListener<DistributionZoneView> {
@@ -281,24 +327,24 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         try {
-            Set<ClusterNode> clusterNodes;
+            Set<ClusterNode> logicalTopology;
 
-            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18087
+            //TODO temporary code, will be removed in 
https://issues.apache.org/jira/browse/IGNITE-18121
             try {
-                clusterNodes = cmgManager.logicalTopology().get().nodes();
+                logicalTopology = cmgManager.logicalTopology().get().nodes();
             } catch (InterruptedException | ExecutionException e) {
                 throw new IgniteInternalException(e);
             }
 
+            assert !logicalTopology.isEmpty() : "Logical topology cannot be 
empty.";
+
             // Update data nodes for a zone only if the revision of the event 
is newer than value in that trigger key,
             // so we do not react on a stale events
             CompoundCondition triggerKeyCondition = 
triggerKeyCondition(revision);
 
-            Set<String> nodesConsistentIds = 
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
-
-            byte[] logicalTopologyBytes = 
ByteUtils.toBytes(nodesConsistentIds);
+            Set<String> nodesConsistentIds = 
logicalTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+            Update dataNodesAndTriggerKeyUpd = 
updateDataNodesAndTriggerKey(zoneId, revision, nodesConsistentIds);
 
             If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
 
@@ -344,7 +390,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
     }
 
     /**
-     * Method deleted data nodes value for the specified zone,
+     * Method deletes data nodes value for the specified zone,
      * also sets {@code revision} to the {@link 
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
      *
      * @param zoneId Unique id of a zone
@@ -373,4 +419,109 @@ public class DistributionZoneManager implements 
IgniteComponent {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Updates {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and 
{@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()}
+     * in meta storage.
+     *
+     * @param newTopology Logical topology snapshot.
+     * @param topologyLeap Flag that indicates whether this update was 
triggered by
+     *                     {@link 
LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
+     */
+    private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot 
newTopology, boolean topologyLeap) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+        }
+
+        try {
+            Set<String> topologyFromCmg = 
newTopology.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+            Condition updateCondition;
+
+            if (topologyLeap) {
+                updateCondition = 
value(zonesLogicalTopologyVersionKey()).lt(ByteUtils.longToBytes(newTopology.version()));
+            } else {
+                // This condition can be stricter, because we receive 
topology events one by one.
+                updateCondition = 
value(zonesLogicalTopologyVersionKey()).eq(ByteUtils.longToBytes(newTopology.version()
 - 1));
+            }
+
+            If iff = If.iif(
+                    updateCondition,
+                    updateLogicalTopologyAndVersion(topologyFromCmg, 
newTopology.version()),
+                    ops().yield(false)
+            );
+
+            metaStorageManager.invoke(iff).thenAccept(res -> {
+                if (res.getAsBoolean()) {
+                    LOG.debug(
+                            "Distribution zones' logical topology and version 
keys were updated [topology = {}, version = {}]",
+                            Arrays.toString(topologyFromCmg.toArray()),
+                            newTopology.version()
+                    );
+                } else {
+                    LOG.debug(
+                            "Failed to update distribution zones' logical 
topology and version keys [topology = {}, version = {}]",
+                            Arrays.toString(topologyFromCmg.toArray()),
+                            newTopology.version()
+                    );
+                }
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
+     * {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} in 
meta storage on the start of {@link DistributionZoneManager}.
+     */
+    private void initMetaStorageKeysOnStart() {
+        logicalTopologyService.logicalTopologyOnLeader().thenAccept(snapshot 
-> {
+            if (!busyLock.enterBusy()) {
+                throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
+            }
+
+            try {
+                long topologyVersionFromCmg = snapshot.version();
+
+                byte[] topVerFromMetastorage;
+
+                try {
+                    topVerFromMetastorage = 
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
+                }
+
+                if (topVerFromMetastorage == null || 
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
+                    Set<String> topologyFromCmg = 
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+                    Condition topologyVersionCondition = 
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+
+                    If iff = If.iif(topologyVersionCondition,
+                            updateLogicalTopologyAndVersion(topologyFromCmg, 
topologyVersionFromCmg),
+                            ops().yield(false)
+                    );
+
+                    metaStorageManager.invoke(iff).thenAccept(res -> {
+                        if (res.getAsBoolean()) {
+                            LOG.debug(
+                                    "Distribution zones' logical topology and 
version keys were initialised [topology = {}, version = {}]",
+                                    Arrays.toString(topologyFromCmg.toArray()),
+                                    topologyVersionFromCmg
+                            );
+                        } else {
+                            LOG.debug(
+                                    "Failed to initialize distribution zones' 
logical topology "
+                                            + "and version keys [topology = 
{}, version = {}]",
+                                    Arrays.toString(topologyFromCmg.toArray()),
+                                    topologyVersionFromCmg
+                            );
+                        }
+                    });
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        });
+    }
 }
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 5262657590..a76902a1c5 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.internal.metastorage.client.Operations.put;
 import static org.apache.ignite.internal.metastorage.client.Operations.remove;
 
+import java.util.Set;
 import org.apache.ignite.internal.metastorage.client.CompoundCondition;
 import org.apache.ignite.internal.metastorage.client.Update;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -36,8 +37,21 @@ class DistributionZonesUtil {
     /** Key prefix for zone's data nodes. */
     private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX = 
"distributionZone.dataNodes.";
 
+    /** Key for zones' logical topology nodes. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
"distributionZones.logicalTopology";
+
+    /** Key for zones' logical topology version. */
+    private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION = 
"distributionZones.logicalTopologyVersion";
+
     /** The key, needed for processing the event about zones' update was 
triggered only once. */
-    private static final ByteArray ZONES_CHANGE_TRIGGER_KEY = new 
ByteArray("distributionZones.change.trigger");
+    private static final ByteArray DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY = new 
ByteArray("distributionZones.change.trigger");
+
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
+    private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY = 
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
+
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
+    private static final ByteArray 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
+            new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
 
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
     static ByteArray zoneDataNodesKey(int zoneId) {
@@ -48,7 +62,23 @@ class DistributionZonesUtil {
      * The key, needed for processing the event about zones' update was 
triggered only once.
      */
     static ByteArray zonesChangeTriggerKey() {
-        return ZONES_CHANGE_TRIGGER_KEY;
+        return DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY;
+    }
+
+    /**
+     * The key that represents logical topology nodes, needed for distribution 
zones. It is needed to store them in the metastore
+     * to serialize data nodes changes triggered by topology changes and 
changes of distribution zones configurations.
+     */
+    static ByteArray zonesLogicalTopologyKey() {
+        return DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY;
+    }
+
+    /**
+     * The key needed for processing the events about logical topology changes.
+     * Needed to protect against stale updates of logical topology 
nodes.
+     */
+    static ByteArray zonesLogicalTopologyVersionKey() {
+        return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
     }
 
     /**
@@ -70,16 +100,30 @@ class DistributionZonesUtil {
      *
      * @param zoneId Distribution zone id
      * @param revision Revision of the event.
-     * @param logicalTopologyBytes Logical topology.
+     * @param logicalTopology Logical topology.
      * @return Update command for the meta storage.
      */
-    static Update updateDataNodesAndTriggerKey(int zoneId, long revision, 
byte[] logicalTopologyBytes) {
+    static Update updateDataNodesAndTriggerKey(int zoneId, long revision, 
Set<String> logicalTopology) {
         return ops(
-                put(zoneDataNodesKey(zoneId), logicalTopologyBytes),
+                put(zoneDataNodesKey(zoneId), 
ByteUtils.toBytes(logicalTopology)),
                 put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
         ).yield(true);
     }
 
+    /**
+     * Updates logical topology and logical topology version values for zones.
+     *
+     * @param logicalTopology Logical topology.
+     * @param topologyVersion Logical topology version.
+     * @return Update command for the meta storage.
+     */
+    static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology, 
long topologyVersion) {
+        return ops(
+                put(zonesLogicalTopologyVersionKey(), 
ByteUtils.longToBytes(topologyVersion)),
+                put(zonesLogicalTopologyKey(), 
ByteUtils.toBytes(logicalTopology))
+        ).yield(true);
+    }
+
     /**
      * Sets {@code revision} to {@link 
DistributionZonesUtil#zonesChangeTriggerKey()}.
      *
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 2ea6019e54..e8325efc94 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.metastorage.client.MetaStorageServiceIm
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -41,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
@@ -84,6 +86,9 @@ public class DistributionZoneManagerConfigurationChangesTest 
extends IgniteAbstr
 
     private ConfigurationManager clusterCfgMgr;
 
+    @Mock
+    private LogicalTopologyServiceImpl logicalTopologyService;
+
     @BeforeEach
     public void setUp() {
         clusterCfgMgr = new ConfigurationManager(
@@ -101,14 +106,21 @@ public class 
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         cmgManager = mock(ClusterManagementGroupManager.class);
 
+        logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 metaStorageManager,
-                cmgManager
+                cmgManager,
+                logicalTopologyService
         );
 
         clusterCfgMgr.start();
 
+        doNothing().when(logicalTopologyService).addEventListener(any());
+
+        
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
 LogicalTopologySnapshot(1, Set.of())));
+
         distributionZoneManager.start();
 
         AtomicLong raftIndex = new AtomicLong();
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
new file mode 100644
index 0000000000..cc7272217a
--- /dev/null
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static 
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import 
org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import 
org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import 
org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests reactions to topology changes in accordance with distribution zones 
logic.
+ */
+public class DistributionZoneManagerLogicalTopologyEventsTest {
+    // NOTE(review): no Mockito extension is visible on this class, so this annotation appears to be inert here;
+    // the mock is created explicitly in prepareDistributionZoneManager() - confirm and drop the annotation if so.
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    // Manager under test; created (but not started) by prepareDistributionZoneManager().
+    private DistributionZoneManager distributionZoneManager;
+
+    // Spied in-memory storage behind the mocked meta storage; tests read and pre-populate keys through it.
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    // Configuration manager that provides the distribution zones configuration.
+    private ConfigurationManager clusterCfgMgr;
+
+    // Logical topology built over clusterStateStorage; tests change it to trigger topology events.
+    private LogicalTopology topology;
+
+    // Storage backing the logical topology.
+    private ClusterStateStorage clusterStateStorage;
+
+    /**
+     * Builds the {@link DistributionZoneManager} under test together with its collaborators and stores them in the
+     * fields of this class.
+     *
+     * <p>The meta storage manager and the Raft group service are mocks whose {@code invoke}, {@code get} and
+     * {@code run} calls are routed to a {@link MetaStorageListener} working over a spied in-memory key-value storage.
+     * The configuration manager is started here; the returned zone manager is not.
+     *
+     * @return Created, not yet started distribution zone manager.
+     */
+    private DistributionZoneManager prepareDistributionZoneManager() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        DistributionZonesConfiguration zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        topology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(topology, cmgManager);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                logicalTopologyService
+        );
+
+        clusterCfgMgr.start();
+
+        // Shared by the read and the write path so that every command gets its own index.
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate write commands directly to the listener.
+        lenient().doAnswer(invocation -> {
+            WriteCommand cmd = invocation.getArgument(0);
+
+            CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+            CommandClosure<WriteCommand> clo = commandClosure(raftIndex.incrementAndGet(), cmd, res);
+
+            try {
+                metaStorageListener.onWrite(List.of(clo).iterator());
+            } catch (Throwable e) {
+                res.completeExceptionally(new IgniteInternalException(e));
+            }
+
+            return res;
+        }).when(metaStorageService).run(any(WriteCommand.class));
+
+        // Delegate read commands directly to the listener.
+        lenient().doAnswer(invocation -> {
+            ReadCommand cmd = invocation.getArgument(0);
+
+            CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+            CommandClosure<ReadCommand> clo = commandClosure(raftIndex.incrementAndGet(), cmd, res);
+
+            try {
+                metaStorageListener.onRead(List.of(clo).iterator());
+            } catch (Throwable e) {
+                res.completeExceptionally(new IgniteInternalException(e));
+            }
+
+            return res;
+        }).when(metaStorageService).run(any(ReadCommand.class));
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        // MetaStorageManager#invoke is turned into a multi-invoke command executed by the mocked Raft service.
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+
+        // MetaStorageManager#get is turned into a get command executed by the mocked Raft service.
+        lenient().doAnswer(invocationClose -> {
+            ByteArray key = invocationClose.getArgument(0);
+
+            GetCommand getCommand = commandsFactory.getCommand().key(key.bytes()).build();
+
+            return metaStorageService.run(getCommand).thenApply(bi -> {
+                SingleEntryResponse resp = (SingleEntryResponse) bi;
+
+                return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter());
+            });
+        }).when(metaStorageManager).get(any());
+
+        return distributionZoneManager;
+    }
+
+    /**
+     * Creates a closure that exposes the given command under the given index and forwards the command result to the
+     * given future: a {@link Throwable} result fails the future, any other result (including {@code null}) completes it.
+     *
+     * @param commandIndex Index reported by the closure.
+     * @param command Command reported by the closure.
+     * @param resultFuture Future that receives the command result.
+     * @param <T> Command type.
+     * @return Closure to be passed to the meta storage listener.
+     */
+    private static <T extends Command> CommandClosure<T> commandClosure(
+            long commandIndex,
+            T command,
+            CompletableFuture<Serializable> resultFuture
+    ) {
+        return new CommandClosure<T>() {
+            /** {@inheritDoc} */
+            @Override
+            public long index() {
+                return commandIndex;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public T command() {
+                return command;
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void result(@Nullable Serializable r) {
+                if (r instanceof Throwable) {
+                    resultFuture.completeExceptionally((Throwable) r);
+                } else {
+                    resultFuture.complete(r);
+                }
+            }
+        };
+    }
+
+    /**
+     * Releases what prepareDistributionZoneManager() created: stops the zone manager and the configuration manager,
+     * closes the key-value storage and destroys the cluster state storage.
+     *
+     * @throws Exception If any of the components fails to stop.
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+
+        clusterStateStorage.destroy();
+    }
+
+    /** Checks that both zone topology keys are written on start when no topology version is stored yet. */
+    @Test
+    void testMetaStorageKeysInitializedOnStartWhenTopVerEmpty() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        Set<ClusterNode> cmgNodes = Set.of(new ClusterNode("1", "name1", null));
+
+        mockCmgLocalNodes(1L, cmgNodes);
+
+        zoneManager.start();
+
+        // Exactly one invoke must reach the storage within a second.
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        assertLogicalTopVer(1L);
+
+        assertLogicalTopology(cmgNodes);
+    }
+
+    /** Checks that the zone topology keys are overwritten on start when the stored version (1) is behind the CMG one (2). */
+    @Test
+    void testMetaStorageKeysInitializedOnStartWhenTopVerIsLessThanCmgTopVer() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        Set<ClusterNode> cmgNodes = Set.of(new ClusterNode("1", "name1", null));
+
+        mockCmgLocalNodes(2L, cmgNodes);
+
+        // Pre-populate the storage with an older topology version.
+        byte[] storedVersion = ByteUtils.longToBytes(1L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), storedVersion);
+
+        zoneManager.start();
+
+        // Exactly one invoke must reach the storage within a second.
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        assertLogicalTopVer(2L);
+
+        assertLogicalTopology(cmgNodes);
+    }
+
+    /** Checks that nothing is written on start when the stored version (2) equals the CMG one (2). */
+    @Test
+    void testMetaStorageKeysInitializedOnStartWhenTopVerEqualsToCmgTopVer() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        Set<ClusterNode> cmgNodes = Set.of(new ClusterNode("1", "name1", null));
+
+        mockCmgLocalNodes(2L, cmgNodes);
+
+        // Pre-populate the storage with the same topology version as the CMG reports.
+        byte[] storedVersion = ByteUtils.longToBytes(2L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), storedVersion);
+
+        zoneManager.start();
+
+        // No invoke may reach the storage during the next 500 ms.
+        verify(keyValueStorage, after(500).never()).invoke(any());
+
+        assertLogicalTopVer(2L);
+
+        // The topology key is expected to hold no value.
+        assertLogicalTopology(null);
+    }
+
+    /** Checks that nothing is written on start when the stored version (3) is ahead of the CMG one (2). */
+    @Test
+    void testMetaStorageKeysInitializedOnStartWhenTopVerGreaterThanCmgTopVer() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        Set<ClusterNode> cmgNodes = Set.of(new ClusterNode("1", "name1", null));
+
+        mockCmgLocalNodes(2L, cmgNodes);
+
+        // Pre-populate the storage with a newer topology version than the CMG reports.
+        byte[] storedVersion = ByteUtils.longToBytes(3L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), storedVersion);
+
+        zoneManager.start();
+
+        // No invoke may reach the storage during the next 500 ms.
+        verify(keyValueStorage, after(500).never()).invoke(any());
+
+        assertLogicalTopVer(3L);
+
+        // The topology key is expected to hold no value.
+        assertLogicalTopology(null);
+    }
+
+    /** Checks that adding a node to the logical topology after start updates both zone topology keys. */
+    @Test
+    void testNodeAddingUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode);
+
+        mockCmgLocalNodes(1L, initialNodes);
+
+        zoneManager.start();
+
+        // The second node joins once the manager has been started.
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(secondNode);
+
+        Set<ClusterNode> expectedNodes = Set.of(firstNode, secondNode);
+
+        assertLogicalTopology(expectedNodes);
+
+        assertLogicalTopVer(2L);
+    }
+
+    /** Checks that a node-added event with a version behind the stored one (4) leaves both zone topology keys untouched. */
+    @Test
+    void testNodeStaleAddingDoNotUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode);
+
+        mockCmgLocalNodes(1L, initialNodes);
+
+        zoneManager.start();
+
+        // Make the stored version newer than anything the topology is about to produce.
+        byte[] newerVersion = ByteUtils.longToBytes(4L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), newerVersion);
+
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(secondNode);
+
+        assertEquals(2L, topology.getLogicalTopology().version());
+
+        // Both keys must keep their previous values.
+        assertLogicalTopology(initialNodes);
+
+        assertLogicalTopVer(4L);
+    }
+
+    /** Checks that removing a node from the logical topology after start updates both zone topology keys. */
+    @Test
+    void testNodeRemovingUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        topology.putNode(secondNode);
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode, secondNode);
+
+        mockCmgLocalNodes(2L, initialNodes);
+
+        zoneManager.start();
+
+        assertLogicalTopology(initialNodes);
+
+        // The second node leaves once the manager has been started.
+        topology.removeNodes(Set.of(secondNode));
+
+        Set<ClusterNode> expectedNodes = Set.of(firstNode);
+
+        assertLogicalTopology(expectedNodes);
+
+        assertLogicalTopVer(3L);
+    }
+
+    /** Checks that a node-removed event with a version behind the stored one (4) leaves both zone topology keys untouched. */
+    @Test
+    void testNodeStaleRemovingDoNotUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        topology.putNode(secondNode);
+
+        assertEquals(2L, topology.getLogicalTopology().version());
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode, secondNode);
+
+        mockCmgLocalNodes(2L, initialNodes);
+
+        zoneManager.start();
+
+        // Make the stored version newer than anything the topology is about to produce.
+        byte[] newerVersion = ByteUtils.longToBytes(4L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), newerVersion);
+
+        topology.removeNodes(Set.of(secondNode));
+
+        // Both keys must keep their previous values.
+        assertLogicalTopology(initialNodes);
+
+        assertLogicalTopVer(4L);
+    }
+
+    /** Checks that a topology leap to version 10 rewrites both zone topology keys with the snapshot from the cluster state storage. */
+    @Test
+    void testTopologyLeapUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode);
+
+        mockCmgLocalNodes(2L, initialNodes);
+
+        zoneManager.start();
+
+        assertLogicalTopology(initialNodes);
+
+        Set<ClusterNode> leapNodes = Set.of(firstNode, secondNode);
+
+        // Store the snapshot directly in the cluster state storage, then fire the leap event.
+        LogicalTopologySnapshot leapSnapshot = new LogicalTopologySnapshot(10L, leapNodes);
+
+        clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(leapSnapshot));
+
+        topology.fireTopologyLeap();
+
+        assertLogicalTopology(leapNodes);
+
+        assertLogicalTopVer(10L);
+    }
+
+    /** Checks that a topology leap to version 10 is ignored when version 11 is already stored. */
+    @Test
+    void testStaleTopologyLeapDoNotUpdatesLogicalTopologyInMetaStorage() throws Exception {
+        DistributionZoneManager zoneManager = prepareDistributionZoneManager();
+
+        ClusterNode firstNode = new ClusterNode("1", "name1", new NetworkAddress("localhost", 123));
+
+        ClusterNode secondNode = new ClusterNode("2", "name2", new NetworkAddress("localhost", 123));
+
+        topology.putNode(firstNode);
+
+        Set<ClusterNode> initialNodes = Set.of(firstNode);
+
+        mockCmgLocalNodes(2L, initialNodes);
+
+        zoneManager.start();
+
+        assertLogicalTopology(initialNodes);
+
+        Set<ClusterNode> leapNodes = Set.of(firstNode, secondNode);
+
+        // Store the snapshot directly in the cluster state storage.
+        LogicalTopologySnapshot leapSnapshot = new LogicalTopologySnapshot(10L, leapNodes);
+
+        clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(leapSnapshot));
+
+        // Make the stored version newer than the snapshot version before the leap event is fired.
+        byte[] newerVersion = ByteUtils.longToBytes(11L);
+
+        keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(), newerVersion);
+
+        topology.fireTopologyLeap();
+
+        // Both keys must keep their previous values.
+        assertLogicalTopology(initialNodes);
+
+        assertLogicalTopVer(11L);
+    }
+
+    /**
+     * Stubs {@code cmgManager.logicalTopology()} so that it returns a completed future holding a snapshot with the
+     * given version and nodes.
+     *
+     * @param version Version of the stubbed snapshot.
+     * @param clusterNodes Nodes of the stubbed snapshot.
+     * @return Snapshot returned by the stub.
+     */
+    private LogicalTopologySnapshot mockCmgLocalNodes(long version, Set<ClusterNode> clusterNodes) {
+        LogicalTopologySnapshot snapshot = new LogicalTopologySnapshot(version, clusterNodes);
+
+        when(cmgManager.logicalTopology()).thenReturn(completedFuture(snapshot));
+
+        return snapshot;
+    }
+
+    /**
+     * Waits up to a second for the zones logical topology version key in the key-value storage to hold the expected
+     * version.
+     *
+     * <p>A poll that finds no value under the key counts as "not yet" instead of being passed to
+     * {@code ByteUtils.bytesToLong}, so the wait keeps retrying until the key is written or the timeout expires.
+     *
+     * @param topVer Expected topology version.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    private void assertLogicalTopVer(long topVer) throws InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> {
+                            byte[] storedVer = keyValueStorage.get(zonesLogicalTopologyVersionKey().bytes()).value();
+
+                            return storedVer != null && ByteUtils.bytesToLong(storedVer) == topVer;
+                        },
+                        1000
+                )
+        );
+    }
+
+    /**
+     * Waits up to a second for the zones logical topology key in the key-value storage to hold the serialized set of
+     * names of the given nodes.
+     *
+     * @param clusterNodes Expected nodes, or {@code null} if no value is expected under the key.
+     * @throws InterruptedException If interrupted while waiting.
+     */
+    private void assertLogicalTopology(@Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
+        byte[] expectedBytes;
+
+        if (clusterNodes == null) {
+            expectedBytes = null;
+        } else {
+            Set<String> nodeNames = clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+            expectedBytes = ByteUtils.toBytes(nodeNames);
+        }
+
+        boolean matched = waitForCondition(
+                () -> Arrays.equals(keyValueStorage.get(zonesLogicalTopologyKey().bytes()).value(), expectedBytes),
+                1000
+        );
+
+        assertTrue(matched);
+    }
+}
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index f3b43f7b9f..9db029085d 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
@@ -70,7 +71,8 @@ class DistributionZoneManagerTest extends IgniteAbstractTest {
         distributionZoneManager = new DistributionZoneManager(
                 zonesConfiguration,
                 mock(MetaStorageManager.class),
-                mock(ClusterManagementGroupManager.class)
+                mock(ClusterManagementGroupManager.class),
+                mock(LogicalTopologyServiceImpl.class)
         );
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 122fc48577..1b27cdee86 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -159,6 +160,7 @@ public class ItDistributedConfigurationStorageTest {
      * @see <a 
href="https://issues.apache.org/jira/browse/IGNITE-15213">IGNITE-15213</a>
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18410")
     void testRestartWithPds(@WorkDirectory Path workDir, TestInfo testInfo) 
throws Exception {
         var node = new Node(testInfo, workDir);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index ba9eb80043..30f2a12a9e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -158,6 +158,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
      * @throws Exception If failed.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18410")
     public void testTableAlreadyCreatedFromLaggedNode() throws Exception {
         clusterNodes.forEach(ign -> 
assertNull(ign.tables().table(TABLE_NAME)));
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 70af11ea08..aa2930acb5 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -352,7 +352,7 @@ public class IgniteImpl implements Ignite {
         DistributionZonesConfiguration zonesConfiguration = 
clusterCfgMgr.configurationRegistry()
                 .getConfiguration(DistributionZonesConfiguration.KEY);
 
-        distributionZoneManager = new 
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr);
+        distributionZoneManager = new 
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr, 
logicalTopologyService);
 
         restComponent = createRestComponent(name);
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index cba14d829f..c2add413ee 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -210,8 +210,6 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
     private long resolveRevision(@Nullable VaultEntry appliedRevEntry, 
@Nullable VaultEntry revisionsEntry) {
         long appliedRevision = appliedRevEntry == null ? 0L : 
ByteUtils.bytesToLong(appliedRevEntry.value());
 
-        long cfgRevision = appliedRevision;
-
         if (revisionsEntry != null) {
             byte[] value = revisionsEntry.value();
             long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
@@ -219,10 +217,11 @@ public class DistributedConfigurationStorage implements 
ConfigurationStorage {
 
             // If current master key revision is higher than applied revision, 
then node failed
             // before applied revision changed, so we have to use previous 
master key revision
-            cfgRevision = curMasterKeyRevision <= appliedRevision ? 
curMasterKeyRevision : prevMasterKeyRevision;
+            return curMasterKeyRevision <= appliedRevision ? 
curMasterKeyRevision : prevMasterKeyRevision;
+        } else {
+            // Configuration has not been updated yet, so it is safe to return 
0 as the revision for the master key.
+            return 0L;
         }
-
-        return cfgRevision;
     }
 
     private Data readDataOnRecovery0(long cfgRevision) {

Reply via email to