This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3e513427d8 IGNITE-23560 Retrieve
DistributionZoneManager#logicalTopology based on revision (#4774)
3e513427d8 is described below
commit 3e513427d815787a2e4f2d6202a0b8377f8dcb7d
Author: Cyrill <[email protected]>
AuthorDate: Tue Nov 26 17:12:36 2024 +0300
IGNITE-23560 Retrieve DistributionZoneManager#logicalTopology based on
revision (#4774)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../distributionzones/DistributionZoneManager.java | 36 +++++++---
.../CausalityDataNodesEngine.java | 8 +--
.../DistributionZoneCausalityDataNodesTest.java | 80 +++++++++++++++++++++-
3 files changed, 107 insertions(+), 17 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 80021304cd..64b5818670 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -181,10 +181,9 @@ public class DistributionZoneManager extends
};
/**
- * The logical topology on the last watch event.
- * It's enough to mark this field by volatile because we don't update the
collection after it is assigned to the field.
+ * The logical topology mapped to the Meta Storage revision.
*/
- private volatile Set<NodeWithAttributes> logicalTopology = emptySet();
+ private final ConcurrentSkipListMap<Long, Set<NodeWithAttributes>>
logicalTopologyByRevision = new ConcurrentSkipListMap<>();
/**
* Local mapping of {@code nodeId} -> node's attributes, where {@code
nodeId} is a node id, that changes between restarts.
@@ -520,7 +519,7 @@ public class DistributionZoneManager extends
assert prevZoneState == null : "Zone's state was created twice [zoneId
= " + zoneId + ']';
- Set<Node> dataNodes =
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+ Set<Node> dataNodes =
logicalTopology(causalityToken).stream().map(NodeWithAttributes::node).collect(toSet());
causalityDataNodesEngine.onCreateZoneState(causalityToken, zone);
@@ -743,13 +742,13 @@ public class DistributionZoneManager extends
// that one value is not null, but other is null.
assert nodeAttributesEntry.value() != null;
- logicalTopology =
deserializeLogicalTopologySet(lastHandledTopologyEntry.value());
+ logicalTopologyByRevision.put(recoveryRevision,
deserializeLogicalTopologySet(lastHandledTopologyEntry.value()));
nodesAttributes =
DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
}
assert lastHandledTopologyEntry.value() == null
- ||
logicalTopology.equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
+ ||
logicalTopology(recoveryRevision).equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
: "Initial value of logical topology was changed after
initialization from the Meta Storage manager.";
assert nodeAttributesEntry.value() == null
@@ -829,15 +828,17 @@ public class DistributionZoneManager extends
* @return Future reflecting the completion of the actions needed when
logical topology was updated.
*/
private CompletableFuture<Void>
onLogicalTopologyUpdate(Set<NodeWithAttributes> newLogicalTopology, long
revision, int catalogVersion) {
+ Set<NodeWithAttributes> currentLogicalTopology =
logicalTopology(revision);
+
Set<Node> removedNodes =
- logicalTopology.stream()
+ currentLogicalTopology.stream()
.filter(node -> !newLogicalTopology.contains(node))
.map(NodeWithAttributes::node)
.collect(toSet());
Set<Node> addedNodes =
newLogicalTopology.stream()
- .filter(node -> !logicalTopology.contains(node))
+ .filter(node -> !currentLogicalTopology.contains(node))
.map(NodeWithAttributes::node)
.collect(toSet());
@@ -857,7 +858,7 @@ public class DistributionZoneManager extends
newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n));
- logicalTopology = newLogicalTopology;
+ logicalTopologyByRevision.put(revision, newLogicalTopology);
futures.add(saveRecoverableStateToMetastorage(zoneIds, revision,
newLogicalTopology));
@@ -1571,7 +1572,22 @@ public class DistributionZoneManager extends
}
public Set<NodeWithAttributes> logicalTopology() {
- return logicalTopology;
+ return logicalTopology(Long.MAX_VALUE);
+ }
+
+ /**
+ * Get logical topology for the given revision.
+ * If there is no data for revision i, return topology for the maximum
revision smaller than i, or an empty set if there is no such revision.
+ *
+ * @param revision metastore revision.
+ * @return logical topology.
+ */
+ public Set<NodeWithAttributes> logicalTopology(long revision) {
+ assert revision >= 0 : revision;
+
+ Map.Entry<Long, Set<NodeWithAttributes>> entry =
logicalTopologyByRevision.floorEntry(revision);
+
+ return entry != null ? entry.getValue() : emptySet();
}
private void registerCatalogEventListenersOnStartManagerBusy() {
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
index 3f52be112a..276ca7b723 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
@@ -241,15 +241,11 @@ public class CausalityDataNodesEngine {
// Update the data nodes set with pending data from
augmentation map
subAugmentationMap.forEach((rev, augmentation) -> {
if (augmentation.addition() && rev >
scaleUpTriggerRevision && rev <= lastScaleUpRevision) {
- for (Node node : augmentation.nodes()) {
- finalDataNodes.add(node);
- }
+ finalDataNodes.addAll(augmentation.nodes());
}
if (!augmentation.addition() && rev >
scaleDownTriggerRevision && rev <= lastScaleDownRevision) {
- for (Node node : augmentation.nodes()) {
- finalDataNodes.remove(node);
- }
+ finalDataNodes.removeAll(augmentation.nodes());
}
});
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index 612a00c142..ff91559b4d 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -47,7 +47,10 @@ import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedF
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.argThat;
@@ -1027,6 +1030,77 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
checkDataNodesRepeated(expectedDataNodesOnTopologyUpdateEvent,
expectedDataNodesAfterTimersAreExpired, false);
}
+ /**
+ * Tests that Distribution Zone Manager stores logical topology update
history.
+ */
+ @Test
+ void testLogicalTopologyUpdateHistoryIsStored() throws Exception {
+ createZone(ZONE_NAME, 1, 1, null);
+
+ Set<LogicalNode> newTopology = new HashSet<>();
+
+ newTopology.add(NODE_0);
+ long topologyRevisionOneAdded =
putNodeInLogicalTopologyAndGetRevision(NODE_0, newTopology);
+ waitForCondition(() -> metaStorageManager.appliedRevision() >=
topologyRevisionOneAdded, TIMEOUT);
+
+ newTopology.add(NODE_1);
+ long topologyRevisionTwoAdded =
putNodeInLogicalTopologyAndGetRevision(NODE_1, newTopology);
+ waitForCondition(() -> metaStorageManager.appliedRevision() >=
topologyRevisionTwoAdded, TIMEOUT);
+
+ newTopology.remove(NODE_1);
+ long topologyRevisionOneRemoved =
removeNodeInLogicalTopologyAndGetRevision(Set.of(NODE_1), newTopology);
+ waitForCondition(() -> metaStorageManager.appliedRevision() >=
topologyRevisionOneRemoved, TIMEOUT);
+
+ int zoneId = getZoneId(ZONE_NAME);
+
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ metaStorageManager::appliedRevision,
+ catalogManager::latestCatalogVersion,
+ zoneId,
+ ONE_NODE,
+ TIMEOUT
+ );
+
+ assertEquals(ONE_NODE_NAME,
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionOneAdded)));
+ assertEquals(TWO_NODES_NAMES,
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionTwoAdded)));
+ assertEquals(ONE_NODE_NAME,
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionOneRemoved)));
+ }
+
+ /**
+ * Tests logical topology param handling.
+ */
+ @Test
+ void testLogicalTopologyParams() throws Exception {
+ createZone(ZONE_NAME, 1, 1, null);
+
+ Set<LogicalNode> newTopology = new HashSet<>();
+
+ newTopology.add(NODE_0);
+ long topologyRevision = putNodeInLogicalTopologyAndGetRevision(NODE_0,
newTopology);
+ waitForCondition(() -> metaStorageManager.appliedRevision() >=
topologyRevision, TIMEOUT);
+
+ // topologyRevision is greater than 2 as there have been other revision
updates.
+ assertTrue(topologyRevision > 2);
+ // This check makes sure that the topology is empty when it is
requested with a revision in the range (0, topologyRevision - 1].
+
+ Set<NodeWithAttributes> topologyBeforeAddingNodes =
distributionZoneManager.logicalTopology(topologyRevision - 1);
+ assertThat(topologyBeforeAddingNodes, is(empty()));
+
+ // Same for revision = 0.
+ Set<NodeWithAttributes> topology0 =
distributionZoneManager.logicalTopology(0);
+ assertThat(topology0, is(empty()));
+
+ // Exactly one node should be present in the topology at revision =
topologyRevision.
+ assertEquals(ONE_NODE_NAME,
nodeNames(distributionZoneManager.logicalTopology(topologyRevision)));
+
+ // Same topology for the revision > topologyRevision.
+ assertEquals(ONE_NODE_NAME,
nodeNames(distributionZoneManager.logicalTopology(topologyRevision + 1)));
+
+ // Fails if revision is negative (invalid).
+ assertThrows(AssertionError.class, () ->
distributionZoneManager.logicalTopology(-1));
+ }
+
private void checkDataNodesRepeated(
Map<Integer, Set<String>> expectedDataNodesOnTopologyUpdateEvent,
Map<Integer, Set<String>> expectedDataNodesAfterTimersAreExpired,
@@ -1439,7 +1513,7 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
}
}
- Set<String> nodeNames =
newLogicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet());
+ Set<String> nodeNames = nodeNames(newLogicalTopology);
if (topologyRevisions.containsKey(nodeNames)) {
topologyRevisions.remove(nodeNames).complete(revision);
@@ -1454,6 +1528,10 @@ public class DistributionZoneCausalityDataNodesTest
extends BaseDistributionZone
};
}
+ private static Set<String> nodeNames(Set<NodeWithAttributes>
newLogicalTopology) {
+ return
newLogicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet());
+ }
+
/**
* Creates a data nodes watch listener which completes futures from {@code
zoneDataNodesRevisions}
* when receives event with expected data nodes.