This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 62d02d7e14 IGNITE-19581 Add restoring of zones state after restart
taking into account zone's filter updates (#2288)
62d02d7e14 is described below
commit 62d02d7e14341ec1db220ad07b3b64b18d395b01
Author: Mirza Aliev <[email protected]>
AuthorDate: Thu Jul 6 11:33:10 2023 +0400
IGNITE-19581 Add restoring of zones state after restart taking into account
zone's filter updates (#2288)
---
.../distributionzones/DistributionZoneManager.java | 99 ++++++++--
.../distributionzones/DistributionZonesUtil.java | 19 ++
.../rebalance/DistributionZoneRebalanceEngine.java | 9 +-
.../DistributionZoneManagerAlterFilterTest.java | 50 +----
.../DistributionZoneManagerFilterTest.java | 29 +--
.../DistributionZonesTestUtil.java | 10 +-
...niteDistributionZoneManagerNodeRestartTest.java | 202 ++++++++++++++++-----
7 files changed, 270 insertions(+), 148 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 5673101862..03ce9cbaf0 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -44,6 +44,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentationVault;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesFilterUpdateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
@@ -753,7 +754,22 @@ public class DistributionZoneManager implements
IgniteComponent {
int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
- saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision());
+ VaultEntry filterUpdateRevision =
vaultMgr.get(zonesFilterUpdateRevision()).join();
+
+ long eventRevision = ctx.storageRevision();
+
+ if (filterUpdateRevision != null) {
+ // This means that we have already handled an event with this
revision.
+ // This is possible when the node was restarted after this listener
completed,
+ // but the applied revision didn't have time to be propagated to
the Vault.
+ if (bytesToLong(filterUpdateRevision.value()) >=
eventRevision) {
+ return completedFuture(null);
+ }
+ }
+
+ vaultMgr.put(zonesFilterUpdateRevision(),
longToBytes(eventRevision)).join();
+
+ saveDataNodesToMetaStorageOnScaleUp(zoneId, eventRevision);
return completedFuture(null);
};
@@ -826,29 +842,74 @@ public class DistributionZoneManager implements
IgniteComponent {
Optional<Long> maxScaleUpRevision =
zoneState.highestRevision(true);
- // Take the highest revision from the topologyAugmentationMap and
schedule scale up/scale down,
- // meaning that all augmentation of nodes will be taken into
account in newly created timers. If the augmentation has already
- // been proposed to data nodes in the metastorage before restart,
- // that means we have updated corresponding trigger key and it's
value will be greater than
- // the highest revision from the topologyAugmentationMap, and
current timer won't affect data nodes.
- maxScaleUpRevision.ifPresent(
- rev -> zoneState.rescheduleScaleUp(
- zone.dataNodesAutoAdjustScaleUp(),
- () -> saveDataNodesToMetaStorageOnScaleUp(zoneId,
rev)
- )
- );
-
Optional<Long> maxScaleDownRevision =
zoneState.highestRevision(false);
- maxScaleDownRevision.ifPresent(
- rev -> zoneState.rescheduleScaleDown(
- zone.dataNodesAutoAdjustScaleDown(),
- () ->
saveDataNodesToMetaStorageOnScaleDown(zoneId, rev)
- )
- );
+ VaultEntry filterUpdateRevision =
vaultMgr.get(zonesFilterUpdateRevision()).join();
+
+ restoreTimers(zone, zoneState, maxScaleUpRevision,
maxScaleDownRevision, filterUpdateRevision);
}
}
+ /**
+ * Restores timers that were scheduled before a node's restart.
+ * Take the highest revision from the {@link
ZoneState#topologyAugmentationMap()}, compare it with the revision
+ * of the last update of the zone's filter and schedule scale up/scale
down timers. Filter revision is taken into account because
+ * any filter update triggers immediate scale up.
+ *
+ * @param zone Zone's view.
+ * @param zoneState Zone's state from Distribution Zone Manager.
+ * @param maxScaleUpRevisionOptional Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node joins.
+ * @param maxScaleDownRevisionOptional Max revision from the {@link
ZoneState#topologyAugmentationMap()} for node removals.
+ * @param filterUpdateRevisionVaultEntry Vault entry holding the revision of the last
zone filter update, or {@code null} if no filter update has been recorded.
+ */
+ private void restoreTimers(
+ DistributionZoneView zone,
+ ZoneState zoneState,
+ Optional<Long> maxScaleUpRevisionOptional,
+ Optional<Long> maxScaleDownRevisionOptional,
+ VaultEntry filterUpdateRevisionVaultEntry
+ ) {
+ int zoneId = zone.zoneId();
+
+ maxScaleUpRevisionOptional.ifPresent(
+ maxScaleUpRevision -> {
+ if (filterUpdateRevisionVaultEntry != null) {
+ long filterUpdateRevision =
bytesToLong(filterUpdateRevisionVaultEntry.value());
+
+ // Immediately trigger the scale up that was planned to
be invoked before the restart.
+ // If this invocation succeeded before the restart, then the
current call will just be skipped.
+ saveDataNodesToMetaStorageOnScaleUp(zoneId,
filterUpdateRevision);
+
+ if (maxScaleUpRevision < filterUpdateRevision) {
+ // No additional scale up is needed for
the scenario when the filter update event happened after the last
+ // node join event.
+
+ // TODO: IGNITE-19506 Think carefully about the
scenario when the scale up timer was immediate before restart and
+ // causality data nodes are implemented.
+ return;
+ }
+ }
+
+ // Take the highest revision from the
topologyAugmentationMap and schedule scale up/scale down,
+ // meaning that all augmentations of nodes will be taken
into account in newly created timers.
+ // If augmentations have already been proposed to data
nodes in the metastorage before restart,
+ // that means we have updated corresponding trigger key
and its value will be greater than
+ // the highest revision from the topologyAugmentationMap,
and current timer won't affect data nodes.
+ zoneState.rescheduleScaleUp(
+ zone.dataNodesAutoAdjustScaleUp(),
+ () -> saveDataNodesToMetaStorageOnScaleUp(zoneId,
maxScaleUpRevision)
+ );
+ }
+ );
+
+ maxScaleDownRevisionOptional.ifPresent(
+ maxScaleDownRevision -> zoneState.rescheduleScaleDown(
+ zone.dataNodesAutoAdjustScaleDown(),
+ () -> saveDataNodesToMetaStorageOnScaleDown(zoneId,
maxScaleDownRevision)
+ )
+ );
+ }
+
/**
* Method initialise data nodes value for the specified zone, also sets
{@code revision} to the
* {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 7f87409174..2d823b30ed 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -84,6 +84,9 @@ public class DistributionZonesUtil {
/** Key value for zones' global state revision in vault. */
private static final String DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT
= "vault.distributionZones.globalState.revision";
+ /** Key value for zones' filter update revision in vault. */
+ private static final String
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT =
"vault.distributionZones.filterUpdate.revision";
+
/** Key prefix for zones' logical topology nodes. */
private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY =
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
@@ -112,6 +115,10 @@ public class DistributionZonesUtil {
private static final ByteArray
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
new ByteArray(DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT);
+ /** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT}. */
+ private static final ByteArray
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY =
+ new ByteArray(DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT);
+
/** ByteArray representation of {@link
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
private static final ByteArray
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
@@ -237,6 +244,13 @@ public class DistributionZonesUtil {
return DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY;
}
+ /**
+ * The key that represents the revision of the last filter update of any zone.
+ */
+ public static ByteArray zonesFilterUpdateRevision() {
+ return DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY;
+ }
+
/**
* The key that represents {@link ZoneState#topologyAugmentationMap()} in
the Vault.
*/
@@ -392,6 +406,11 @@ public class DistributionZonesUtil {
return dataNodesMap;
}
+ @Nullable
+ public static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
+ return dataNodesBytes == null ? null :
dataNodes(fromBytes(dataNodesBytes));
+ }
+
/**
* Returns data nodes from the meta storage entry or empty map if the
value is null.
*
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index d2fed35132..89c45d0dd4 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -22,10 +22,10 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toList;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -48,11 +48,9 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.Nullable;
/**
* Zone rebalance manager.
@@ -228,11 +226,6 @@ public class DistributionZoneRebalanceEngine {
};
}
- @Nullable
- private static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
- return dataNodesBytes == null ? null :
dataNodes(ByteUtils.fromBytes(dataNodesBytes));
- }
-
/**
* Listener of replicas configuration changes.
*
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
index e44b9a8b48..3ffd4167ff 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -36,7 +36,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.metastorage.server.If;
@@ -114,12 +113,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(C,
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C,
D), TIMEOUT_MILLIS);
}
/**
@@ -155,12 +149,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(),
TIMEOUT_MILLIS);
}
/**
@@ -201,12 +190,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
// Node C is still in data nodes because altering a filter triggers
only immediate scale up.
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(C,
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C,
D), TIMEOUT_MILLIS);
// Check that scale down task is still scheduled.
assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
@@ -221,12 +205,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
.build()
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
-
Set.of(D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(D),
TIMEOUT_MILLIS);
}
/**
@@ -284,12 +263,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
// Check that node E, that was added while filter's altering, is not
propagated to data nodes.
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(C,
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C,
D), TIMEOUT_MILLIS);
// Assert that scheduled timer was not canceled because of immediate
scale up after filter altering.
assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
@@ -304,12 +278,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
// Check that node E, that was added after filter's altering, was
added only after altering immediate scale up.
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(C, D,
e).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C,
D, e), TIMEOUT_MILLIS);
}
/**
@@ -354,12 +323,7 @@ public class DistributionZoneManagerAlterFilterTest
extends BaseDistributionZon
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(A,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A,
C), TIMEOUT_MILLIS);
}
private static Stream<Arguments> provideArgumentsForFilterAlteringTests() {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
index afcff858dc..7e977bd885 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -65,12 +64,7 @@ public class DistributionZoneManagerFilterTest extends
BaseDistributionZoneManag
topology.putNode(D);
- assertDataNodesFromManager(
- distributionZoneManager,
- ZONE_ID,
- Set.of(A, C,
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A,
C, D), TIMEOUT_MILLIS);
}
@Test
@@ -79,12 +73,7 @@ public class DistributionZoneManagerFilterTest extends
BaseDistributionZoneManag
topology.removeNodes(Set.of(C));
- assertDataNodesFromManager(
- distributionZoneManager,
- ZONE_ID,
-
Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, ZONE_ID,
Set.of(A), TIMEOUT_MILLIS);
}
@Test
@@ -100,12 +89,7 @@ public class DistributionZoneManagerFilterTest extends
BaseDistributionZoneManag
topology.putNode(newB);
- assertDataNodesFromManager(
- distributionZoneManager,
- ZONE_ID,
- Set.of(A, newB,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A,
newB, C), TIMEOUT_MILLIS);
}
/**
@@ -131,11 +115,6 @@ public class DistributionZoneManagerFilterTest extends
BaseDistributionZoneManag
.build()
).get(10_000, TimeUnit.MILLISECONDS);
- assertDataNodesFromManager(
- distributionZoneManager,
- ZONE_ID,
- Set.of(A,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A,
C), TIMEOUT_MILLIS);
}
}
diff --git
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 92bd01f763..1cdabe81d1 100644
---
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -63,6 +63,7 @@ import
org.apache.ignite.internal.schema.configuration.storage.DataStorageChange
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -465,21 +466,24 @@ public class DistributionZonesTestUtil {
public static void assertDataNodesFromManager(
DistributionZoneManager distributionZoneManager,
int zoneId,
- @Nullable Set<String> expectedValue,
+ @Nullable Set<LogicalNode> expectedValue,
long timeoutMillis
) throws InterruptedException {
+ Set<String> expectedValueNames =
+ expectedValue == null ? null :
expectedValue.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
boolean success = waitForCondition(() -> {
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
- return Objects.equals(dataNodes, expectedValue);
+ return Objects.equals(dataNodes, expectedValueNames);
}, timeoutMillis);
// We do a second check simply to print a nice error message in case
the condition above is not achieved.
if (!success) {
Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
- assertThat(dataNodes, is(expectedValue));
+ assertThat(dataNodes, is(expectedValueNames));
}
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 502af7de66..d744178a3a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
@@ -31,6 +32,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
@@ -80,6 +82,7 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
@@ -275,12 +278,7 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- assertDataNodesFromManager(
- distributionZoneManager,
- 1,
- Set.of(A, B,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, 1, Set.of(A, B,
C), TIMEOUT_MILLIS);
Map<String, Map<String, String>> nodeAttributesBeforeRestart =
distributionZoneManager.nodesAttributes();
@@ -335,12 +333,7 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
partialNode.logicalTopology().putNode(C);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- assertDataNodesFromManager(
- distributionZoneManager,
- DEFAULT_ZONE_ID,
- Set.of(A, B,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, DEFAULT_ZONE_ID,
Set.of(A, B, C), TIMEOUT_MILLIS);
MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
@@ -362,13 +355,13 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
- createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+ createZoneOrAlterDefaultZone(distributionZoneManager, zoneName,
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
partialNode.logicalTopology().putNode(A);
partialNode.logicalTopology().putNode(B);
partialNode.logicalTopology().removeNodes(Set.of(B));
- assertDataNodesFromManager(distributionZoneManager, zoneId,
Set.of(A.name()), TIMEOUT_MILLIS);
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
partialNode.stop();
@@ -376,7 +369,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
- assertDataNodesFromManager(distributionZoneManager, zoneId,
Set.of(A.name()), TIMEOUT_MILLIS);
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
}
@ParameterizedTest
@@ -386,7 +379,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
- createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+ createZoneOrAlterDefaultZone(distributionZoneManager, zoneName,
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
partialNode.logicalTopology().putNode(A);
partialNode.logicalTopology().putNode(B);
@@ -394,19 +387,14 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
assertDataNodesFromManager(
distributionZoneManager,
zoneId,
- Set.of(A,
B).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ Set.of(A, B),
TIMEOUT_MILLIS
);
MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
- when(metaStorageManager.invoke(argThat(iif -> {
- If iif1 = MetaStorageWriteHandler.toIf(iif);
-
- byte[] keyScaleUp = zoneScaleUpChangeTriggerKey(zoneId).bytes();
-
- return iif1.andThen().update().operations().stream().anyMatch(op
-> Arrays.equals(keyScaleUp, op.key()));
- }))).thenThrow(new RuntimeException("Expected"));
+ // Block scale up
+ blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
partialNode.logicalTopology().putNode(C);
partialNode.logicalTopology().removeNodes(Set.of(B));
@@ -414,7 +402,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertDataNodesFromManager(
distributionZoneManager,
zoneId,
-
Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ Set.of(A),
TIMEOUT_MILLIS
);
@@ -424,32 +412,139 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(A,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A,
C), TIMEOUT_MILLIS);
}
@ParameterizedTest
@MethodSource("provideArgumentsRestartTests")
- public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String
zoneName) throws Exception {
+ public void testScaleUpTriggeredByFilterUpdateIsRestoredAfterRestart(int
zoneId, String zoneName) throws Exception {
PartialNode partialNode = startPartialNode(0);
DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+ createZoneOrAlterDefaultZone(distributionZoneManager, zoneName,
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+ partialNode.logicalTopology().putNode(A);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ partialNode.logicalTopology().putNode(B);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
+
MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
- createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+ blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
- when(metaStorageManager.invoke(argThat(iif -> {
- If iif1 = MetaStorageWriteHandler.toIf(iif);
+ // Only node B passes the filter
+ String filter = "$[?(@.dataRegionSize > 10)]";
- byte[] keyScaleDown =
zoneScaleDownChangeTriggerKey(zoneId).bytes();
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(filter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- return iif1.andThen().update().operations().stream().anyMatch(op
-> Arrays.equals(keyScaleDown, op.key()));
- }))).thenThrow(new RuntimeException("Expected"));
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
+
+ partialNode.stop();
+
+ partialNode = startPartialNode(0);
+
+ distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B),
TIMEOUT_MILLIS);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArgumentsRestartTests")
+ public void
testScaleUpsTriggeredByFilterUpdateAndNodeJoinAreRestoredAfterRestart(int
zoneId, String zoneName) throws Exception {
+ PartialNode partialNode = startPartialNode(0);
+
+ DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ createZoneOrAlterDefaultZone(distributionZoneManager, zoneName,
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+ partialNode.logicalTopology().putNode(A);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
+
+ MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+ blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(100)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ partialNode.logicalTopology().putNode(B);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A),
TIMEOUT_MILLIS);
+
+ // Only nodes B and C pass the filter
+ String filter = "$[?(@.dataRegionSize > 10)]";
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(filter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ partialNode.logicalTopology().putNode(C);
+
+ partialNode.logicalTopology().removeNodes(Set.of(A));
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(),
TIMEOUT_MILLIS);
+
+ partialNode.stop();
+
+ partialNode = startPartialNode(0);
+
+ distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ // Immediate timer triggered by filter update
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B),
TIMEOUT_MILLIS);
+
+ ZoneState zoneState = distributionZoneManager.zonesState().get(zoneId);
+
+ // Timer scheduled after join of the node C
+ assertNotNull(zoneState.scaleUpTask());
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B,
C), TIMEOUT_MILLIS);
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideArgumentsRestartTests")
+ public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String
zoneName) throws Exception {
+ PartialNode partialNode = startPartialNode(0);
+
+ DistributionZoneManager distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+ createZoneOrAlterDefaultZone(distributionZoneManager, zoneName,
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+ MetaStorageManager metaStorageManager =
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+ // Block scale down
+ blockUpdate(metaStorageManager, zoneScaleDownChangeTriggerKey(zoneId));
partialNode.logicalTopology().putNode(A);
partialNode.logicalTopology().putNode(B);
@@ -459,7 +554,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
assertDataNodesFromManager(
distributionZoneManager,
zoneId,
- Set.of(A, B,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ Set.of(A, B, C),
TIMEOUT_MILLIS
);
@@ -469,12 +564,7 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
distributionZoneManager =
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
- assertDataNodesFromManager(
- distributionZoneManager,
- zoneId,
- Set.of(A,
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
- TIMEOUT_MILLIS
- );
+ assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A,
C), TIMEOUT_MILLIS);
}
private static Stream<Arguments> provideArgumentsRestartTests() {
@@ -488,14 +578,16 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
private static void createZoneOrAlterDefaultZone(
DistributionZoneManager distributionZoneManager,
- String zoneName
+ String zoneName,
+ int scaleUp,
+ int scaleDown
) throws Exception {
if (zoneName.equals(DEFAULT_ZONE_NAME)) {
distributionZoneManager.alterZone(
DEFAULT_ZONE_NAME,
new
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
- .dataNodesAutoAdjustScaleUp(0)
- .dataNodesAutoAdjustScaleDown(0)
+ .dataNodesAutoAdjustScaleUp(scaleUp)
+ .dataNodesAutoAdjustScaleDown(scaleDown)
.build()
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
@@ -512,10 +604,20 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
} else {
distributionZoneManager.createZone(
new
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
- .dataNodesAutoAdjustScaleUp(0)
- .dataNodesAutoAdjustScaleDown(0)
+ .dataNodesAutoAdjustScaleUp(scaleUp)
+ .dataNodesAutoAdjustScaleDown(scaleDown)
.build()
).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
}
+
+ /**
+ * Stubs {@code metaStorageManager.invoke(Iif)} so that any invoke whose {@code andThen()} update
+ * contains an operation on {@code key} throws a {@link RuntimeException} ("Expected") instead of
+ * being executed. Tests use this to keep a trigger key (e.g. a zone's scale up or scale down
+ * change trigger key) from being written.
+ *
+ * <p>NOTE(review): {@code metaStorageManager} is presumably a Mockito mock or spy, since it is
+ * stubbed with {@code when(...)}; confirm against how the test creates it.
+ *
+ * @param metaStorageManager Meta storage manager to stub.
+ * @param key Key whose updates must be blocked.
+ */
+ private static void blockUpdate(MetaStorageManager metaStorageManager, ByteArray key) {
+ // The key is fixed for the lifetime of the stub, so resolve its bytes once rather than on every matcher call.
+ byte[] keyBytes = key.bytes();
+
+ when(metaStorageManager.invoke(argThat(iif -> {
+ If ifStatement = MetaStorageWriteHandler.toIf(iif);
+
+ return ifStatement.andThen().update().operations().stream()
+ .anyMatch(op -> Arrays.equals(keyBytes, op.key()));
+ }))).thenThrow(new RuntimeException("Expected"));
+ }
}