This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5aae4662f4 IGNITE-19574 Restore global states of
DistributionZoneManager after restart (#2172)
5aae4662f4 is described below
commit 5aae4662f4baa0abc0d91799f182261d146769ba
Author: Mirza Aliev <[email protected]>
AuthorDate: Tue Jun 20 15:17:57 2023 +0400
IGNITE-19574 Restore global states of DistributionZoneManager after restart
(#2172)
---
.../distributionzones/DistributionZoneManager.java | 192 +++++------
.../distributionzones/DistributionZonesUtil.java | 61 +++-
.../DistributionZoneManagerAlterFilterTest.java | 4 +-
...ibutionZoneManagerConfigurationChangesTest.java | 18 +-
.../DistributionZoneManagerFilterTest.java | 4 +-
...butionZoneManagerLogicalTopologyEventsTest.java | 42 +--
.../DistributionZoneManagerScaleUpTest.java | 50 ++-
.../DistributionZoneManagerWatchListenerTest.java | 28 +-
.../DistributionZonesTestUtil.java | 18 +-
.../ignite/internal/BaseIgniteRestartTest.java | 370 +++++++++++++++++++++
...niteDistributionZoneManagerNodeRestartTest.java | 343 +++++++++++++++++++
.../app/ItIgniteInMemoryNodeRestartTest.java | 37 +--
.../runner/app/ItIgniteNodeRestartTest.java | 303 ++---------------
13 files changed, 950 insertions(+), 520 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 2be2641924..70f082753c 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -41,15 +41,19 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVault;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributesVault;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -113,8 +117,8 @@ import
org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
@@ -233,7 +237,7 @@ public class DistributionZoneManager implements
IgniteComponent {
*
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/distribution-zones/tech-notes/filters.md">Filter
documentation</a>
*/
- private final Map<String, Map<String, String>> nodesAttributes;
+ private Map<String, Map<String, String>> nodesAttributes;
/** Watch listener for logical topology keys. */
private final WatchListener topologyWatchListener;
@@ -333,9 +337,9 @@ public class DistributionZoneManager implements
IgniteComponent {
metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(),
topologyWatchListener);
metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(),
dataNodesWatchListener);
- initDataNodesFromVaultManager();
+ restoreGlobalStateFromVault();
- initLogicalTopologyAndVersionInMetaStorageOnStart();
+ initDataNodesFromVaultManager();
} finally {
busyLock.leaveBusy();
}
@@ -921,11 +925,16 @@ public class DistributionZoneManager implements
IgniteComponent {
Condition updateCondition;
- if (topologyLeap) {
- updateCondition =
value(zonesLogicalTopologyVersionKey()).lt(ByteUtils.longToBytes(newTopology.version()));
+ if (newTopology.version() == 1) {
+ // Very first start of the cluster, so we just initialize
zonesLogicalTopologyVersionKey
+ updateCondition = notExists(zonesLogicalTopologyVersionKey());
} else {
- // This condition may be stronger, as far as we receive
topology events one by one.
- updateCondition =
value(zonesLogicalTopologyVersionKey()).eq(ByteUtils.longToBytes(newTopology.version()
- 1));
+ if (topologyLeap) {
+ updateCondition =
value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
+ } else {
+ // This condition may be stronger, as far as we receive
topology events one by one.
+ updateCondition =
value(zonesLogicalTopologyVersionKey()).eq(longToBytes(newTopology.version() -
1));
+ }
}
Iif iff = iif(
@@ -961,81 +970,11 @@ public class DistributionZoneManager implements
IgniteComponent {
}
}
- /**
- * Initialises {@link DistributionZonesUtil#zonesLogicalTopologyKey()} and
- * {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} from
meta storage on the start of {@link DistributionZoneManager}.
- */
- private void initLogicalTopologyAndVersionInMetaStorageOnStart() {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- CompletableFuture<Entry> zonesTopologyVersionFuture =
metaStorageManager.get(zonesLogicalTopologyVersionKey());
-
- CompletableFuture<LogicalTopologySnapshot> logicalTopologyFuture =
logicalTopologyService.logicalTopologyOnLeader();
-
- logicalTopologyFuture.thenAcceptBoth(zonesTopologyVersionFuture,
(snapshot, topVerEntry) -> {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
- }
-
- try {
- long topologyVersionFromCmg = snapshot.version();
-
- byte[] topVerFromMetaStorage = topVerEntry.value();
-
- if (topVerFromMetaStorage == null ||
bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
- Set<LogicalNode> topologyFromCmg = snapshot.nodes();
-
- Condition topologyVersionCondition =
topVerFromMetaStorage == null
- ? notExists(zonesLogicalTopologyVersionKey()) :
-
value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
-
- Iif iff = iif(topologyVersionCondition,
-
updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
- ops().yield(false)
- );
-
- metaStorageManager.invoke(iff).whenComplete((res, e)
-> {
- if (e != null) {
- LOG.error(
- "Failed to initialize distribution
zones' logical topology "
- + "and version keys [topology
= {}, version = {}]",
- e,
-
Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- } else if (res.getAsBoolean()) {
- LOG.debug(
- "Distribution zones' logical topology
and version keys were initialised "
- + "[topology = {}, version =
{}]",
-
Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- } else {
- LOG.debug(
- "Failed to initialize distribution
zones' logical topology "
- + "and version keys [topology
= {}, version = {}]",
-
Arrays.toString(topologyFromCmg.toArray()),
- topologyVersionFromCmg
- );
- }
- });
- }
- } finally {
- busyLock.leaveBusy();
- }
- });
- } finally {
- busyLock.leaveBusy();
- }
- }
-
/**
* Initialises data nodes of distribution zones in meta storage
* from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
*/
+ // TODO: will be refactored in
https://issues.apache.org/jira/browse/IGNITE-19580
private void initDataNodesFromVaultManager() {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
@@ -1044,44 +983,50 @@ public class DistributionZoneManager implements
IgniteComponent {
try {
long appliedRevision = metaStorageManager.appliedRevision();
- VaultEntry topologyEntry =
vaultMgr.get(zonesLogicalTopologyKey()).join();
-
- if (topologyEntry != null && topologyEntry.value() != null) {
- assert appliedRevision > 0 : "The meta storage last applied
revision is 0 but the logical topology is not null.";
-
- logicalTopology = fromBytes(topologyEntry.value());
-
- logicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(),
n.nodeAttributes()));
-
+ if (!logicalTopology.isEmpty()) {
// init keys and data nodes for default zone
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
DEFAULT_ZONE_ID,
appliedRevision,
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
);
-
- zonesConfiguration.distributionZones().value().forEach(zone ->
{
- int zoneId = zone.zoneId();
-
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(
- zoneId,
- appliedRevision,
-
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
- );
- });
}
zonesState.values().forEach(zoneState -> {
zoneState.nodes(logicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet()));
});
- assert topologyEntry == null || topologyEntry.value() == null ||
logicalTopology.equals(fromBytes(topologyEntry.value()))
- : "Initial value of logical topology was changed after
initialization from the vault manager.";
} finally {
busyLock.leaveBusy();
}
}
+ /**
+ * Restores from vault logical topology and nodes' attributes fields in
{@link DistributionZoneManager} after restart.
+ */
+ private void restoreGlobalStateFromVault() {
+ VaultEntry topologyEntry =
vaultMgr.get(zonesLogicalTopologyVault()).join();
+
+ VaultEntry nodeAttributesEntry =
vaultMgr.get(zonesNodesAttributesVault()).join();
+
+ if (topologyEntry != null && topologyEntry.value() != null) {
+ assert nodeAttributesEntry != null : "Nodes' attributes cannot be
null when logical topology is not null.";
+ assert nodeAttributesEntry.value() != null : "Nodes' attributes
cannot be null when logical topology is not null.";
+
+ logicalTopology = fromBytes(topologyEntry.value());
+
+ nodesAttributes = fromBytes(nodeAttributesEntry.value());
+ }
+
+ assert topologyEntry == null || topologyEntry.value() == null ||
logicalTopology.equals(fromBytes(topologyEntry.value()))
+ : "Initial value of logical topology was changed after
initialization from the vault manager.";
+
+ assert nodeAttributesEntry == null
+ || nodeAttributesEntry.value() == null
+ ||
nodesAttributes.equals(fromBytes(nodeAttributesEntry.value()))
+ : "Initial value of nodes' attributes was changed after
initialization from the vault manager.";
+ }
+
/**
* Creates watch listener which listens logical topology and logical
topology version.
*
@@ -1101,7 +1046,7 @@ public class DistributionZoneManager implements
IgniteComponent {
+ evt.entryEvents().stream().map(entry ->
entry.newEntry() == null ? "null" : entry.newEntry().key())
.collect(toList());
- byte[] newLogicalTopologyBytes = null;
+ byte[] newLogicalTopologyBytes;
Set<NodeWithAttributes> newLogicalTopology = null;
@@ -1122,6 +1067,17 @@ public class DistributionZoneManager implements
IgniteComponent {
assert newLogicalTopology != null : "The event doesn't
contain logical topology";
assert revision > 0 : "The event doesn't contain logical
topology version";
+ VaultEntry globalStateRevision =
vaultMgr.get(zonesGlobalStateRevision()).join();
+
+ if (globalStateRevision != null) {
+ // This means that we have already handled event with
this revision.
+ // It is possible when node was restarted after this
listener completed,
+ // but applied revision didn't have time to be
propagated to the Vault.
+ if (bytesToLong(globalStateRevision.value()) >=
revision) {
+ return completedFuture(null);
+ }
+ }
+
Set<NodeWithAttributes> newLogicalTopology0 =
newLogicalTopology;
Set<Node> removedNodes =
@@ -1136,10 +1092,6 @@ public class DistributionZoneManager implements
IgniteComponent {
.map(NodeWithAttributes::node)
.collect(toSet());
- newLogicalTopology.forEach(n ->
nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
-
- logicalTopology = newLogicalTopology;
-
NamedConfigurationTree<DistributionZoneConfiguration,
DistributionZoneView, DistributionZoneChange> zones =
zonesConfiguration.distributionZones();
@@ -1153,6 +1105,8 @@ public class DistributionZoneManager implements
IgniteComponent {
scheduleTimers(defaultZoneView, addedNodes, removedNodes,
revision);
+
updateLogicalTopologyNodeAttributesAndSaveToVault(newLogicalTopology, revision);
+
return completedFuture(null);
} finally {
busyLock.leaveBusy();
@@ -1166,6 +1120,29 @@ public class DistributionZoneManager implements
IgniteComponent {
};
}
+ /**
+ * Updates local representation of logical topology and nodes' attributes
map and saves it to vault atomically in one batch.
+ * After restart it could be used to restore these fields.
+ *
+ * @param newLogicalTopology Logical topology.
+ * @param revision Revision of the event, which triggers update.
+ */
+ private void
updateLogicalTopologyNodeAttributesAndSaveToVault(Set<NodeWithAttributes>
newLogicalTopology, long revision) {
+ newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(),
n.nodeAttributes()));
+
+ logicalTopology = newLogicalTopology;
+
+ Map<ByteArray, byte[]> batch = IgniteUtils.newHashMap(2);
+
+ batch.put(zonesLogicalTopologyVault(), toBytes(newLogicalTopology));
+
+ batch.put(zonesNodesAttributesVault(), toBytes(nodesAttributes()));
+
+ batch.put(zonesGlobalStateRevision(), longToBytes(revision));
+
+ vaultMgr.putAll(batch).join();
+ }
+
/**
* Creates watch listener which listens data nodes, scale up revision and
scale down revision.
*
@@ -1871,4 +1848,9 @@ public class DistributionZoneManager implements
IgniteComponent {
Map<Integer, ZoneState> zonesState() {
return zonesState;
}
+
+ @TestOnly
+ public Set<NodeWithAttributes> logicalTopology() {
+ return logicalTopology;
+ }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 57b5e6ca7c..2046629c2a 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -73,9 +73,18 @@ public class DistributionZonesUtil {
/** Key prefix for zones' logical topology nodes and logical topology
version. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX =
"distributionZones.logicalTopology.";
+ /** Key value for zones' nodes' attributes in vault. */
+ private static final String DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT =
"vault.distributionZones.nodesAttributes";
+
+ /** Key value for zones' global state revision in vault. */
+ private static final String DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT
= "vault.distributionZones.globalState.revision";
+
/** Key prefix for zones' logical topology nodes. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
+ /** Key prefix for zones' logical topology nodes in vault. */
+ private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT =
"vault." + DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY;
+
/** Key prefix for zones' logical topology version. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "version";
@@ -83,7 +92,17 @@ public class DistributionZonesUtil {
private static final String DISTRIBUTION_ZONES_CHANGE_TRIGGER_KEY_PREFIX =
"distributionZones.change.trigger.";
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY}. */
- private static final ByteArray DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
+ private static final ByteArray DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY);
+
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT}. */
+ private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT_KEY = new
ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT);
+
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT}. */
+ private static final ByteArray
DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT_KEY = new
ByteArray(DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT);
+
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT}. */
+ public static final ByteArray
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
+ new ByteArray(DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT);
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
@@ -123,7 +142,7 @@ public class DistributionZonesUtil {
*
* @return ByteArray representation.
*/
- public static ByteArray zonesLogicalTopologyPrefix() {
+ static ByteArray zonesLogicalTopologyPrefix() {
return new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX);
}
@@ -155,13 +174,6 @@ public class DistributionZonesUtil {
return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX
+ zoneId);
}
- /**
- * The key prefix needed for processing an event about zone's data node
propagation on scale up.
- */
- public static ByteArray zoneScaleUpChangeTriggerKey() {
- return new ByteArray(DISTRIBUTION_ZONE_SCALE_UP_CHANGE_TRIGGER_PREFIX);
- }
-
/**
* The key needed for processing an event about zone's data node
propagation on scale down.
* With this key we can be sure that event was triggered only once.
@@ -170,19 +182,12 @@ public class DistributionZonesUtil {
return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX + zoneId);
}
- /**
- * The key prefix needed for processing an event about zone's data node
propagation on scale down.
- */
- public static ByteArray zoneScaleDownChangeTriggerKey() {
- return new
ByteArray(DISTRIBUTION_ZONE_SCALE_DOWN_CHANGE_TRIGGER_PREFIX);
- }
-
/**
* The key that represents logical topology nodes, needed for distribution
zones. It is needed to store them in the metastore
* to serialize data nodes changes triggered by topology changes and
changes of distribution zones configurations.
*/
public static ByteArray zonesLogicalTopologyKey() {
- return DISTRIBUTION_ZONE_LOGICAL_TOPOLOGY_KEY;
+ return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_KEY;
}
/**
@@ -193,6 +198,28 @@ public class DistributionZonesUtil {
return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY;
}
+ /**
+ * The key that represents logical topology nodes in vault.
+ */
+ public static ByteArray zonesLogicalTopologyVault() {
+ return DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VAULT_KEY;
+ }
+
+ /**
+ * The key that represents nodes' attributes in vault.
+ */
+ public static ByteArray zonesNodesAttributesVault() {
+ return DISTRIBUTION_ZONES_NODES_ATTRIBUTES_VAULT_KEY;
+ }
+
+ /**
+ * The key represents zones' global state revision in vault. This is the
revision of the event that triggered saving the global state
+ * of Distribution Zone Manager to Vault.
+ */
+ public static ByteArray zonesGlobalStateRevision() {
+ return DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY;
+ }
+
/**
* The key prefix needed for processing an event about zone's data nodes.
*/
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
index 844bd0ec37..e44b9a8b48 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -329,12 +329,12 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
* @throws Exception If failed
*/
private void preparePrerequisites(int scaleUpTimer, int scaleDownTimer,
int zoneId) throws Exception {
+ startDistributionZoneManager();
+
topology.putNode(A);
topology.putNode(B);
topology.putNode(C);
- startDistributionZoneManager();
-
if (zoneId == DEFAULT_ZONE_ID) {
distributionZoneManager.alterZone(
DEFAULT_ZONE_NAME,
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 5da82e9a2b..dc8f0b217d 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -27,8 +27,10 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVault;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributesVault;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -43,7 +45,9 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
@@ -113,8 +117,6 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
// Mock logical topology for distribution zone.
vaultMgr = new VaultManager(new InMemoryVaultService());
- assertThat(vaultMgr.put(zonesLogicalTopologyKey(), toBytes(nodes)),
willCompleteSuccessfully());
- assertThat(vaultMgr.put(zonesLogicalTopologyVersionKey(),
longToBytes(0)), willCompleteSuccessfully());
LogicalTopologyService logicalTopologyService =
mock(LogicalTopologyService.class);
@@ -129,6 +131,16 @@ public class
DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
metaStorageManager = StandaloneMetaStorageManager.create(vaultMgr,
keyValueStorage);
+ assertThat(vaultMgr.put(zonesLogicalTopologyVault(), toBytes(nodes)),
willCompleteSuccessfully());
+
+ Map<String, Map<String, String>> nodesAttributes = new
ConcurrentHashMap<>();
+ nodes.forEach(n -> nodesAttributes.put(n.nodeId(),
n.nodeAttributes()));
+ assertThat(vaultMgr.put(zonesNodesAttributesVault(),
toBytes(nodesAttributes)), willCompleteSuccessfully());
+
+ assertThat(vaultMgr.put(zonesGlobalStateRevision(),
longToBytes(metaStorageManager.appliedRevision())), willCompleteSuccessfully());
+
+ assertThat(vaultMgr.put(zonesLogicalTopologyVersionKey(),
longToBytes(0)), willCompleteSuccessfully());
+
metaStorageManager.put(zonesLogicalTopologyVersionKey(),
longToBytes(0));
distributionZoneManager = new DistributionZoneManager(
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
index 236cd8adb9..afcff858dc 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
@@ -117,12 +117,12 @@ public class DistributionZoneManagerFilterTest extends
BaseDistributionZoneManag
private void preparePrerequisites() throws Exception {
String filter = "$[?(@.storage == 'SSD' || @.region == 'US')]";
+ startDistributionZoneManager();
+
topology.putNode(A);
topology.putNode(B);
topology.putNode(C);
- startDistributionZoneManager();
-
distributionZoneManager.createZone(
new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
.dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 21e14195cb..aec1781fc1 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -41,27 +41,13 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testMetaStorageKeysInitializedOnStartWhenTopVerEmpty() throws
Exception {
- topology.putNode(NODE_1);
-
distributionZoneManager.start();
- assertLogicalTopologyVersion(1L, keyValueStorage);
-
- assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
- }
-
- @Test
- void testMetaStorageKeysInitializedOnStartWhenTopVerIsLessThanCmgTopVer()
throws Exception {
topology.putNode(NODE_1);
- topology.putNode(NODE_2);
-
- keyValueStorage.put(zonesLogicalTopologyVersionKey().bytes(),
ByteUtils.longToBytes(1L), HybridTimestamp.MIN_VALUE);
- distributionZoneManager.start();
-
- assertLogicalTopologyVersion(2L, keyValueStorage);
+ assertLogicalTopologyVersion(1L, keyValueStorage);
- assertLogicalTopology(Set.of(NODE_1, NODE_2), keyValueStorage);
+ assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
}
@Test
@@ -88,10 +74,10 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testNodeAddingUpdatesLogicalTopologyInMetaStorage() throws Exception {
- topology.putNode(NODE_1);
-
distributionZoneManager.start();
+ topology.putNode(NODE_1);
+
topology.putNode(NODE_2);
var clusterNodes2 = Set.of(NODE_1, NODE_2);
@@ -103,12 +89,12 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testNodeStaleAddingDoNotUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ distributionZoneManager.start();
+
topology.putNode(NODE_1);
Set<LogicalNode> clusterNodes = Set.of(NODE_1);
- distributionZoneManager.start();
-
// Wait for Zone Manager to initialize Meta Storage on start.
assertLogicalTopologyVersion(1L, keyValueStorage);
@@ -125,14 +111,14 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testNodeRemovingUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ distributionZoneManager.start();
+
topology.putNode(NODE_1);
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
- distributionZoneManager.start();
-
assertLogicalTopology(clusterNodes, keyValueStorage);
topology.removeNodes(Set.of(NODE_2));
@@ -146,6 +132,8 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testNodeStaleRemovingDoNotUpdatesLogicalTopologyInMetaStorage()
throws Exception {
+ distributionZoneManager.start();
+
topology.putNode(NODE_1);
topology.putNode(NODE_2);
@@ -154,8 +142,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
- distributionZoneManager.start();
-
// Wait for Zone Manager to initialize Meta Storage on start.
assertLogicalTopologyVersion(2L, keyValueStorage);
@@ -170,12 +156,12 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testTopologyLeapUpdatesLogicalTopologyInMetaStorage() throws
Exception {
+ distributionZoneManager.start();
+
topology.putNode(NODE_1);
Set<LogicalNode> clusterNodes = Set.of(NODE_1);
- distributionZoneManager.start();
-
assertLogicalTopology(clusterNodes, keyValueStorage);
var clusterNodes2 = Set.of(NODE_1, NODE_2);
@@ -191,12 +177,12 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest extends BaseDistri
@Test
void testStaleTopologyLeapDoNotUpdatesLogicalTopologyInMetaStorage()
throws Exception {
+ distributionZoneManager.start();
+
topology.putNode(NODE_1);
Set<LogicalNode> clusterNodes = Set.of(NODE_1);
- distributionZoneManager.start();
-
assertLogicalTopology(clusterNodes, keyValueStorage);
var clusterNodes2 = Set.of(NODE_1, NODE_2);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 753358bccd..d6b47b81ea 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -89,12 +89,12 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDataNodesPropagationAfterScaleUpTriggered() throws Exception {
+ startDistributionZoneManager();
+
topology.putNode(NODE_1);
Set<LogicalNode> clusterNodes = Set.of(NODE_1);
- startDistributionZoneManager();
-
assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
topology.putNode(NODE_2);
@@ -145,14 +145,14 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDataNodesPropagationAfterScaleDownTriggered() throws Exception {
+ startDistributionZoneManager();
+
topology.putNode(NODE_1);
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
- startDistributionZoneManager();
-
assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.createZone(
@@ -175,12 +175,12 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDataNodesPropagationForDefaultZoneAfterScaleUpTriggered() throws
Exception {
+ startDistributionZoneManager();
+
topology.putNode(NODE_1);
Set<LogicalNode> clusterNodes = Set.of(NODE_1);
- startDistributionZoneManager();
-
assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.alterZone(
@@ -210,14 +210,14 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDataNodesPropagationForDefaultZoneAfterScaleDownTriggered()
throws Exception {
+ startDistributionZoneManager();
+
topology.putNode(NODE_1);
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
- startDistributionZoneManager();
-
assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.alterZone(
@@ -247,10 +247,10 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDropZoneDoNotPropagateDataNodesAfterScaleUp() throws Exception {
- topology.putNode(NODE_1);
-
startDistributionZoneManager();
+ topology.putNode(NODE_1);
+
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1, NODE_2);
@@ -277,12 +277,12 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testDropZoneDoNotPropagateDataNodesAfterScaleDown() throws Exception {
+ startDistributionZoneManager();
+
topology.putNode(NODE_1);
topology.putNode(NODE_2);
- startDistributionZoneManager();
-
topology.removeNodes(Set.of(NODE_2));
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1);
@@ -617,8 +617,6 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
void testEmptyDataNodesOnStart() throws Exception {
startDistributionZoneManager();
- assertLogicalTopology(Set.of(), keyValueStorage);
-
distributionZoneManager.createZone(
new
DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(0).build()
).get();
@@ -636,8 +634,6 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
void testUpdateZoneScaleUpTriggersDataNodePropagation() throws Exception {
startDistributionZoneManager();
- assertLogicalTopology(Set.of(), keyValueStorage);
-
distributionZoneManager.createZone(
new
DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(100).build()
).get();
@@ -660,10 +656,10 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testUpdateZoneScaleDownTriggersDataNodePropagation() throws Exception
{
- topology.putNode(NODE_1);
-
startDistributionZoneManager();
+ topology.putNode(NODE_1);
+
assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
distributionZoneManager.createZone(
@@ -722,8 +718,6 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
void testScaleUpSetToMaxInt() throws Exception {
startDistributionZoneManager();
- assertLogicalTopology(Set.of(), keyValueStorage);
-
distributionZoneManager.createZone(
new
DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(100).build()
).get();
@@ -750,10 +744,10 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void testScaleDownSetToMaxInt() throws Exception {
- topology.putNode(NODE_1);
-
startDistributionZoneManager();
+ topology.putNode(NODE_1);
+
assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
distributionZoneManager.createZone(
@@ -782,15 +776,13 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
void
testScaleUpDidNotChangeDataNodesWhenTriggerKeyWasConcurrentlyChanged() throws
Exception {
startDistributionZoneManager();
- assertLogicalTopology(Set.of(), keyValueStorage);
-
distributionZoneManager.createZone(
new
DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE).build()
).get();
assertDataNodesForZone(ZONE_1_ID, Set.of(), keyValueStorage);
- assertZoneScaleDownChangeTriggerKey(3L, ZONE_1_ID, keyValueStorage);
+ assertZoneScaleUpChangeTriggerKey(2L, ZONE_1_ID, keyValueStorage);
doAnswer(invocation -> {
If iif = invocation.getArgument(0);
@@ -822,10 +814,10 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
@Test
void
testScaleDownDidNotChangeDataNodesWhenTriggerKeyWasConcurrentlyChanged() throws
Exception {
- topology.putNode(NODE_1);
-
startDistributionZoneManager();
+ topology.putNode(NODE_1);
+
assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1),
keyValueStorage);
@@ -1309,6 +1301,8 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
}
private void preparePrerequisites(@Nullable String filter) throws
Exception {
+ startDistributionZoneManager();
+
LogicalNode a = new LogicalNode("1", "A", new
NetworkAddress("localhost", 123));
LogicalNode b = new LogicalNode("2", "B", new
NetworkAddress("localhost", 123));
@@ -1321,8 +1315,6 @@ public class DistributionZoneManagerScaleUpTest extends
BaseDistributionZoneMana
Set<LogicalNode> clusterNodes = Set.of(a, b, c);
- startDistributionZoneManager();
-
if (filter == null) {
distributionZoneManager.createZone(
new Builder(ZONE_1_NAME)
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index fdd6c16a58..f7576d6723 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -27,10 +27,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
import java.util.Collections;
import java.util.List;
@@ -39,6 +35,7 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -50,8 +47,9 @@ public class DistributionZoneManagerWatchListenerTest extends
BaseDistributionZo
private static final LogicalNode NODE_2 = new LogicalNode("node2",
"node2", new NetworkAddress("localhost", 123));
@Test
+ @Disabled("IGNITE-18564")
void testStaleWatchEvent() throws Exception {
- mockVaultZonesLogicalTopologyKey(Set.of(), vaultMgr);
+ mockVaultZonesLogicalTopologyKey(Set.of(NODE_1), vaultMgr,
metaStorageManager.appliedRevision());
startDistributionZoneManager();
@@ -75,7 +73,8 @@ public class DistributionZoneManagerWatchListenerTest extends
BaseDistributionZo
setLogicalTopologyInMetaStorage(nodes, 100, metaStorageManager);
- assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(), keyValueStorage);
+ // TODO: IGNITE-18564 This is incorrect to check that data nodes are
the same right after logical topology is changes manually.
+ assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1),
keyValueStorage);
}
@Test
@@ -89,12 +88,10 @@ public class DistributionZoneManagerWatchListenerTest
extends BaseDistributionZo
new LogicalNode(new ClusterNode("node2", "node2",
NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
);
- mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
+ mockVaultZonesLogicalTopologyKey(nodes, vaultMgr,
metaStorageManager.appliedRevision());
startDistributionZoneManager();
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
-
assertDataNodesForZone(DEFAULT_ZONE_ID, null, keyValueStorage);
}
@@ -105,21 +102,10 @@ public class DistributionZoneManagerWatchListenerTest
extends BaseDistributionZo
new LogicalNode(new ClusterNode("node2", "node2",
NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
);
- mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
+ mockVaultZonesLogicalTopologyKey(nodes, vaultMgr,
metaStorageManager.appliedRevision());
startDistributionZoneManager();
assertDataNodesForZone(DEFAULT_ZONE_ID, nodes, keyValueStorage);
}
-
- @Test
- void testLogicalTopologyIsNullOnZoneManagerStart1() {
- distributionZoneManager.start();
-
- // 1 invoke because only invoke to zones logical topology happens
- verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
-
-
assertNull(keyValueStorage.get(zoneDataNodesKey(DEFAULT_ZONE_ID).bytes()).value());
- assertNull(keyValueStorage.get(zoneDataNodesKey(1).bytes()).value());
- }
}
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 16997064c5..92bd01f763 100644
---
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -24,23 +24,29 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVault;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesNodesAttributesVault;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -290,7 +296,7 @@ public class DistributionZonesTestUtil {
* @param keyValueStorage Key-value storage.
* @throws InterruptedException If thread was interrupted.
*/
- public static void assertLogicalTopologyVersion(long topVer,
KeyValueStorage keyValueStorage) throws InterruptedException {
+ public static void assertLogicalTopologyVersion(@Nullable Long topVer,
KeyValueStorage keyValueStorage) throws InterruptedException {
assertValueInStorage(
keyValueStorage,
zonesLogicalTopologyVersionKey().bytes(),
@@ -333,14 +339,20 @@ public class DistributionZonesTestUtil {
* @param nodes Logical topology
* @param vaultMgr Vault manager
*/
- public static void mockVaultZonesLogicalTopologyKey(Set<LogicalNode>
nodes, VaultManager vaultMgr) {
+ public static void mockVaultZonesLogicalTopologyKey(Set<LogicalNode>
nodes, VaultManager vaultMgr, long appliedRevision) {
Set<NodeWithAttributes> nodesWithAttributes = nodes.stream()
.map(n -> new NodeWithAttributes(n.name(), n.id(),
n.nodeAttributes()))
.collect(Collectors.toSet());
byte[] newLogicalTopology = toBytes(nodesWithAttributes);
- assertThat(vaultMgr.put(zonesLogicalTopologyKey(),
newLogicalTopology), willCompleteSuccessfully());
+ Map<String, Map<String, String>> nodesAttributes = new
ConcurrentHashMap<>();
+ nodesWithAttributes.forEach(n -> nodesAttributes.put(n.nodeId(),
n.nodeAttributes()));
+ assertThat(vaultMgr.put(zonesNodesAttributesVault(),
toBytes(nodesAttributes)), willCompleteSuccessfully());
+
+ assertThat(vaultMgr.put(zonesLogicalTopologyVault(),
newLogicalTopology), willCompleteSuccessfully());
+
+ assertThat(vaultMgr.put(zonesGlobalStateRevision(),
longToBytes(appliedRevision)), willCompleteSuccessfully());
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
new file mode 100644
index 0000000000..54c6b5fe73
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.configuration.ConfigurationModule;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.ConfigurationModules;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
+import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
+import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
+import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.IgniteSystemProperties;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Base class for node's restart tests.
+ */
+public abstract class BaseIgniteRestartTest extends IgniteAbstractTest {
+ /** Default node port. */
+ protected static final int DEFAULT_NODE_PORT = 3344;
+
+ protected static final int DEFAULT_CLIENT_PORT = 10800;
+
+ @Language("HOCON")
+ protected static final String RAFT_CFG = "{\n"
+ + " fsync: false,\n"
+ + " retryDelay: 20\n"
+ + "}";
+
+ /** Nodes bootstrap configuration pattern. */
+ protected static final String NODE_BOOTSTRAP_CFG = "{\n"
+ + " network.port: {},\n"
+ + " network.nodeFinder.netClusterNodes: {}\n"
+ + " network.membership: {\n"
+ + " membershipSyncInterval: 1000,\n"
+ + " failurePingInterval: 500,\n"
+ + " scaleCube: {\n"
+ + " membershipSuspicionMultiplier: 1,\n"
+ + " failurePingRequestMembers: 1,\n"
+ + " gossipInterval: 10\n"
+ + " },\n"
+ + " },\n"
+ + " raft: " + RAFT_CFG + ",\n"
+ + " clientConnector.port: {}\n"
+ + "}";
+
+ public TestInfo testInfo;
+
+ protected static final List<String> CLUSTER_NODES_NAMES = new
ArrayList<>();
+
+ /** Cluster nodes. */
+ protected List<PartialNode> partialNodes;
+
+ protected static final long TIMEOUT_MILLIS = 10_000L;
+
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ this.testInfo = testInfo;
+ this.partialNodes = new ArrayList<>();
+ }
+
+ /**
+ * Stops all started nodes.
+ */
+ @AfterEach
+ public void afterEachTest() throws Exception {
+ var closeables = new ArrayList<AutoCloseable>();
+
+ for (String name : CLUSTER_NODES_NAMES) {
+ if (name != null) {
+ closeables.add(() -> IgnitionManager.stop(name));
+ }
+ }
+
+ if (!partialNodes.isEmpty()) {
+ for (PartialNode partialNode : partialNodes) {
+ closeables.add(partialNode::stop);
+ }
+ }
+
+ IgniteUtils.closeAll(closeables);
+
+ CLUSTER_NODES_NAMES.clear();
+ }
+
+ /**
+ * Load configuration modules.
+ *
+ * @param log Log.
+ * @param classLoader Class loader.
+ * @return Configuration modules.
+ */
+ public static ConfigurationModules loadConfigurationModules(IgniteLogger
log, ClassLoader classLoader) {
+ var modulesProvider = new ServiceLoaderModulesProvider();
+ List<ConfigurationModule> modules =
modulesProvider.modules(classLoader);
+
+ if (log.isInfoEnabled()) {
+ log.info("Configuration modules loaded: {}", modules);
+ }
+
+ if (modules.isEmpty()) {
+ throw new IllegalStateException("No configuration modules were
loaded, this means Ignite cannot start. "
+ + "Please make sure that the classloader for loading
services is correct.");
+ }
+
+ var configModules = new ConfigurationModules(modules);
+
+ if (log.isInfoEnabled()) {
+ log.info("Local root keys: {}", configModules.local().rootKeys());
+ log.info("Distributed root keys: {}",
configModules.distributed().rootKeys());
+ }
+
+ return configModules;
+ }
+
+ /**
+ * Starts the Vault component.
+ */
+ public static VaultManager createVault(String nodeName, Path workDir) {
+ Path vaultPath = workDir.resolve(Paths.get("vault"));
+
+ try {
+ Files.createDirectories(vaultPath);
+ } catch (IOException e) {
+ throw new IgniteInternalException(e);
+ }
+
+ return new VaultManager(new PersistentVaultService(nodeName,
vaultPath));
+ }
+
+ /**
+ * Find component of a given type in list.
+ * Note that it could be possible that in a list of components are
presented several instances of a one class.
+ *
+ * @param components Components list.
+ * @param cls Class.
+ * @param <T> Type parameter.
+ * @return Ignite component.
+ */
+ @Nullable
+ public static <T extends IgniteComponent> T
findComponent(List<IgniteComponent> components, Class<T> cls) {
+ for (IgniteComponent component : components) {
+ if (cls.isAssignableFrom(component.getClass())) {
+ return cls.cast(component);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Build a configuration string.
+ *
+ * @param idx Node index.
+ * @return Configuration string.
+ */
+ protected static String configurationString(int idx) {
+ int port = DEFAULT_NODE_PORT + idx;
+ int clientPort = DEFAULT_CLIENT_PORT + idx;
+
+ // The address of the first node.
+ @Language("HOCON") String connectAddr = "[localhost\":\"" +
DEFAULT_NODE_PORT + "]";
+
+ return IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, port,
connectAddr, clientPort);
+ }
+
+ /**
+ * Returns partial node. Chains deploying watches to configuration
notifications and waits for it,
+ * so returned partial node is started and ready to work.
+ *
+ * @param nodeCfgMgr Node configuration manager.
+ * @param clusterCfgMgr Cluster configuration manager..
+ * @param revisionCallback RevisionCallback Callback on storage revision
update.
+ * @param components Started components of a node.
+ * @param localConfigurationGenerator Local configuration generator.
+ * @param logicalTopology Logical topology.
+ * @param cfgStorage Distributed configuration storage..
+ * @param distributedConfigurationGenerator Distributes configuration
generator..
+ * @return Partial node.
+ */
+ public static PartialNode partialNode(
+ ConfigurationManager nodeCfgMgr,
+ ConfigurationManager clusterCfgMgr,
+ MetaStorageManager metaStorageMgr,
+ @Nullable Consumer<Long> revisionCallback,
+ List<IgniteComponent> components,
+ ConfigurationTreeGenerator localConfigurationGenerator,
+ LogicalTopologyImpl logicalTopology,
+ DistributedConfigurationStorage cfgStorage,
+ ConfigurationTreeGenerator distributedConfigurationGenerator,
+ ConfigurationRegistry clusterConfigRegistry
+ ) {
+ AtomicLong lastRevision = new AtomicLong();
+
+ Consumer<Long> revisionCallback0 = rev -> {
+ if (revisionCallback != null) {
+ revisionCallback.accept(rev);
+ }
+
+ lastRevision.set(rev);
+ };
+
+ CompletableFuture<Void> configurationCatchUpFuture =
RecoveryCompletionFutureFactory.create(
+ clusterCfgMgr,
+ fut -> new TestConfigurationCatchUpListener(cfgStorage, fut,
revisionCallback0)
+ );
+
+ CompletableFuture<?> startFuture = CompletableFuture.allOf(
+
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+ clusterConfigRegistry.notifyCurrentConfigurationListeners()
+ ).thenCompose(unused ->
+ // Deploy all registered watches because all components are
ready and have registered their listeners.
+ CompletableFuture.allOf(metaStorageMgr.deployWatches(),
configurationCatchUpFuture)
+ );
+
+ assertThat("Partial node was not started", startFuture,
willCompleteSuccessfully());
+
+ log.info("Completed recovery on partially started node, last revision
applied: " + lastRevision.get()
+ + ", acceptableDifference: " +
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
100)
+ );
+
+ return new PartialNode(
+ components,
+ List.of(localConfigurationGenerator,
distributedConfigurationGenerator),
+ logicalTopology
+ );
+ }
+
+ /**
+ * Node with partially started components.
+ */
+ public static class PartialNode {
+
+ private final List<IgniteComponent> startedComponents;
+
+ private final List<ManuallyCloseable> closeables;
+
+ private final LogicalTopology logicalTopology;
+
+ PartialNode(List<IgniteComponent> startedComponents,
List<ManuallyCloseable> closeables, LogicalTopology logicalTopology) {
+ this.startedComponents = startedComponents;
+ this.closeables = closeables;
+ this.logicalTopology = logicalTopology;
+ }
+
+ /**
+ * Stops node.
+ */
+ public void stop() {
+ ListIterator<IgniteComponent> iter =
startedComponents.listIterator(startedComponents.size());
+
+ while (iter.hasPrevious()) {
+ IgniteComponent prev = iter.previous();
+
+ prev.beforeNodeStop();
+ }
+
+ iter = startedComponents.listIterator(startedComponents.size());
+
+ while (iter.hasPrevious()) {
+ IgniteComponent prev = iter.previous();
+
+ try {
+ prev.stop();
+ } catch (Exception e) {
+ log.error("Error during component stop", e);
+ }
+ }
+
+ closeables.forEach(c -> {
+ try {
+ c.close();
+ } catch (Exception e) {
+ log.error("Error during close", e);
+ }
+ });
+ }
+
+ public List<IgniteComponent> startedComponents() {
+ return startedComponents;
+ }
+
+ public LogicalTopology logicalTopology() {
+ return logicalTopology;
+ }
+ }
+
+ /**
+ * Configuration catch-up listener for test.
+ */
+ public static class TestConfigurationCatchUpListener extends
ConfigurationCatchUpListener {
+ /** Callback called on revision update. */
+ private final Consumer<Long> revisionCallback;
+
+ /**
+ * Constructor.
+ *
+ * @param cfgStorage Configuration storage.
+ * @param catchUpFuture Catch-up future.
+ */
+ TestConfigurationCatchUpListener(
+ ConfigurationStorage cfgStorage,
+ CompletableFuture<Void> catchUpFuture,
+ Consumer<Long> revisionCallback
+ ) {
+ super(cfgStorage, catchUpFuture, log);
+
+ this.revisionCallback = revisionCallback;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> onUpdate(long appliedRevision) {
+ if (revisionCallback != null) {
+ revisionCallback.accept(appliedRevision);
+ }
+
+ return super.onUpdate(appliedRevision);
+ }
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
new file mode 100644
index 0000000000..81e210c9b9
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
+import static
org.apache.ignite.internal.recovery.ConfigurationCatchUpListener.CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.ConfigurationModules;
+import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import org.apache.ignite.internal.configuration.NodeConfigWriteException;
+import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
+import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import
org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import
org.apache.ignite.internal.metastorage.server.TestRocksDbKeyValueStorage;
+import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NettyBootstrapFactory;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for checking {@link DistributionZoneManager} behavior after node's
restart.
+ */
+@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
+@ExtendWith(ConfigurationExtension.class)
+public class ItIgniteDistributionZoneManagerNodeRestartTest extends
BaseIgniteRestartTest {
+ private static final LogicalNode A = new LogicalNode(
+ new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+ Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+ );
+
+ private static final LogicalNode B = new LogicalNode(
+ new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+ Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+ );
+
+ private static final LogicalNode C = new LogicalNode(
+ new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+ );
+
+ private static final String ZONE_NAME = "zone1";
+
+ /**
+ * Start some of Ignite components that are able to serve as Ignite node
for test purposes.
+ *
+ * @param idx Node index.
+ * @return Partial node.
+ */
+ private PartialNode startPartialNode(int idx) {
+ String name = testNodeName(testInfo, idx);
+
+ Path dir = workDir.resolve(name);
+
+ List<IgniteComponent> components = new ArrayList<>();
+
+ VaultManager vault = createVault(name, dir);
+
+ ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
+
+ Path configFile =
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
+ String configString = configurationString(idx);
+ try {
+ Files.writeString(configFile, configString);
+ } catch (IOException e) {
+ throw new NodeConfigWriteException("Failed to write config content
to file.", e);
+ }
+
+ var localConfigurationGenerator = new ConfigurationTreeGenerator(
+ modules.local().rootKeys(),
+ modules.local().internalSchemaExtensions(),
+ modules.local().polymorphicSchemaExtensions()
+ );
+
+ var nodeCfgMgr = new ConfigurationManager(
+ modules.local().rootKeys(),
+ new LocalFileConfigurationStorage(configFile,
localConfigurationGenerator),
+ localConfigurationGenerator,
+
ConfigurationValidatorImpl.withDefaultValidators(localConfigurationGenerator,
modules.local().validators())
+ );
+
+ NetworkConfiguration networkConfiguration =
nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
+
+ var nettyBootstrapFactory = new
NettyBootstrapFactory(networkConfiguration, name);
+
+ var clusterSvc = new
TestScaleCubeClusterServiceFactory().createClusterService(
+ name,
+ networkConfiguration,
+ nettyBootstrapFactory,
+ defaultSerializationRegistry(),
+ new VaultStateIds(vault)
+ );
+
+ var clusterStateStorage = new TestClusterStateStorage();
+
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ var cmgManager = mock(ClusterManagementGroupManager.class);
+
+ when(cmgManager.logicalTopology()).thenAnswer(invocation ->
completedFuture(logicalTopology.getLogicalTopology()));
+
+ var metaStorageMgr = StandaloneMetaStorageManager.create(
+ vault,
+ new TestRocksDbKeyValueStorage("test",
workDir.resolve("metastorage"))
+ );
+
+ var cfgStorage = new DistributedConfigurationStorage(metaStorageMgr,
vault);
+
+ ConfigurationTreeGenerator distributedConfigurationGenerator = new
ConfigurationTreeGenerator(
+ modules.distributed().rootKeys(),
+ modules.distributed().internalSchemaExtensions(),
+ modules.distributed().polymorphicSchemaExtensions()
+ );
+
+ var clusterCfgMgr = new ConfigurationManager(
+ modules.distributed().rootKeys(),
+ cfgStorage,
+ distributedConfigurationGenerator,
+
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator,
modules.distributed().validators())
+ );
+
+ ConfigurationRegistry clusterConfigRegistry =
clusterCfgMgr.configurationRegistry();
+
+ DistributionZonesConfiguration zonesConfiguration =
clusterConfigRegistry.getConfiguration(DistributionZonesConfiguration.KEY);
+
+ TablesConfiguration tablesConfiguration =
clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
+
+ LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+ DistributionZoneManager distributionZoneManager = new
DistributionZoneManager(
+ zonesConfiguration,
+ tablesConfiguration,
+ metaStorageMgr,
+ logicalTopologyService,
+ vault,
+ name
+ );
+
+ // Preparing the result map.
+
+ components.add(vault);
+ components.add(nodeCfgMgr);
+
+ // Start.
+
+ vault.start();
+ vault.putName(name).join();
+
+ nodeCfgMgr.start();
+
+ // Start the remaining components.
+ List<IgniteComponent> otherComponents = List.of(
+ nettyBootstrapFactory,
+ clusterSvc,
+ clusterStateStorage,
+ cmgManager,
+ metaStorageMgr,
+ clusterCfgMgr,
+ distributionZoneManager
+ );
+
+ for (IgniteComponent component : otherComponents) {
+ component.start();
+
+ components.add(component);
+ }
+
+ PartialNode partialNode = partialNode(
+ nodeCfgMgr,
+ clusterCfgMgr,
+ metaStorageMgr,
+ null,
+ components,
+ localConfigurationGenerator,
+ logicalTopology,
+ cfgStorage,
+ distributedConfigurationGenerator,
+ clusterConfigRegistry
+ );
+
+ partialNodes.add(partialNode);
+
+ return partialNode;
+ }
+
+ @Test
+ public void testNodeAttributesRestoredAfterRestart() throws Exception {
+ PartialNode partialNode = startPartialNode(0);
+
+ DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ partialNode.logicalTopology().putNode(A);
+ partialNode.logicalTopology().putNode(B);
+ partialNode.logicalTopology().putNode(C);
+
+ distributionZoneManager.createZone(
+ new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ 1,
+ Set.of(A, B,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+
+ Map<String, Map<String, String>> nodeAttributesBeforeRestart =
distributionZoneManager.nodesAttributes();
+
+ partialNode.stop();
+
+ partialNode = startPartialNode(0);
+
+ distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ Map<String, Map<String, String>> nodeAttributesAfterRestart =
distributionZoneManager.nodesAttributes();
+
+ assertEquals(3, nodeAttributesAfterRestart.size());
+
+ assertEquals(nodeAttributesBeforeRestart, nodeAttributesAfterRestart);
+ }
+
+ @Test
+ public void testLogicalTopologyRestoredAfterRestart() throws Exception {
+ PartialNode partialNode = startPartialNode(0);
+
+ partialNode.logicalTopology().putNode(A);
+ partialNode.logicalTopology().putNode(B);
+ partialNode.logicalTopology().putNode(C);
+
+ Set<NodeWithAttributes> logicalTopology = Set.of(A, B, C).stream()
+ .map(n -> new NodeWithAttributes(n.name(), n.id(),
n.nodeAttributes()))
+ .collect(Collectors.toSet());
+
+ DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ DistributionZoneManager finalDistributionZoneManager =
distributionZoneManager;
+
+ assertTrue(waitForCondition(() ->
logicalTopology.equals(finalDistributionZoneManager.logicalTopology()),
TIMEOUT_MILLIS));
+
+ partialNode.stop();
+
+ partialNode = startPartialNode(0);
+
+ distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ assertEquals(logicalTopology,
distributionZoneManager.logicalTopology());
+ }
+
+ @Test
+ public void testGlobalStateRevisionUpdatedCorrectly() throws Exception {
+ PartialNode partialNode = startPartialNode(0);
+
+ DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ partialNode.logicalTopology().putNode(A);
+ partialNode.logicalTopology().putNode(B);
+ partialNode.logicalTopology().putNode(C);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ DEFAULT_ZONE_ID,
+ Set.of(A, B,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+
+ MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+ long scaleUpChangeTriggerRevision = bytesToLong(
+
metaStorageManager.get(zoneScaleUpChangeTriggerKey(DEFAULT_ZONE_ID)).join().value()
+ );
+
+ VaultManager vaultManager =
findComponent(partialNode.startedComponents(), VaultManager.class);
+
+ long globalStateRevision =
bytesToLong(vaultManager.get(zonesGlobalStateRevision()).join().value());
+
+ assertEquals(scaleUpChangeTriggerRevision, globalStateRevision);
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index a33c7f7ba4..cd8d3a1102 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -36,6 +36,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
@@ -43,15 +44,12 @@ import
org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -61,12 +59,7 @@ import org.junit.jupiter.api.TestInfo;
/**
* These tests check in-memory node restart scenarios.
*/
-public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
- /** Default node port. */
- private static final int DEFAULT_NODE_PORT = 3344;
-
- /** Default client port. */
- private static final int DEFAULT_CLIENT_PORT = 10800;
+public class ItIgniteInMemoryNodeRestartTest extends BaseIgniteRestartTest {
/** Value producer for table data, is used to create data and check it
later. */
private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
@@ -74,18 +67,9 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
/** Test table name. */
private static final String TABLE_NAME = "Table1";
- /** Nodes bootstrap configuration pattern. */
- private static final String NODE_BOOTSTRAP_CFG = "{\n"
- + " network.port: {},\n"
- + " network.nodeFinder.netClusterNodes: {},\n"
- + " clientConnector.port: {}\n"
- + "}";
-
/** Cluster nodes. */
private static final List<Ignite> CLUSTER_NODES = new ArrayList<>();
- private static final List<String> CLUSTER_NODES_NAMES = new ArrayList<>();
-
/**
* Stops all started nodes.
*/
@@ -102,7 +86,6 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
IgniteUtils.closeAll(closeables);
CLUSTER_NODES.clear();
- CLUSTER_NODES_NAMES.clear();
}
/**
@@ -155,22 +138,6 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
return startNode(idx, nodeName, cfgString, workDir.resolve(nodeName));
}
- /**
- * Build a configuration string.
- *
- * @param idx Node index.
- * @return Configuration string.
- */
- private static String configurationString(int idx) {
- int port = DEFAULT_NODE_PORT + idx;
- int clientPort = DEFAULT_CLIENT_PORT + idx;
-
- // The address of the first node.
- @Language("HOCON") String connectAddr = "[localhost\":\"" +
DEFAULT_NODE_PORT + "]";
-
- return IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, port,
connectAddr, clientPort);
- }
-
/**
* Stop the node with given index.
*
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index bd23ff039c..8bec4299f1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -39,13 +39,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.LongFunction;
@@ -55,12 +53,11 @@ import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
-import org.apache.ignite.configuration.ConfigurationModule;
+import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
-import org.apache.ignite.internal.close.ManuallyCloseable;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
@@ -72,8 +69,6 @@ import
org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
import org.apache.ignite.internal.configuration.NodeConfigWriteException;
-import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -85,7 +80,6 @@ import
org.apache.ignite.internal.distributionzones.configuration.DistributionZo
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.index.IndexManager;
-import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -95,8 +89,6 @@ import org.apache.ignite.internal.raft.Loza;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
-import org.apache.ignite.internal.recovery.ConfigurationCatchUpListener;
-import org.apache.ignite.internal.recovery.RecoveryCompletionFutureFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaManager;
@@ -112,19 +104,15 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -136,11 +124,8 @@ import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.intellij.lang.annotations.Language;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -150,12 +135,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
@ExtendWith(ConfigurationExtension.class)
@Timeout(120)
-public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
- /** Default node port. */
- private static final int DEFAULT_NODE_PORT = 3344;
-
- private static final int DEFAULT_CLIENT_PORT = 10800;
-
+public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
/** Value producer for table data, is used to create data and check it
later. */
private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
@@ -165,29 +145,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/** Test table name. */
private static final String TABLE_NAME_2 = "Table2";
- @Language("HOCON")
- private static final String RAFT_CFG = "{\n"
- + " fsync: false,\n"
- + " retryDelay: 20\n"
- + "}";
-
- /** Nodes bootstrap configuration pattern. */
- private static final String NODE_BOOTSTRAP_CFG = "{\n"
- + " network.port: {},\n"
- + " network.nodeFinder.netClusterNodes: {}\n"
- + " network.membership: {\n"
- + " membershipSyncInterval: 1000,\n"
- + " failurePingInterval: 500,\n"
- + " scaleCube: {\n"
- + " membershipSuspicionMultiplier: 1,\n"
- + " failurePingRequestMembers: 1,\n"
- + " gossipInterval: 10\n"
- + " },\n"
- + " },\n"
- + " raft: " + RAFT_CFG + ",\n"
- + " clientConnector.port: {}\n"
- + "}";
-
@InjectConfiguration("mock: " + RAFT_CFG)
private static RaftConfiguration raftConfiguration;
@@ -197,47 +154,12 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributes;
- private final List<String> clusterNodesNames = new ArrayList<>();
-
- /** Cluster nodes. */
- private List<PartialNode> partialNodes;
-
- private TestInfo testInfo;
-
- @BeforeEach
- void setUp(TestInfo testInfo) {
- this.testInfo = testInfo;
- this.partialNodes = new ArrayList<>();
- }
-
- /**
- * Stops all started nodes.
- */
- @AfterEach
- public void afterEach() throws Exception {
- var closeables = new ArrayList<AutoCloseable>();
-
- for (String name : clusterNodesNames) {
- if (name != null) {
- closeables.add(() -> IgnitionManager.stop(name));
- }
- }
-
- if (!partialNodes.isEmpty()) {
- for (PartialNode partialNode : partialNodes) {
- closeables.add(partialNode::stop);
- }
- }
-
- IgniteUtils.closeAll(closeables);
- }
-
/**
* Start some of Ignite components that are able to serve as Ignite node
for test purposes.
*
* @param idx Node index.
* @param cfgString Configuration string or {@code null} to use the
default configuration.
- * @return List of started components.
+ * @return Partial node.
*/
private PartialNode startPartialNode(int idx, @Nullable @Language("HOCON")
String cfgString) {
return startPartialNode(idx, cfgString, null);
@@ -249,7 +171,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* @param idx Node index.
* @param cfgString Configuration string or {@code null} to use the
default configuration.
* @param revisionCallback Callback on storage revision update.
- * @return List of started components.
+ * @return Partial node.
*/
private PartialNode startPartialNode(
int idx,
@@ -485,55 +407,23 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
components.add(component);
}
- AtomicLong lastRevision = new AtomicLong();
-
- Consumer<Long> revisionCallback0 = rev -> {
- if (revisionCallback != null) {
- revisionCallback.accept(rev);
- }
-
- lastRevision.set(rev);
- };
-
- CompletableFuture<Void> configurationCatchUpFuture =
RecoveryCompletionFutureFactory.create(
+ PartialNode partialNode = partialNode(
+ nodeCfgMgr,
clusterCfgMgr,
- fut -> new TestConfigurationCatchUpListener(cfgStorage, fut,
revisionCallback0)
- );
-
- CompletableFuture<?> startFuture = CompletableFuture.allOf(
-
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
- clusterConfigRegistry.notifyCurrentConfigurationListeners()
- ).thenCompose(unused ->
- // Deploy all registered watches because all components are
ready and have registered their listeners.
- CompletableFuture.allOf(metaStorageMgr.deployWatches(),
configurationCatchUpFuture)
- );
-
- assertThat("Partial node was not started", startFuture,
willCompleteSuccessfully());
-
- log.info("Completed recovery on partially started node, last revision
applied: " + lastRevision.get()
- + ", acceptableDifference: " +
IgniteSystemProperties.getInteger(CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY,
100)
+ metaStorageMgr,
+ revisionCallback,
+ components,
+ localConfigurationGenerator,
+ logicalTopology,
+ cfgStorage,
+ distributedConfigurationGenerator,
+ clusterConfigRegistry
);
- PartialNode partialNode = new PartialNode(components,
List.of(localConfigurationGenerator, distributedConfigurationGenerator));
partialNodes.add(partialNode);
return partialNode;
}
- /**
- * Starts the Vault component.
- */
- private static VaultManager createVault(String nodeName, Path workDir) {
- Path vaultPath = workDir.resolve(Paths.get("vault"));
-
- try {
- Files.createDirectories(vaultPath);
- } catch (IOException e) {
- throw new IgniteInternalException(e);
- }
-
- return new VaultManager(new PersistentVaultService(nodeName,
vaultPath));
- }
-
/**
* Returns a path to the partitions store directory. Creates a directory
if it doesn't exist.
*
@@ -552,36 +442,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
return partitionsStore;
}
- /**
- * Load configuration modules.
- *
- * @param log Log.
- * @param classLoader Class loader.
- * @return Configuration modules.
- */
- private static ConfigurationModules loadConfigurationModules(IgniteLogger
log, ClassLoader classLoader) {
- var modulesProvider = new ServiceLoaderModulesProvider();
- List<ConfigurationModule> modules =
modulesProvider.modules(classLoader);
-
- if (log.isInfoEnabled()) {
- log.info("Configuration modules loaded: {}", modules);
- }
-
- if (modules.isEmpty()) {
- throw new IllegalStateException("No configuration modules were
loaded, this means Ignite cannot start. "
- + "Please make sure that the classloader for loading
services is correct.");
- }
-
- var configModules = new ConfigurationModules(modules);
-
- if (log.isInfoEnabled()) {
- log.info("Local root keys: {}", configModules.local().rootKeys());
- log.info("Distributed root keys: {}",
configModules.distributed().rootKeys());
- }
-
- return configModules;
- }
-
/**
* Starts a node with the given parameters.
*
@@ -590,12 +450,12 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* @return Created node instance.
*/
private IgniteImpl startNode(int idx, @Nullable String cfg) {
- boolean initNeeded = clusterNodesNames.isEmpty();
+ boolean initNeeded = CLUSTER_NODES_NAMES.isEmpty();
CompletableFuture<Ignite> future = startNodeAsync(idx, cfg);
if (initNeeded) {
- String nodeName = clusterNodesNames.get(0);
+ String nodeName = CLUSTER_NODES_NAMES.get(0);
InitParameters initParameters = InitParameters.builder()
.destinationNodeName(nodeName)
@@ -634,12 +494,12 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
String cfgString = cfg == null ? configurationString(idx) : cfg;
- if (clusterNodesNames.size() == idx) {
- clusterNodesNames.add(nodeName);
+ if (CLUSTER_NODES_NAMES.size() == idx) {
+ CLUSTER_NODES_NAMES.add(nodeName);
} else {
- assertNull(clusterNodesNames.get(idx));
+ assertNull(CLUSTER_NODES_NAMES.get(idx));
- clusterNodesNames.set(idx, nodeName);
+ CLUSTER_NODES_NAMES.set(idx, nodeName);
}
return TestIgnitionManager.start(nodeName, cfgString,
workDir.resolve(nodeName));
@@ -649,14 +509,14 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* Starts an {@code amount} number of nodes (with sequential indices
starting from 0).
*/
private List<IgniteImpl> startNodes(int amount) {
- boolean initNeeded = clusterNodesNames.isEmpty();
+ boolean initNeeded = CLUSTER_NODES_NAMES.isEmpty();
List<CompletableFuture<Ignite>> futures = IntStream.range(0, amount)
.mapToObj(i -> startNodeAsync(i, null))
.collect(Collectors.toList());
if (initNeeded) {
- String nodeName = clusterNodesNames.get(0);
+ String nodeName = CLUSTER_NODES_NAMES.get(0);
InitParameters initParameters = InitParameters.builder()
.destinationNodeName(nodeName)
@@ -675,25 +535,13 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
.collect(Collectors.toList());
}
- /**
- * Build a configuration string.
- *
- * @param idx Node index.
- * @return Configuration string.
- */
- private static String configurationString(int idx) {
- String connectAddr = "[\"localhost:" + DEFAULT_NODE_PORT + "\"]";
-
- return IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG,
DEFAULT_NODE_PORT + idx, connectAddr, DEFAULT_CLIENT_PORT + idx);
- }
-
/**
* Stop the node with given index.
*
* @param idx Node index.
*/
private void stopNode(int idx) {
- String nodeName = clusterNodesNames.set(idx, null);
+ String nodeName = CLUSTER_NODES_NAMES.set(idx, null);
if (nodeName != null) {
IgnitionManager.stop(nodeName);
@@ -963,25 +811,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
checkTableWithData(ignite, TABLE_NAME_2);
}
- /**
- * Find component of a given type in list.
- *
- * @param components Components list.
- * @param cls Class.
- * @param <T> Type parameter.
- * @return Ignite component.
- */
- @Nullable
- private static <T extends IgniteComponent> T
findComponent(List<IgniteComponent> components, Class<T> cls) {
- for (IgniteComponent component : components) {
- if (cls.isAssignableFrom(component.getClass())) {
- return cls.cast(component);
- }
- }
-
- return null;
- }
-
/**
* Check that the table with given name is present in TableManager.
*
@@ -1081,7 +910,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
PartialNode partialNode = startPartialNode(1, cfgString);
- TableManager tableManager =
findComponent(partialNode.startedComponents, TableManager.class);
+ TableManager tableManager =
findComponent(partialNode.startedComponents(), TableManager.class);
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
}
@@ -1109,7 +938,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
PartialNode partialNode = startPartialNode(nodes.size() - 1, null);
- TableManager tableManager =
findComponent(partialNode.startedComponents, TableManager.class);
+ TableManager tableManager =
findComponent(partialNode.startedComponents(), TableManager.class);
assertTablePresent(tableManager, TABLE_NAME.toUpperCase());
assertTablePresent(tableManager, TABLE_NAME_2.toUpperCase());
@@ -1160,7 +989,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
}
);
- TableManager tableManager =
findComponent(partialNode.startedComponents, TableManager.class);
+ TableManager tableManager =
findComponent(partialNode.startedComponents(), TableManager.class);
for (int i = 0; i < cfgGap; i++) {
assertTablePresent(tableManager, "T" + i);
@@ -1276,7 +1105,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
if (!partialNodes.isEmpty()) {
partialTablesConfiguration = partialNodes.stream()
- .flatMap(it -> it.startedComponents.stream())
+ .flatMap(it -> it.startedComponents().stream())
.filter(ConfigurationManager.class::isInstance)
.map(c -> ((ConfigurationManager)
c).configurationRegistry().getConfiguration(TablesConfiguration.KEY))
.filter(Objects::nonNull)
@@ -1296,7 +1125,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
() -> tablesConfigurations.stream()
.map(cfg -> cfg.indexes().get(indexName.toUpperCase()))
.allMatch(Objects::nonNull),
- 10_000
+ TIMEOUT_MILLIS
));
}
@@ -1318,80 +1147,4 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
return ignite.tables().table(name);
}
-
- /**
- * Configuration catch-up listener for test.
- */
- private static class TestConfigurationCatchUpListener extends
ConfigurationCatchUpListener {
- /** Callback called on revision update. */
- private final Consumer<Long> revisionCallback;
-
- /**
- * Constructor.
- *
- * @param cfgStorage Configuration storage.
- * @param catchUpFuture Catch-up future.
- */
- TestConfigurationCatchUpListener(
- ConfigurationStorage cfgStorage,
- CompletableFuture<Void> catchUpFuture,
- Consumer<Long> revisionCallback
- ) {
- super(cfgStorage, catchUpFuture, log);
-
- this.revisionCallback = revisionCallback;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<?> onUpdate(long appliedRevision) {
- if (revisionCallback != null) {
- revisionCallback.accept(appliedRevision);
- }
-
- return super.onUpdate(appliedRevision);
- }
- }
-
- private static class PartialNode {
-
- private final List<IgniteComponent> startedComponents;
-
- private final List<ManuallyCloseable> closeables;
-
- public PartialNode(List<IgniteComponent> startedComponents,
List<ManuallyCloseable> closeables) {
- this.startedComponents = startedComponents;
- this.closeables = closeables;
- }
-
- public void stop() {
- ListIterator<IgniteComponent> iter =
startedComponents.listIterator(startedComponents.size());
-
- while (iter.hasPrevious()) {
- IgniteComponent prev = iter.previous();
-
- prev.beforeNodeStop();
- }
-
- iter = startedComponents.listIterator(startedComponents.size());
-
- while (iter.hasPrevious()) {
- IgniteComponent prev = iter.previous();
-
- try {
- prev.stop();
- } catch (Exception e) {
- log.error("Error during component stop", e);
- }
- }
-
- closeables.forEach(c -> {
- try {
- c.close();
- } catch (Exception e) {
- log.error("Error during close", e);
- }
- });
- }
- }
}