This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bcd1f19cf9 IGNITE-18087 DistributionZoneManager populated with CMG
listeners to logical topology events (#1436)
bcd1f19cf9 is described below
commit bcd1f19cf931b3b784101cc624942fefc2a4e6cb
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Dec 19 09:30:58 2022 +0300
IGNITE-18087 DistributionZoneManager populated with CMG listeners to
logical topology events (#1436)
---
.../management/topology/LogicalTopologyImpl.java | 2 +-
modules/distribution-zones/build.gradle | 1 +
modules/distribution-zones/pom.xml | 6 +
.../distributionzones/DistributionZoneManager.java | 169 ++++++-
.../distributionzones/DistributionZonesUtil.java | 54 ++-
...ibutionZoneManagerConfigurationChangesTest.java | 14 +-
...butionZoneManagerLogicalTopologyEventsTest.java | 526 +++++++++++++++++++++
.../DistributionZoneManagerTest.java | 4 +-
.../ItDistributedConfigurationStorageTest.java | 2 +
.../internal/runner/app/ItTablesApiTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../storage/DistributedConfigurationStorage.java | 9 +-
12 files changed, 767 insertions(+), 23 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 798fc41cea..5188a3b342 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -44,7 +44,7 @@ public class LogicalTopologyImpl implements LogicalTopology {
private static final IgniteLogger LOG =
Loggers.forClass(LogicalTopologyImpl.class);
/** Storage key for the logical topology. */
- private static final byte[] LOGICAL_TOPOLOGY_KEY =
"logical".getBytes(UTF_8);
+ public static final byte[] LOGICAL_TOPOLOGY_KEY =
"logical".getBytes(UTF_8);
private final ClusterStateStorage storage;
diff --git a/modules/distribution-zones/build.gradle
b/modules/distribution-zones/build.gradle
index 4135d200cf..c1e9ae2500 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -46,6 +46,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-metastorage-server')))
+ testImplementation(testFixtures(project(':ignite-cluster-management')))
}
description = 'ignite-distribution-zones'
diff --git a/modules/distribution-zones/pom.xml
b/modules/distribution-zones/pom.xml
index ebfacfa8f1..1e6aa16652 100644
--- a/modules/distribution-zones/pom.xml
+++ b/modules/distribution-zones/pom.xml
@@ -101,6 +101,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-cluster-management</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index c6d9adfe6b..87eb81edae 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -21,20 +21,28 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesKeyAndUpdateTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.triggerKeyCondition;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.metastorage.client.Conditions.value;
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
+import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedListChange;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -46,6 +54,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.CompoundCondition;
+import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.If;
import org.apache.ignite.internal.metastorage.client.Update;
import org.apache.ignite.internal.util.ByteUtils;
@@ -74,21 +83,48 @@ public class DistributionZoneManager implements
IgniteComponent {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ /** Prevents double stopping of the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /** Logical topology service to track topology changes. */
+ private final LogicalTopologyService logicalTopologyService;
+
+ /** Listener for a topology events. */
+ private final LogicalTopologyEventListener topologyEventListener = new
LogicalTopologyEventListener() {
+ @Override
+ public void onAppeared(ClusterNode appearedNode,
LogicalTopologySnapshot newTopology) {
+ updateLogicalTopologyInMetaStorage(newTopology, false);
+ }
+
+ @Override
+ public void onDisappeared(ClusterNode disappearedNode,
LogicalTopologySnapshot newTopology) {
+ updateLogicalTopologyInMetaStorage(newTopology, false);
+ }
+
+ @Override
+ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+ updateLogicalTopologyInMetaStorage(newTopology, true);
+ }
+ };
+
/**
* Creates a new distribution zone manager.
*
* @param zonesConfiguration Distribution zones configuration.
* @param metaStorageManager Meta Storage manager.
* @param cmgManager Cluster management group manager.
+ * @param logicalTopologyService Logical topology service.
*/
public DistributionZoneManager(
DistributionZonesConfiguration zonesConfiguration,
MetaStorageManager metaStorageManager,
- ClusterManagementGroupManager cmgManager
+ ClusterManagementGroupManager cmgManager,
+ LogicalTopologyService logicalTopologyService
) {
this.zonesConfiguration = zonesConfiguration;
this.metaStorageManager = metaStorageManager;
this.cmgManager = cmgManager;
+ this.logicalTopologyService = logicalTopologyService;
}
/**
@@ -235,12 +271,22 @@ public class DistributionZoneManager implements
IgniteComponent {
@Override
public void start() {
zonesConfiguration.distributionZones().listenElements(new
ZonesConfigurationListener());
+
+ logicalTopologyService.addEventListener(topologyEventListener);
+
+ initMetaStorageKeysOnStart();
}
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ logicalTopologyService.removeEventListener(topologyEventListener);
}
private class ZonesConfigurationListener implements
ConfigurationNamedListListener<DistributionZoneView> {
@@ -281,24 +327,24 @@ public class DistributionZoneManager implements
IgniteComponent {
}
try {
- Set<ClusterNode> clusterNodes;
+ Set<ClusterNode> logicalTopology;
- //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18087
+ //TODO temporary code, will be removed in
https://issues.apache.org/jira/browse/IGNITE-18121
try {
- clusterNodes = cmgManager.logicalTopology().get().nodes();
+ logicalTopology = cmgManager.logicalTopology().get().nodes();
} catch (InterruptedException | ExecutionException e) {
throw new IgniteInternalException(e);
}
+ assert !logicalTopology.isEmpty() : "Logical topology cannot be
empty.";
+
// Update data nodes for a zone only if the revision of the event
is newer than value in that trigger key,
// so we do not react on a stale events
CompoundCondition triggerKeyCondition =
triggerKeyCondition(revision);
- Set<String> nodesConsistentIds =
clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet());
-
- byte[] logicalTopologyBytes =
ByteUtils.toBytes(nodesConsistentIds);
+ Set<String> nodesConsistentIds =
logicalTopology.stream().map(ClusterNode::name).collect(Collectors.toSet());
- Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, logicalTopologyBytes);
+ Update dataNodesAndTriggerKeyUpd =
updateDataNodesAndTriggerKey(zoneId, revision, nodesConsistentIds);
If iif = If.iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd,
ops().yield(false));
@@ -344,7 +390,7 @@ public class DistributionZoneManager implements
IgniteComponent {
}
/**
- * Method deleted data nodes value for the specified zone,
+ * Method deletes data nodes value for the specified zone,
* also sets {@code revision} to the {@link
DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
*
* @param zoneId Unique id of a zone
@@ -373,4 +419,109 @@ public class DistributionZoneManager implements
IgniteComponent {
busyLock.leaveBusy();
}
}
+
+ /**
+ * Updates {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
{@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()}
+ * in meta storage.
+ *
+ * @param newTopology Logical topology snapshot.
+ * @param topologyLeap Flag that indicates whether this updates was
trigger by
+ * {@link
LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
+ */
+ private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot
newTopology, boolean topologyLeap) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ Set<String> topologyFromCmg =
newTopology.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ Condition updateCondition;
+
+ if (topologyLeap) {
+ updateCondition =
value(zonesLogicalTopologyVersionKey()).lt(ByteUtils.longToBytes(newTopology.version()));
+ } else {
+ // This condition may be stronger, as far as we receive
topology events one by one.
+ updateCondition =
value(zonesLogicalTopologyVersionKey()).eq(ByteUtils.longToBytes(newTopology.version()
- 1));
+ }
+
+ If iff = If.iif(
+ updateCondition,
+ updateLogicalTopologyAndVersion(topologyFromCmg,
newTopology.version()),
+ ops().yield(false)
+ );
+
+ metaStorageManager.invoke(iff).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug(
+ "Distribution zones' logical topology and version
keys were updated [topology = {}, version = {}]",
+ Arrays.toString(topologyFromCmg.toArray()),
+ newTopology.version()
+ );
+ } else {
+ LOG.debug(
+ "Failed to update distribution zones' logical
topology and version keys [topology = {}, version = {}]",
+ Arrays.toString(topologyFromCmg.toArray()),
+ newTopology.version()
+ );
+ }
+ });
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
+ * {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} from
meta storage on the start of {@link DistributionZoneManager}.
+ */
+ private void initMetaStorageKeysOnStart() {
+ logicalTopologyService.logicalTopologyOnLeader().thenAccept(snapshot
-> {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ long topologyVersionFromCmg = snapshot.version();
+
+ byte[] topVerFromMetastorage;
+
+ try {
+ topVerFromMetastorage =
metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(UNEXPECTED_ERR, e);
+ }
+
+ if (topVerFromMetastorage == null ||
ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
+ Set<String> topologyFromCmg =
snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ Condition topologyVersionCondition =
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+
+ If iff = If.iif(topologyVersionCondition,
+ updateLogicalTopologyAndVersion(topologyFromCmg,
topologyVersionFromCmg),
+ ops().yield(false)
+ );
+
+ metaStorageManager.invoke(iff).thenAccept(res -> {
+ if (res.getAsBoolean()) {
+ LOG.debug(
+ "Distribution zones' logical topology and
version keys were initialised [topology = {}, version = {}]",
+ Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ } else {
+ LOG.debug(
+ "Failed to initialize distribution zones'
logical topology "
+ + "and version keys [topology =
{}, version = {}]",
+ Arrays.toString(topologyFromCmg.toArray()),
+ topologyVersionFromCmg
+ );
+ }
+ });
+ }
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 5262657590..a76902a1c5 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import java.util.Set;
import org.apache.ignite.internal.metastorage.client.CompoundCondition;
import org.apache.ignite.internal.metastorage.client.Update;
import org.apache.ignite.internal.util.ByteUtils;
@@ -36,8 +37,21 @@ class DistributionZonesUtil {
/** Key prefix for zone's data nodes. */
private static final String DISTRIBUTION_ZONE_DATA_NODES_PREFIX =
"distributionZone.dataNodes.";
+ /** Key prefix for zones' logical topology nodes. */
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
"distributionZones.logicalTopology";
+
+ /** Key prefix for zones' logical topology version. */
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION =
"distributionZones.logicalTopologyVersion";
+
/** The key, needed for processing the event about zones' update was
triggered only once. */
- private static final ByteArray ZONES_CHANGE_TRIGGER_KEY = new
ByteArray("distributionZones.change.trigger");
+ private static final ByteArray DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY = new
ByteArray("distributionZones.change.trigger");
+
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
+ private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
+
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
+ private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
+ new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_PREFIX}. */
static ByteArray zoneDataNodesKey(int zoneId) {
@@ -48,7 +62,23 @@ class DistributionZonesUtil {
* The key, needed for processing the event about zones' update was
triggered only once.
*/
static ByteArray zonesChangeTriggerKey() {
- return ZONES_CHANGE_TRIGGER_KEY;
+ return DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY;
+ }
+
+ /**
+ * The key that represents logical topology nodes, needed for distribution
zones. It is needed to store them in the metastore
+ * to serialize data nodes changes triggered by topology changes and
changes of distribution zones configurations.
+ */
+ static ByteArray zonesLogicalTopologyKey() {
+ return DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY;
+ }
+
+ /**
+ * The key needed for processing the events about logical topology changes.
+ * Needed for the defencing against stale updates of logical topology
nodes.
+ */
+ static ByteArray zonesLogicalTopologyVersionKey() {
+ return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
}
/**
@@ -70,16 +100,30 @@ class DistributionZonesUtil {
*
* @param zoneId Distribution zone id
* @param revision Revision of the event.
- * @param logicalTopologyBytes Logical topology.
+ * @param logicalTopology Logical topology.
* @return Update command for the meta storage.
*/
- static Update updateDataNodesAndTriggerKey(int zoneId, long revision,
byte[] logicalTopologyBytes) {
+ static Update updateDataNodesAndTriggerKey(int zoneId, long revision,
Set<String> logicalTopology) {
return ops(
- put(zoneDataNodesKey(zoneId), logicalTopologyBytes),
+ put(zoneDataNodesKey(zoneId),
ByteUtils.toBytes(logicalTopology)),
put(zonesChangeTriggerKey(), ByteUtils.longToBytes(revision))
).yield(true);
}
+ /**
+ * Updates logical topology and logical topology version values for zones.
+ *
+ * @param logicalTopology Logical topology.
+ * @param topologyVersion Logical topology version.
+ * @return Update command for the meta storage.
+ */
+ static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology,
long topologyVersion) {
+ return ops(
+ put(zonesLogicalTopologyVersionKey(),
ByteUtils.longToBytes(topologyVersion)),
+ put(zonesLogicalTopologyKey(),
ByteUtils.toBytes(logicalTopology))
+ ).yield(true);
+ }
+
/**
* Sets {@code revision} to {@link
DistributionZonesUtil#zonesChangeTriggerKey()}.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 2ea6019e54..e8325efc94 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.metastorage.client.MetaStorageServiceIm
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -41,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
@@ -84,6 +86,9 @@ public class DistributionZoneManagerConfigurationChangesTest
extends IgniteAbstr
private ConfigurationManager clusterCfgMgr;
+ @Mock
+ private LogicalTopologyServiceImpl logicalTopologyService;
+
@BeforeEach
public void setUp() {
clusterCfgMgr = new ConfigurationManager(
@@ -101,14 +106,21 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
cmgManager = mock(ClusterManagementGroupManager.class);
+ logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
metaStorageManager,
- cmgManager
+ cmgManager,
+ logicalTopologyService
);
clusterCfgMgr.start();
+ doNothing().when(logicalTopologyService).addEventListener(any());
+
+
when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(new
LogicalTopologySnapshot(1, Set.of())));
+
distributionZoneManager.start();
AtomicLong raftIndex = new AtomicLong();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
new file mode 100644
index 0000000000..cc7272217a
--- /dev/null
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import
org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import
org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import
org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
+import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.ReadCommand;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests reactions to topology changes in accordance with distribution zones
logic.
+ */
+public class DistributionZoneManagerLogicalTopologyEventsTest {
+ @Mock
+ private ClusterManagementGroupManager cmgManager;
+
+ private DistributionZoneManager distributionZoneManager;
+
+ private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+ private ConfigurationManager clusterCfgMgr;
+
+ private LogicalTopology topology;
+
+ private ClusterStateStorage clusterStateStorage;
+
+ private DistributionZoneManager prepareDistributionZoneManager() {
+ clusterCfgMgr = new ConfigurationManager(
+ List.of(DistributionZonesConfiguration.KEY),
+ Map.of(),
+ new TestConfigurationStorage(DISTRIBUTED),
+ List.of(),
+ List.of()
+ );
+
+ DistributionZonesConfiguration zonesConfiguration =
clusterCfgMgr.configurationRegistry()
+ .getConfiguration(DistributionZonesConfiguration.KEY);
+
+ MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
+
+ cmgManager = mock(ClusterManagementGroupManager.class);
+
+ clusterStateStorage = new TestClusterStateStorage();
+
+ topology = new LogicalTopologyImpl(clusterStateStorage);
+
+ LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(topology, cmgManager);
+
+ distributionZoneManager = new DistributionZoneManager(
+ zonesConfiguration,
+ metaStorageManager,
+ cmgManager,
+ logicalTopologyService
+ );
+
+ clusterCfgMgr.start();
+
+ AtomicLong raftIndex = new AtomicLong();
+
+ keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+ MetaStorageListener metaStorageListener = new
MetaStorageListener(keyValueStorage);
+
+ RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+ // Delegate directly to listener.
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public WriteCommand command() {
+ return (WriteCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onWrite(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(WriteCommand.class));
+
+ lenient().doAnswer(
+ invocationClose -> {
+ Command cmd = invocationClose.getArgument(0);
+
+ long commandIndex = raftIndex.incrementAndGet();
+
+ CompletableFuture<Serializable> res = new
CompletableFuture<>();
+
+ CommandClosure<ReadCommand> clo = new CommandClosure<>() {
+ /** {@inheritDoc} */
+ @Override
+ public long index() {
+ return commandIndex;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReadCommand command() {
+ return (ReadCommand) cmd;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void result(@Nullable Serializable r) {
+ if (r instanceof Throwable) {
+ res.completeExceptionally((Throwable) r);
+ } else {
+ res.complete(r);
+ }
+ }
+ };
+
+ try {
+ metaStorageListener.onRead(List.of(clo).iterator());
+ } catch (Throwable e) {
+ res.completeExceptionally(new
IgniteInternalException(e));
+ }
+
+ return res;
+ }
+ ).when(metaStorageService).run(any(ReadCommand.class));
+
+ MetaStorageCommandsFactory commandsFactory = new
MetaStorageCommandsFactory();
+
+ lenient().doAnswer(invocationClose -> {
+ If iif = invocationClose.getArgument(0);
+
+ MultiInvokeCommand multiInvokeCommand =
commandsFactory.multiInvokeCommand().iif(toIfInfo(iif,
commandsFactory)).build();
+
+ return metaStorageService.run(multiInvokeCommand).thenApply(bi ->
new StatementResult(((StatementResultInfo) bi).result()));
+ }).when(metaStorageManager).invoke(any());
+
+ lenient().doAnswer(invocationClose -> {
+ ByteArray key = invocationClose.getArgument(0);
+
+ GetCommand getCommand =
commandsFactory.getCommand().key(key.bytes()).build();
+
+ return metaStorageService.run(getCommand).thenApply(bi -> {
+ SingleEntryResponse resp = (SingleEntryResponse) bi;
+
+ return new EntryImpl(new ByteArray(resp.key()), resp.value(),
resp.revision(), resp.updateCounter());
+ });
+ }).when(metaStorageManager).get(any());
+
+ return distributionZoneManager;
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ distributionZoneManager.stop();
+
+ clusterCfgMgr.stop();
+
+ keyValueStorage.close();
+
+ clusterStateStorage.destroy();
+ }
+
+ @Test
+ void testMetaStorageKeysInitializedOnStartWhenTopVerEmpty() throws
Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
+
+ mockCmgLocalNodes(1L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ assertLogicalTopVer(1L);
+
+ assertLogicalTopology(clusterNodes);
+ }
+
+ @Test
+ void testMetaStorageKeysInitializedOnStartWhenTopVerIsLessThanCmgTopVer()
throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(1L));
+
+ distributionZoneManager1.start();
+
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+ assertLogicalTopVer(2L);
+
+ assertLogicalTopology(clusterNodes);
+ }
+
+ @Test
+ void testMetaStorageKeysInitializedOnStartWhenTopVerEqualsToCmgTopVer()
throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(2L));
+
+ distributionZoneManager1.start();
+
+ verify(keyValueStorage, after(500).never()).invoke(any());
+
+ assertLogicalTopVer(2L);
+
+ assertLogicalTopology(null);
+ }
+
+ @Test
+ void testMetaStorageKeysInitializedOnStartWhenTopVerGreaterThanCmgTopVer()
throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1",
null));
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(3L));
+
+ distributionZoneManager1.start();
+
+ verify(keyValueStorage, after(500).never()).invoke(any());
+
+ assertLogicalTopVer(3L);
+
+ assertLogicalTopology(null);
+ }
+
+ @Test
+ void testNodeAddingUpdatesLogicalTopologyInMetaStorage() throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ Set<ClusterNode> clusterNodes = Set.of(node1);
+
+ mockCmgLocalNodes(1L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node2);
+
+ var clusterNodes2 = Set.of(node1, node2);
+
+ assertLogicalTopology(clusterNodes2);
+
+ assertLogicalTopVer(2L);
+ }
+
+ @Test
+ void testNodeStaleAddingDoNotUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ Set<ClusterNode> clusterNodes = Set.of(node1);
+
+ mockCmgLocalNodes(1L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(4L));
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node2);
+
+ assertEquals(2L, topology.getLogicalTopology().version());
+
+ assertLogicalTopology(clusterNodes);
+
+ assertLogicalTopVer(4L);
+ }
+
+ @Test
+ void testNodeRemovingUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ topology.putNode(node2);
+
+ Set<ClusterNode> clusterNodes = Set.of(node1, node2);
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ assertLogicalTopology(clusterNodes);
+
+ topology.removeNodes(Set.of(node2));
+
+ var clusterNodes2 = Set.of(node1);
+
+ assertLogicalTopology(clusterNodes2);
+
+ assertLogicalTopVer(3L);
+ }
+
+ @Test
+ void testNodeStaleRemovingDoNotUpdatesLogicalTopologyInMetaStorage()
throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ topology.putNode(node2);
+
+ assertEquals(2L, topology.getLogicalTopology().version());
+
+ Set<ClusterNode> clusterNodes = Set.of(node1, node2);
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(4L));
+
+ topology.removeNodes(Set.of(node2));
+
+ assertLogicalTopology(clusterNodes);
+
+ assertLogicalTopVer(4L);
+ }
+
+ @Test
+ void testTopologyLeapUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ Set<ClusterNode> clusterNodes = Set.of(node1);
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ assertLogicalTopology(clusterNodes);
+
+ var clusterNodes2 = Set.of(node1, node2);
+
+ clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(10L, clusterNodes2)));
+
+ topology.fireTopologyLeap();
+
+ assertLogicalTopology(clusterNodes2);
+
+ assertLogicalTopVer(10L);
+ }
+
+ @Test
+ void testStaleTopologyLeapDoNotUpdatesLogicalTopologyInMetaStorage()
throws Exception {
+ DistributionZoneManager distributionZoneManager1 =
prepareDistributionZoneManager();
+
+ ClusterNode node1 = new ClusterNode("1", "name1", new
NetworkAddress("localhost", 123));
+
+ ClusterNode node2 = new ClusterNode("2", "name2", new
NetworkAddress("localhost", 123));
+
+ topology.putNode(node1);
+
+ Set<ClusterNode> clusterNodes = Set.of(node1);
+
+ mockCmgLocalNodes(2L, clusterNodes);
+
+ distributionZoneManager1.start();
+
+ assertLogicalTopology(clusterNodes);
+
+ var clusterNodes2 = Set.of(node1, node2);
+
+ clusterStateStorage.put(LOGICAL_TOPOLOGY_KEY, ByteUtils.toBytes(new
LogicalTopologySnapshot(10L, clusterNodes2)));
+
+ keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(11L));
+
+ topology.fireTopologyLeap();
+
+ assertLogicalTopology(clusterNodes);
+
+ assertLogicalTopVer(11L);
+ }
+
+ private LogicalTopologySnapshot mockCmgLocalNodes(long version,
Set<ClusterNode> clusterNodes) {
+ LogicalTopologySnapshot logicalTopologySnapshot = new
LogicalTopologySnapshot(version, clusterNodes);
+
+
when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
+
+ return logicalTopologySnapshot;
+ }
+
+ private void assertLogicalTopVer(long topVer) throws InterruptedException {
+ assertTrue(
+ waitForCondition(
+ () ->
ByteUtils.bytesToLong(keyValueStorage.get(zonesLogicalTopologyVersionKey().bytes()).value())
== topVer, 1000
+ )
+ );
+ }
+
+ private void assertLogicalTopology(@Nullable Set<ClusterNode>
clusterNodes) throws InterruptedException {
+ byte[] nodes = clusterNodes == null
+ ? null
+ :
ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+
+ assertTrue(waitForCondition(() ->
Arrays.equals(keyValueStorage.get(zonesLogicalTopologyKey().bytes()).value(),
nodes), 1000));
+ }
+}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
index f3b43f7b9f..9db029085d 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.ConfigurationChangeException;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import
org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
@@ -70,7 +71,8 @@ class DistributionZoneManagerTest extends IgniteAbstractTest {
distributionZoneManager = new DistributionZoneManager(
zonesConfiguration,
mock(MetaStorageManager.class),
- mock(ClusterManagementGroupManager.class)
+ mock(ClusterManagementGroupManager.class),
+ mock(LogicalTopologyServiceImpl.class)
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 122fc48577..1b27cdee86 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -159,6 +160,7 @@ public class ItDistributedConfigurationStorageTest {
* @see <a
href="https://issues.apache.org/jira/browse/IGNITE-15213">IGNITE-15213</a>
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18410")
void testRestartWithPds(@WorkDirectory Path workDir, TestInfo testInfo)
throws Exception {
var node = new Node(testInfo, workDir);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index ba9eb80043..30f2a12a9e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -158,6 +158,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18410")
public void testTableAlreadyCreatedFromLaggedNode() throws Exception {
clusterNodes.forEach(ign ->
assertNull(ign.tables().table(TABLE_NAME)));
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 70af11ea08..aa2930acb5 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -352,7 +352,7 @@ public class IgniteImpl implements Ignite {
DistributionZonesConfiguration zonesConfiguration =
clusterCfgMgr.configurationRegistry()
.getConfiguration(DistributionZonesConfiguration.KEY);
- distributionZoneManager = new
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr);
+ distributionZoneManager = new
DistributionZoneManager(zonesConfiguration, metaStorageMgr, cmgMgr,
logicalTopologyService);
restComponent = createRestComponent(name);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index cba14d829f..c2add413ee 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -210,8 +210,6 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
private long resolveRevision(@Nullable VaultEntry appliedRevEntry,
@Nullable VaultEntry revisionsEntry) {
long appliedRevision = appliedRevEntry == null ? 0L :
ByteUtils.bytesToLong(appliedRevEntry.value());
- long cfgRevision = appliedRevision;
-
if (revisionsEntry != null) {
byte[] value = revisionsEntry.value();
long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
@@ -219,10 +217,11 @@ public class DistributedConfigurationStorage implements
ConfigurationStorage {
// If current master key revision is higher than applied revision,
then node failed
// before applied revision changed, so we have to use previous
master key revision
- cfgRevision = curMasterKeyRevision <= appliedRevision ?
curMasterKeyRevision : prevMasterKeyRevision;
+ return curMasterKeyRevision <= appliedRevision ?
curMasterKeyRevision : prevMasterKeyRevision;
+ } else {
+ // Configuration has not been updated yet, so it is safe to return
0 as the revision for the master key.
+ return 0L;
}
-
- return cfgRevision;
}
private Data readDataOnRecovery0(long cfgRevision) {