This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e513427d8 IGNITE-23560 Retrieve 
DistributionZoneManager#logicalTopology based on revision (#4774)
3e513427d8 is described below

commit 3e513427d815787a2e4f2d6202a0b8377f8dcb7d
Author: Cyrill <[email protected]>
AuthorDate: Tue Nov 26 17:12:36 2024 +0300

    IGNITE-23560 Retrieve DistributionZoneManager#logicalTopology based on 
revision (#4774)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../distributionzones/DistributionZoneManager.java | 36 +++++++---
 .../CausalityDataNodesEngine.java                  |  8 +--
 .../DistributionZoneCausalityDataNodesTest.java    | 80 +++++++++++++++++++++-
 3 files changed, 107 insertions(+), 17 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 80021304cd..64b5818670 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -181,10 +181,9 @@ public class DistributionZoneManager extends
     };
 
     /**
-     * The logical topology on the last watch event.
-     * It's enough to mark this field by volatile because we don't update the 
collection after it is assigned to the field.
+     * The logical topology mapped to the MS revision.
      */
-    private volatile Set<NodeWithAttributes> logicalTopology = emptySet();
+    private final ConcurrentSkipListMap<Long, Set<NodeWithAttributes>> 
logicalTopologyByRevision = new ConcurrentSkipListMap<>();
 
     /**
      * Local mapping of {@code nodeId} -> node's attributes, where {@code 
nodeId} is a node id, that changes between restarts.
@@ -520,7 +519,7 @@ public class DistributionZoneManager extends
 
         assert prevZoneState == null : "Zone's state was created twice [zoneId 
= " + zoneId + ']';
 
-        Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
+        Set<Node> dataNodes = 
logicalTopology(causalityToken).stream().map(NodeWithAttributes::node).collect(toSet());
 
         causalityDataNodesEngine.onCreateZoneState(causalityToken, zone);
 
@@ -743,13 +742,13 @@ public class DistributionZoneManager extends
             // that one value is not null, but other is null.
             assert nodeAttributesEntry.value() != null;
 
-            logicalTopology = 
deserializeLogicalTopologySet(lastHandledTopologyEntry.value());
+            logicalTopologyByRevision.put(recoveryRevision, 
deserializeLogicalTopologySet(lastHandledTopologyEntry.value()));
 
             nodesAttributes = 
DistributionZonesUtil.deserializeNodesAttributes(nodeAttributesEntry.value());
         }
 
         assert lastHandledTopologyEntry.value() == null
-                || 
logicalTopology.equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
+                || 
logicalTopology(recoveryRevision).equals(deserializeLogicalTopologySet(lastHandledTopologyEntry.value()))
                 : "Initial value of logical topology was changed after 
initialization from the Meta Storage manager.";
 
         assert nodeAttributesEntry.value() == null
@@ -829,15 +828,17 @@ public class DistributionZoneManager extends
      * @return Future reflecting the completion of the actions needed when 
logical topology was updated.
      */
     private CompletableFuture<Void> 
onLogicalTopologyUpdate(Set<NodeWithAttributes> newLogicalTopology, long 
revision, int catalogVersion) {
+        Set<NodeWithAttributes> currentLogicalTopology = 
logicalTopology(revision);
+
         Set<Node> removedNodes =
-                logicalTopology.stream()
+                currentLogicalTopology.stream()
                         .filter(node -> !newLogicalTopology.contains(node))
                         .map(NodeWithAttributes::node)
                         .collect(toSet());
 
         Set<Node> addedNodes =
                 newLogicalTopology.stream()
-                        .filter(node -> !logicalTopology.contains(node))
+                        .filter(node -> !currentLogicalTopology.contains(node))
                         .map(NodeWithAttributes::node)
                         .collect(toSet());
 
@@ -857,7 +858,7 @@ public class DistributionZoneManager extends
 
         newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n));
 
-        logicalTopology = newLogicalTopology;
+        logicalTopologyByRevision.put(revision, newLogicalTopology);
 
         futures.add(saveRecoverableStateToMetastorage(zoneIds, revision, 
newLogicalTopology));
 
@@ -1571,7 +1572,22 @@ public class DistributionZoneManager extends
     }
 
     public Set<NodeWithAttributes> logicalTopology() {
-        return logicalTopology;
+        return logicalTopology(Long.MAX_VALUE);
+    }
+
+    /**
+     * Get logical topology for the given revision.
+     * If there is no data for revision i, return topology for the maximum 
revision smaller than i.
+     *
+     * @param revision metastore revision.
+     * @return logical topology.
+     */
+    public Set<NodeWithAttributes> logicalTopology(long revision) {
+        assert revision >= 0 : revision;
+
+        Map.Entry<Long, Set<NodeWithAttributes>> entry = 
logicalTopologyByRevision.floorEntry(revision);
+
+        return entry != null ? entry.getValue() : emptySet();
     }
 
     private void registerCatalogEventListenersOnStartManagerBusy() {
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
index 3f52be112a..276ca7b723 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/CausalityDataNodesEngine.java
@@ -241,15 +241,11 @@ public class CausalityDataNodesEngine {
                 // Update the data nodes set with pending data from 
augmentation map
                 subAugmentationMap.forEach((rev, augmentation) -> {
                     if (augmentation.addition() && rev > 
scaleUpTriggerRevision && rev <= lastScaleUpRevision) {
-                        for (Node node : augmentation.nodes()) {
-                            finalDataNodes.add(node);
-                        }
+                        finalDataNodes.addAll(augmentation.nodes());
                     }
 
                     if (!augmentation.addition() && rev > 
scaleDownTriggerRevision && rev <= lastScaleDownRevision) {
-                        for (Node node : augmentation.nodes()) {
-                            finalDataNodes.remove(node);
-                        }
+                        finalDataNodes.removeAll(augmentation.nodes());
                     }
                 });
             }
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index 612a00c142..ff91559b4d 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -47,7 +47,10 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedF
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.argThat;
@@ -1027,6 +1030,77 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
         checkDataNodesRepeated(expectedDataNodesOnTopologyUpdateEvent, 
expectedDataNodesAfterTimersAreExpired, false);
     }
 
+    /**
+     * Tests that Distribution Zone Manager stores logical topology update 
history.
+     */
+    @Test
+    void testLogicalTopologyUpdateHistoryIsStored() throws Exception {
+        createZone(ZONE_NAME, 1, 1, null);
+
+        Set<LogicalNode> newTopology = new HashSet<>();
+
+        newTopology.add(NODE_0);
+        long topologyRevisionOneAdded = 
putNodeInLogicalTopologyAndGetRevision(NODE_0, newTopology);
+        waitForCondition(() -> metaStorageManager.appliedRevision() >= 
topologyRevisionOneAdded, TIMEOUT);
+
+        newTopology.add(NODE_1);
+        long topologyRevisionTwoAdded = 
putNodeInLogicalTopologyAndGetRevision(NODE_1, newTopology);
+        waitForCondition(() -> metaStorageManager.appliedRevision() >= 
topologyRevisionTwoAdded, TIMEOUT);
+
+        newTopology.remove(NODE_1);
+        long topologyRevisionOneRemoved = 
removeNodeInLogicalTopologyAndGetRevision(Set.of(NODE_1), newTopology);
+        waitForCondition(() -> metaStorageManager.appliedRevision() >= 
topologyRevisionOneRemoved, TIMEOUT);
+
+        int zoneId = getZoneId(ZONE_NAME);
+
+        assertDataNodesFromManager(
+                distributionZoneManager,
+                metaStorageManager::appliedRevision,
+                catalogManager::latestCatalogVersion,
+                zoneId,
+                ONE_NODE,
+                TIMEOUT
+        );
+
+        assertEquals(ONE_NODE_NAME, 
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionOneAdded)));
+        assertEquals(TWO_NODES_NAMES, 
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionTwoAdded)));
+        assertEquals(ONE_NODE_NAME, 
nodeNames(distributionZoneManager.logicalTopology(topologyRevisionOneRemoved)));
+    }
+
+    /**
+     * Tests logical topology param handling.
+     */
+    @Test
+    void testLogicalTopologyParams() throws Exception {
+        createZone(ZONE_NAME, 1, 1, null);
+
+        Set<LogicalNode> newTopology = new HashSet<>();
+
+        newTopology.add(NODE_0);
+        long topologyRevision = putNodeInLogicalTopologyAndGetRevision(NODE_0, 
newTopology);
+        waitForCondition(() -> metaStorageManager.appliedRevision() >= 
topologyRevision, TIMEOUT);
+
+        // topologyRevision is at least 2 as there have been other revision 
updates.
+        assertTrue(topologyRevision > 2);
+        // We need this check to make sure that the topology is empty if 
requested with a revision = (0, topologyRevision-1];
+
+        Set<NodeWithAttributes> topologyBeforeAddingNodes = 
distributionZoneManager.logicalTopology(topologyRevision - 1);
+        assertThat(topologyBeforeAddingNodes, is(empty()));
+
+        // Same for revision = 0.
+        Set<NodeWithAttributes> topology0 = 
distributionZoneManager.logicalTopology(0);
+        assertThat(topology0, is(empty()));
+
+        // Exactly one node should be present in the topology at revision = 
topologyRevision.
+        assertEquals(ONE_NODE_NAME, 
nodeNames(distributionZoneManager.logicalTopology(topologyRevision)));
+
+        // Same topology for the revision > topologyRevision.
+        assertEquals(ONE_NODE_NAME, 
nodeNames(distributionZoneManager.logicalTopology(topologyRevision + 1)));
+
+        // Fails if revision is negative (invalid).
+        assertThrows(AssertionError.class, () -> 
distributionZoneManager.logicalTopology(-1));
+    }
+
     private void checkDataNodesRepeated(
             Map<Integer, Set<String>> expectedDataNodesOnTopologyUpdateEvent,
             Map<Integer, Set<String>> expectedDataNodesAfterTimersAreExpired,
@@ -1439,7 +1513,7 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
                     }
                 }
 
-                Set<String> nodeNames = 
newLogicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet());
+                Set<String> nodeNames = nodeNames(newLogicalTopology);
 
                 if (topologyRevisions.containsKey(nodeNames)) {
                     topologyRevisions.remove(nodeNames).complete(revision);
@@ -1454,6 +1528,10 @@ public class DistributionZoneCausalityDataNodesTest 
extends BaseDistributionZone
         };
     }
 
+    private static Set<String> nodeNames(Set<NodeWithAttributes> 
newLogicalTopology) {
+        return 
newLogicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet());
+    }
+
     /**
      * Creates a data nodes watch listener which completes futures from {@code 
zoneDataNodesRevisions}
      * when receives event with expected data nodes.

Reply via email to