This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d5d8dde793 IGNITE-20317 Return metastorage invokes for zones changes in handlers, immediately recalculate data nodes when scale up/down is immediate. (#2685)
d5d8dde793 is described below

commit d5d8dde7939c66637be66e6b62966fc9429de068
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Oct 16 11:11:01 2023 +0400

    IGNITE-20317 Return metastorage invokes for zones changes in handlers, immediately recalculate data nodes when scale up/down is immediate. (#2685)
---
 .../distributionzones/DistributionZoneManager.java | 144 ++++++++++++---------
 .../rebalance/DistributionZoneRebalanceEngine.java | 128 +++++++++---------
 .../DistributionZoneCausalityDataNodesTest.java    |   2 -
 3 files changed, 149 insertions(+), 125 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index a65a13eb16..ae1e59ef3b 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -24,6 +24,7 @@ import static 
java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
@@ -150,17 +151,17 @@ public class DistributionZoneManager implements 
IgniteComponent {
     private final LogicalTopologyEventListener topologyEventListener = new 
LogicalTopologyEventListener() {
         @Override
         public void onNodeJoined(LogicalNode joinedNode, 
LogicalTopologySnapshot newTopology) {
-            updateLogicalTopologyInMetaStorage(newTopology, false);
+            updateLogicalTopologyInMetaStorage(newTopology);
         }
 
         @Override
         public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
-            updateLogicalTopologyInMetaStorage(newTopology, false);
+            updateLogicalTopologyInMetaStorage(newTopology);
         }
 
         @Override
         public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
-            updateLogicalTopologyInMetaStorage(newTopology, true);
+            updateLogicalTopologyInMetaStorage(newTopology);
         }
     };
 
@@ -295,6 +296,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         long causalityToken = parameters.causalityToken();
 
+        if (newScaleUp == IMMEDIATE_TIMER_VALUE) {
+            return saveDataNodesToMetaStorageOnScaleUp(zoneId, 
causalityToken).thenRun(() -> {
+                // TODO: causalityOnUpdateScaleUp will be removed 
https://issues.apache.org/jira/browse/IGNITE-20604,
+                // catalog must be used instead
+                
causalityDataNodesEngine.causalityOnUpdateScaleUp(causalityToken, zoneId, 
IMMEDIATE_TIMER_VALUE);
+            });
+        }
+
         // It is safe to zonesTimers.get(zoneId) in term of NPE because meta 
storage notifications are one-threaded
         // and this map will be initialized on a manager start or with catalog 
notification
         ZoneState zoneState = zonesState.get(zoneId);
@@ -329,6 +338,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         long causalityToken = parameters.causalityToken();
 
+        if (newScaleDown == IMMEDIATE_TIMER_VALUE) {
+            return saveDataNodesToMetaStorageOnScaleDown(zoneId, 
causalityToken).thenRun(() -> {
+                // TODO: causalityOnUpdateScaleDown will be removed 
https://issues.apache.org/jira/browse/IGNITE-20604,
+                // catalog must be used instead
+                
causalityDataNodesEngine.causalityOnUpdateScaleDown(causalityToken, zoneId, 
IMMEDIATE_TIMER_VALUE);
+            });
+        }
+
         // It is safe to zonesTimers.get(zoneId) in term of NPE because meta 
storage notifications are one-threaded
         // and this map will be initialized on a manager start or with catalog 
notification
         ZoneState zoneState = zonesState.get(zoneId);
@@ -387,8 +404,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
      *
      * @param zone Zone descriptor.
      * @param causalityToken Causality token.
+     * @return Future reflecting the completion of creation or restoring a 
zone.
      */
-    private void createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long 
causalityToken) {
+    private CompletableFuture<Void> 
createOrRestoreZoneStateBusy(CatalogZoneDescriptor zone, long causalityToken) {
         int zoneId = zone.id();
 
         VaultEntry topologyAugmentationMapFromVault = 
vaultMgr.get(zoneTopologyAugmentationVault(zoneId)).join();
@@ -403,7 +421,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             Set<Node> dataNodes = 
logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet());
 
-            initDataNodesAndTriggerKeysInMetaStorage(zoneId, causalityToken, 
dataNodes);
+            return initDataNodesAndTriggerKeysInMetaStorage(zoneId, 
causalityToken, dataNodes)
+                    .thenRun(() -> 
causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, zone));
         } else {
             // Restart case, when topologyAugmentationMap has already been 
saved during a cluster work.
             ConcurrentSkipListMap<Long, Augmentation> topologyAugmentationMap 
= fromBytes(topologyAugmentationMapFromVault.value());
@@ -424,6 +443,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
 
         causalityDataNodesEngine.onCreateOrRestoreZoneState(causalityToken, 
zone);
+
+        return completedFuture(null);
     }
 
     /**
@@ -493,8 +514,9 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * @param zoneId Unique id of a zone
      * @param revision Revision of an event that has triggered this method.
      * @param dataNodes Data nodes.
+     * @return Future reflecting the completion of initialisation of zone's 
keys in meta storage.
      */
-    private void initDataNodesAndTriggerKeysInMetaStorage(
+    private CompletableFuture<Void> initDataNodesAndTriggerKeysInMetaStorage(
             int zoneId,
             long revision,
             Set<Node> dataNodes
@@ -512,27 +534,32 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             Iif iif = iif(triggerKeyCondition, dataNodesAndTriggerKeyUpd, 
ops().yield(false));
 
-            metaStorageManager.invoke(iif).whenComplete((res, e) -> {
-                if (e != null) {
-                    LOG.error(
-                            "Failed to update zones' dataNodes value [zoneId = 
{}, dataNodes = {}, revision = {}]",
-                            e,
-                            zoneId,
-                            dataNodes,
-                            revision
-                    );
-                } else if (res.getAsBoolean()) {
-                    LOG.info("Update zones' dataNodes value [zoneId = {}, 
dataNodes = {}, revision = {}]",
-                            zoneId, dataNodes, revision);
-                } else {
-                    LOG.debug(
-                            "Failed to update zones' dataNodes value [zoneId = 
{}, dataNodes = {}, revision = {}]",
-                            zoneId,
-                            dataNodes,
-                            revision
-                    );
-                }
-            });
+            return metaStorageManager.invoke(iif)
+                    .thenApply(StatementResult::getAsBoolean)
+                    .whenComplete((invokeResult, e) -> {
+                        if (e != null) {
+                            LOG.error(
+                                    "Failed to update zones' dataNodes value 
[zoneId = {}, dataNodes = {}, revision = {}]",
+                                    e,
+                                    zoneId,
+                                    dataNodes,
+                                    revision
+                            );
+                        } else if (invokeResult) {
+                            LOG.info("Update zones' dataNodes value [zoneId = 
{}, dataNodes = {}, revision = {}]",
+                                    zoneId,
+                                    dataNodes,
+                                    revision
+                            );
+                        } else {
+                            LOG.debug(
+                                    "Failed to update zones' dataNodes value 
[zoneId = {}, dataNodes = {}, revision = {}]",
+                                    zoneId,
+                                    dataNodes,
+                                    revision
+                            );
+                        }
+                    }).thenCompose((ignored) -> completedFuture(null));
         } finally {
             busyLock.leaveBusy();
         }
@@ -544,7 +571,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * @param zoneId Unique id of a zone
      * @param revision Revision of an event that has triggered this method.
      */
-    private void removeTriggerKeysAndDataNodes(int zoneId, long revision) {
+    private CompletableFuture<Void> removeTriggerKeysAndDataNodes(int zoneId, 
long revision) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -556,20 +583,23 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             Iif iif = iif(triggerKeyCondition, removeKeysUpd, 
ops().yield(false));
 
-            metaStorageManager.invoke(iif).whenComplete((res, e) -> {
-                if (e != null) {
-                    LOG.error(
-                            "Failed to delete zone's dataNodes keys [zoneId = 
{}, revision = {}]",
-                            e,
-                            zoneId,
-                            revision
-                    );
-                } else if (res.getAsBoolean()) {
-                    LOG.info("Delete zone's dataNodes keys [zoneId = {}, 
revision = {}]", zoneId, revision);
-                } else {
-                    LOG.debug("Failed to delete zone's dataNodes keys [zoneId 
= {}, revision = {}]", zoneId, revision);
-                }
-            });
+            return metaStorageManager.invoke(iif)
+                    .thenApply(StatementResult::getAsBoolean)
+                    .whenComplete((invokeResult, e) -> {
+                        if (e != null) {
+                            LOG.error(
+                                    "Failed to delete zone's dataNodes keys 
[zoneId = {}, revision = {}]",
+                                    e,
+                                    zoneId,
+                                    revision
+                            );
+                        } else if (invokeResult) {
+                            LOG.info("Delete zone's dataNodes keys [zoneId = 
{}, revision = {}]", zoneId, revision);
+                        } else {
+                            LOG.debug("Failed to delete zone's dataNodes keys 
[zoneId = {}, revision = {}]", zoneId, revision);
+                        }
+                    })
+                    .thenCompose(ignored -> completedFuture(null));
         } finally {
             busyLock.leaveBusy();
         }
@@ -580,10 +610,8 @@ public class DistributionZoneManager implements 
IgniteComponent {
      * in meta storage.
      *
      * @param newTopology Logical topology snapshot.
-     * @param topologyLeap Flag that indicates whether this updates was 
trigger by
-     *                     {@link 
LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} or not.
      */
-    private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot 
newTopology, boolean topologyLeap) {
+    private void updateLogicalTopologyInMetaStorage(LogicalTopologySnapshot 
newTopology) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -597,12 +625,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
                 // Very first start of the cluster, so we just initialize 
zonesLogicalTopologyVersionKey
                 updateCondition = notExists(zonesLogicalTopologyVersionKey());
             } else {
-                if (topologyLeap) {
-                    updateCondition = 
value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
-                } else {
-                    // This condition may be stronger, as far as we receive 
topology events one by one.
-                    updateCondition = 
value(zonesLogicalTopologyVersionKey()).eq(longToBytes(newTopology.version() - 
1));
-                }
+                updateCondition = 
value(zonesLogicalTopologyVersionKey()).lt(longToBytes(newTopology.version()));
             }
 
             Iif iff = iif(
@@ -1358,17 +1381,14 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             CreateZoneEventParameters params = (CreateZoneEventParameters) 
parameters;
 
-            createOrRestoreZoneStateBusy(params.zoneDescriptor(), 
params.causalityToken());
-
-            return completedFuture(false);
+            return createOrRestoreZoneStateBusy(params.zoneDescriptor(), 
params.causalityToken())
+                    .thenCompose((ignored) -> completedFuture(false));
         }));
 
         catalogManager.listen(ZONE_DROP, (parameters, exception) -> 
inBusyLock(busyLock, () -> {
             assert exception == null : parameters;
 
-            onDropZoneBusy((DropZoneEventParameters) parameters);
-
-            return completedFuture(false);
+            return onDropZoneBusy((DropZoneEventParameters) 
parameters).thenCompose((ignored) -> completedFuture(false));
         }));
 
         catalogManager.listen(ZONE_ALTER, new 
ManagerCatalogAlterZoneEventListener());
@@ -1409,7 +1429,7 @@ public class DistributionZoneManager implements 
IgniteComponent {
         }
     }
 
-    private void onDropZoneBusy(DropZoneEventParameters parameters) {
+    private CompletableFuture<Void> onDropZoneBusy(DropZoneEventParameters 
parameters) {
         int zoneId = parameters.zoneId();
 
         long causalityToken = parameters.causalityToken();
@@ -1418,10 +1438,10 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
         zoneState.stopTimers();
 
-        removeTriggerKeysAndDataNodes(zoneId, causalityToken);
+        return removeTriggerKeysAndDataNodes(zoneId, 
causalityToken).thenRun(() -> {
+            causalityDataNodesEngine.onDelete(causalityToken, zoneId);
 
-        causalityDataNodesEngine.onDelete(causalityToken, zoneId);
-
-        zonesState.remove(zoneId);
+            zonesState.remove(zoneId);
+        });
     }
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index b0f6533c0f..bbff27b34c 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -104,7 +104,7 @@ public class DistributionZoneRebalanceEngine {
             catalogService.listen(ZONE_ALTER, new 
CatalogAlterZoneEventListener(catalogService) {
                 @Override
                 protected CompletableFuture<Void> 
onReplicasUpdate(AlterZoneEventParameters parameters, int oldReplicas) {
-                    return onUpdateReplicas(parameters, oldReplicas);
+                    return onUpdateReplicas(parameters);
                 }
             });
 
@@ -158,48 +158,14 @@ public class DistributionZoneRebalanceEngine {
                         return completedFuture(null);
                     }
 
-                    for (CatalogTableDescriptor tableDescriptor : 
findTablesByZoneId(zoneId, catalogVersion)) {
-                        CompletableFuture<?>[] partitionFutures = 
RebalanceUtil.triggerAllTablePartitionsRebalance(
-                                tableDescriptor,
-                                zoneDescriptor,
-                                filteredDataNodes,
-                                evt.entryEvent().newEntry().revision(),
-                                metaStorageManager
-                        );
+                    List<CatalogTableDescriptor> tableDescriptors = 
findTablesByZoneId(zoneId, catalogVersion);
 
-                        // This set is used to deduplicate exceptions (if 
there is an exception from upstream, for instance,
-                        // when reading from MetaStorage, it will be 
encountered by every partition future) to avoid noise
-                        // in the logs.
-                        Set<Throwable> unwrappedCauses = 
ConcurrentHashMap.newKeySet();
-
-                        for (int partId = 0; partId < partitionFutures.length; 
partId++) {
-                            int finalPartId = partId;
-
-                            partitionFutures[partId].exceptionally(e -> {
-                                Throwable cause = 
ExceptionUtils.unwrapCause(e);
-
-                                if (unwrappedCauses.add(cause)) {
-                                    // The exception is specific to this 
partition.
-                                    LOG.error(
-                                            "Exception on updating assignments 
for [table={}, partition={}]",
-                                            e,
-                                            tableInfo(tableDescriptor), 
finalPartId
-                                    );
-                                } else {
-                                    // The exception is from upstream and not 
specific for this partition, so don't log the partition index.
-                                    LOG.error(
-                                            "Exception on updating assignments 
for [table={}]",
-                                            e,
-                                            tableInfo(tableDescriptor)
-                                    );
-                                }
-
-                                return null;
-                            });
-                        }
-                    }
-
-                    return completedFuture(null);
+                    return triggerPartitionsRebalanceForAllTables(
+                            evt.entryEvent().newEntry().revision(),
+                            zoneDescriptor,
+                            filteredDataNodes,
+                            tableDescriptors
+                    );
                 });
             }
 
@@ -210,7 +176,7 @@ public class DistributionZoneRebalanceEngine {
         };
     }
 
-    private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters 
parameters, int oldReplicas) {
+    private CompletableFuture<Void> onUpdateReplicas(AlterZoneEventParameters 
parameters) {
         return IgniteUtils.inBusyLockAsync(busyLock, () -> {
             int zoneId = parameters.zoneDescriptor().id();
             long causalityToken = parameters.causalityToken();
@@ -223,28 +189,68 @@ public class DistributionZoneRebalanceEngine {
 
                         List<CatalogTableDescriptor> tableDescriptors = 
findTablesByZoneId(zoneId, parameters.catalogVersion());
 
-                        List<CompletableFuture<?>> tableFutures = new 
ArrayList<>(tableDescriptors.size());
+                        return triggerPartitionsRebalanceForAllTables(
+                                causalityToken,
+                                parameters.zoneDescriptor(),
+                                dataNodes,
+                                tableDescriptors
+                        );
+                    });
+        });
+    }
 
-                        for (CatalogTableDescriptor tableDescriptor : 
tableDescriptors) {
-                            LOG.info(
-                                    "Received update for replicas number 
[table={}, oldNumber={}, newNumber={}]",
-                                    tableInfo(tableDescriptor), oldReplicas, 
parameters.zoneDescriptor().replicas()
-                            );
+    private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables(
+            long revision,
+            CatalogZoneDescriptor zoneDescriptor,
+            Set<String> dataNodes,
+            List<CatalogTableDescriptor> tableDescriptors
+    ) {
+        List<CompletableFuture<?>> tableFutures = new 
ArrayList<>(tableDescriptors.size());
+
+        for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
+            CompletableFuture<?>[] partitionFutures = 
RebalanceUtil.triggerAllTablePartitionsRebalance(
+                    tableDescriptor,
+                    zoneDescriptor,
+                    dataNodes,
+                    revision,
+                    metaStorageManager
+            );
+
+            // This set is used to deduplicate exceptions (if there is an 
exception from upstream, for instance,
+            // when reading from MetaStorage, it will be encountered by every 
partition future) to avoid noise
+            // in the logs.
+            Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+            for (int partId = 0; partId < partitionFutures.length; partId++) {
+                int finalPartId = partId;
+
+                partitionFutures[partId].exceptionally(e -> {
+                    Throwable cause = ExceptionUtils.unwrapCause(e);
+
+                    if (unwrappedCauses.add(cause)) {
+                        // The exception is specific to this partition.
+                        LOG.error(
+                                "Exception on updating assignments for 
[table={}, partition={}]",
+                                e,
+                                tableInfo(tableDescriptor), finalPartId
+                        );
+                    } else {
+                        // The exception is from upstream and not specific for 
this partition, so don't log the partition index.
+                        LOG.error(
+                                "Exception on updating assignments for 
[table={}]",
+                                e,
+                                tableInfo(tableDescriptor)
+                        );
+                    }
 
-                            CompletableFuture<?>[] partitionFutures = 
RebalanceUtil.triggerAllTablePartitionsRebalance(
-                                    tableDescriptor,
-                                    parameters.zoneDescriptor(),
-                                    dataNodes,
-                                    causalityToken,
-                                    metaStorageManager
-                            );
+                    return null;
+                });
+            }
 
-                            tableFutures.add(allOf(partitionFutures));
-                        }
+            tableFutures.add(allOf(partitionFutures));
+        }
 
-                        return 
allOf(tableFutures.toArray(CompletableFuture[]::new));
-                    });
-        });
+        return allOf(tableFutures.toArray(CompletableFuture[]::new));
     }
 
     private List<CatalogTableDescriptor> findTablesByZoneId(int zoneId, int 
catalogVersion) {
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
index ac1ea861fb..26fcaedb4c 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/causalitydatanodes/DistributionZoneCausalityDataNodesTest.java
@@ -576,8 +576,6 @@ public class DistributionZoneCausalityDataNodesTest extends 
BaseDistributionZone
         // Create logical topology with NODE_0.
         topology.putNode(NODE_0);
 
-        putNodeInLogicalTopologyAndGetRevision(NODE_0, ONE_NODE);
-
         // Test steps.
 
         // Create a zone.

Reply via email to