This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 62d02d7e14 IGNITE-19581 Add restoring of zones state after restart 
taking into account zone's filter updates (#2288)
62d02d7e14 is described below

commit 62d02d7e14341ec1db220ad07b3b64b18d395b01
Author: Mirza Aliev <[email protected]>
AuthorDate: Thu Jul 6 11:33:10 2023 +0400

    IGNITE-19581 Add restoring of zones state after restart taking into account 
zone's filter updates (#2288)
---
 .../distributionzones/DistributionZoneManager.java |  99 ++++++++--
 .../distributionzones/DistributionZonesUtil.java   |  19 ++
 .../rebalance/DistributionZoneRebalanceEngine.java |   9 +-
 .../DistributionZoneManagerAlterFilterTest.java    |  50 +----
 .../DistributionZoneManagerFilterTest.java         |  29 +--
 .../DistributionZonesTestUtil.java                 |  10 +-
 ...niteDistributionZoneManagerNodeRestartTest.java | 202 ++++++++++++++++-----
 7 files changed, 270 insertions(+), 148 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 5673101862..03ce9cbaf0 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -44,6 +44,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneTopologyAugmentationVault;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesFilterUpdateRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesGlobalStateRevision;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
@@ -753,7 +754,22 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
 
-            saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision());
+            VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
+
+            long eventRevision = ctx.storageRevision();
+
+            if (filterUpdateRevision != null) {
+                // This means that we have already handled the event with this 
revision.
+                // It is possible when the node was restarted after this listener 
completed,
+                // but the applied revision didn't have time to be propagated to 
the Vault.
+                if (bytesToLong(filterUpdateRevision.value()) >= 
eventRevision) {
+                    return completedFuture(null);
+                }
+            }
+
+            vaultMgr.put(zonesFilterUpdateRevision(), 
longToBytes(eventRevision)).join();
+
+            saveDataNodesToMetaStorageOnScaleUp(zoneId, eventRevision);
 
             return completedFuture(null);
         };
@@ -826,29 +842,74 @@ public class DistributionZoneManager implements 
IgniteComponent {
 
             Optional<Long> maxScaleUpRevision = 
zoneState.highestRevision(true);
 
-            // Take the highest revision from the topologyAugmentationMap and 
schedule scale up/scale down,
-            // meaning that all augmentation of nodes will be taken into 
account in newly created timers. If the augmentation has already
-            // been proposed to data nodes in the metastorage before restart,
-            // that means we have updated corresponding trigger key and it's 
value will be greater than
-            // the highest revision from the topologyAugmentationMap, and 
current timer won't affect data nodes.
-            maxScaleUpRevision.ifPresent(
-                    rev -> zoneState.rescheduleScaleUp(
-                            zone.dataNodesAutoAdjustScaleUp(),
-                            () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
rev)
-                    )
-            );
-
             Optional<Long> maxScaleDownRevision = 
zoneState.highestRevision(false);
 
-            maxScaleDownRevision.ifPresent(
-                    rev -> zoneState.rescheduleScaleDown(
-                            zone.dataNodesAutoAdjustScaleDown(),
-                            () -> 
saveDataNodesToMetaStorageOnScaleDown(zoneId, rev)
-                    )
-            );
+            VaultEntry filterUpdateRevision = 
vaultMgr.get(zonesFilterUpdateRevision()).join();
+
+            restoreTimers(zone, zoneState, maxScaleUpRevision, 
maxScaleDownRevision, filterUpdateRevision);
         }
     }
 
+    /**
+     * Restores timers that were scheduled before a node's restart.
+     * Take the highest revision from the {@link 
ZoneState#topologyAugmentationMap()}, compare it with the revision
+     * of the last update of the zone's filter and schedule scale up/scale 
down timers. Filter revision is taken into account because
+     * any filter update triggers immediate scale up.
+     *
+     * @param zone Zone's view.
+     * @param zoneState Zone's state from Distribution Zone Manager
+     * @param maxScaleUpRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node joins.
+     * @param maxScaleDownRevisionOptional Max revision from the {@link 
ZoneState#topologyAugmentationMap()} for node removals.
+     * @param filterUpdateRevisionVaultEntry Revision of the last update of 
the zone's filter.
+     */
+    private void restoreTimers(
+            DistributionZoneView zone,
+            ZoneState zoneState,
+            Optional<Long> maxScaleUpRevisionOptional,
+            Optional<Long> maxScaleDownRevisionOptional,
+            VaultEntry filterUpdateRevisionVaultEntry
+    ) {
+        int zoneId = zone.zoneId();
+
+        maxScaleUpRevisionOptional.ifPresent(
+                maxScaleUpRevision -> {
+                    if (filterUpdateRevisionVaultEntry != null) {
+                        long filterUpdateRevision = 
bytesToLong(filterUpdateRevisionVaultEntry.value());
+
+                        // Immediately trigger the scale up that was planned to 
be invoked before restart.
+                        // If this invocation was successful before restart, then 
the current call will simply be skipped.
+                        saveDataNodesToMetaStorageOnScaleUp(zoneId, 
filterUpdateRevision);
+
+                        if (maxScaleUpRevision < filterUpdateRevision) {
+                            // Don't need to trigger additional scale up for 
the scenario, when filter update event happened after the last
+                            // node join event.
+
+                            // TODO: IGNITE-19506 Think carefully for the 
scenario when scale up timer was immediate before restart and
+                            // causality data nodes is implemented.
+                            return;
+                        }
+                    }
+
+                    // Take the highest revision from the 
topologyAugmentationMap and schedule scale up/scale down,
+                    // meaning that all augmentations of nodes will be taken 
into account in newly created timers.
+                    // If augmentations have already been proposed to data 
nodes in the metastorage before restart,
+                    // that means we have updated corresponding trigger key 
and its value will be greater than
+                    // the highest revision from the topologyAugmentationMap, 
and current timer won't affect data nodes.
+                    zoneState.rescheduleScaleUp(
+                            zone.dataNodesAutoAdjustScaleUp(),
+                            () -> saveDataNodesToMetaStorageOnScaleUp(zoneId, 
maxScaleUpRevision)
+                    );
+                }
+        );
+
+        maxScaleDownRevisionOptional.ifPresent(
+                maxScaleDownRevision -> zoneState.rescheduleScaleDown(
+                        zone.dataNodesAutoAdjustScaleDown(),
+                        () -> saveDataNodesToMetaStorageOnScaleDown(zoneId, 
maxScaleDownRevision)
+                )
+        );
+    }
+
     /**
      * Method initialise data nodes value for the specified zone, also sets 
{@code revision} to the
      * {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)}, {@link 
DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} and
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 7f87409174..2d823b30ed 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -84,6 +84,9 @@ public class DistributionZonesUtil {
     /** Key value for zones' global state revision in vault. */
     private static final String DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT 
= "vault.distributionZones.globalState.revision";
 
+    /** Key value for zones' filter update revision in vault. */
+    private static final String 
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT = 
"vault.distributionZones.filterUpdate.revision";
+
     /** Key prefix for zones' logical topology nodes. */
     private static final String DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY = 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX + "nodes";
 
@@ -112,6 +115,10 @@ public class DistributionZonesUtil {
     private static final ByteArray 
DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY =
             new ByteArray(DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT);
 
+    /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT}. */
+    private static final ByteArray 
DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY =
+            new ByteArray(DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT);
+
     /** ByteArray representation of {@link 
DistributionZonesUtil#DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION}. */
     private static final ByteArray 
DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION_KEY =
             new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_VERSION);
@@ -237,6 +244,13 @@ public class DistributionZonesUtil {
         return DISTRIBUTION_ZONES_GLOBAL_STATE_REVISION_VAULT_KEY;
     }
 
+    /**
+     * The key represents the last revision of the zone's filter update.
+     */
+    public static ByteArray zonesFilterUpdateRevision() {
+        return DISTRIBUTION_ZONES_FILTER_UPDATE_REVISION_VAULT_KEY;
+    }
+
     /**
      * The key that represents {@link ZoneState#topologyAugmentationMap()} in 
the Vault.
      */
@@ -392,6 +406,11 @@ public class DistributionZonesUtil {
         return dataNodesMap;
     }
 
+    @Nullable
+    public static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
+        return dataNodesBytes == null ? null : 
dataNodes(fromBytes(dataNodesBytes));
+    }
+
     /**
      * Returns data nodes from the meta storage entry or empty map if the 
value is null.
      *
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index d2fed35132..89c45d0dd4 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -22,10 +22,10 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.toList;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.parseDataNodes;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
@@ -48,11 +48,9 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.schema.configuration.TableView;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Zone rebalance manager.
@@ -228,11 +226,6 @@ public class DistributionZoneRebalanceEngine {
         };
     }
 
-    @Nullable
-    private static Set<Node> parseDataNodes(byte[] dataNodesBytes) {
-        return dataNodesBytes == null ? null : 
dataNodes(ByteUtils.fromBytes(dataNodesBytes));
-    }
-
     /**
      * Listener of replicas configuration changes.
      *
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
index e44b9a8b48..3ffd4167ff 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -36,7 +36,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.metastorage.server.If;
@@ -114,12 +113,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
         ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(C, 
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C, 
D), TIMEOUT_MILLIS);
     }
 
     /**
@@ -155,12 +149,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
         ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(), 
TIMEOUT_MILLIS);
     }
 
     /**
@@ -201,12 +190,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
         // Node C is still in data nodes because altering a filter triggers 
only immediate scale up.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(C, 
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C, 
D), TIMEOUT_MILLIS);
 
         // Check that scale down task is still scheduled.
         
assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
@@ -221,12 +205,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
                         .build()
         ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                
Set.of(D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(D), 
TIMEOUT_MILLIS);
     }
 
     /**
@@ -284,12 +263,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
 
         // Check that node E, that was added while filter's altering, is not 
propagated to data nodes.
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(C, 
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C, 
D), TIMEOUT_MILLIS);
 
         // Assert that scheduled timer was not canceled because of immediate 
scale up after filter altering.
         
assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
@@ -304,12 +278,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
         ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
         // Check that node E, that was added after filter's altering, was 
added only after altering immediate scale up.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(C, D, 
e).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(C, 
D, e), TIMEOUT_MILLIS);
     }
 
     /**
@@ -354,12 +323,7 @@ public class DistributionZoneManagerAlterFilterTest  
extends BaseDistributionZon
             ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
         }
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(A, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A, 
C), TIMEOUT_MILLIS);
     }
 
     private static Stream<Arguments> provideArgumentsForFilterAlteringTests() {
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
index afcff858dc..7e977bd885 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
@@ -65,12 +64,7 @@ public class DistributionZoneManagerFilterTest extends 
BaseDistributionZoneManag
 
         topology.putNode(D);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                ZONE_ID,
-                Set.of(A, C, 
D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A, 
C, D), TIMEOUT_MILLIS);
     }
 
     @Test
@@ -79,12 +73,7 @@ public class DistributionZoneManagerFilterTest extends 
BaseDistributionZoneManag
 
         topology.removeNodes(Set.of(C));
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                ZONE_ID,
-                
Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, ZONE_ID, 
Set.of(A), TIMEOUT_MILLIS);
     }
 
     @Test
@@ -100,12 +89,7 @@ public class DistributionZoneManagerFilterTest extends 
BaseDistributionZoneManag
 
         topology.putNode(newB);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                ZONE_ID,
-                Set.of(A, newB, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A, 
newB, C), TIMEOUT_MILLIS);
     }
 
     /**
@@ -131,11 +115,6 @@ public class DistributionZoneManagerFilterTest extends 
BaseDistributionZoneManag
                         .build()
         ).get(10_000, TimeUnit.MILLISECONDS);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                ZONE_ID,
-                Set.of(A, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, ZONE_ID, Set.of(A, 
C), TIMEOUT_MILLIS);
     }
 }
diff --git 
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
 
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 92bd01f763..1cdabe81d1 100644
--- 
a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ 
b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -63,6 +63,7 @@ import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageChange
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -465,21 +466,24 @@ public class DistributionZonesTestUtil {
     public static void assertDataNodesFromManager(
             DistributionZoneManager distributionZoneManager,
             int zoneId,
-            @Nullable Set<String> expectedValue,
+            @Nullable Set<LogicalNode> expectedValue,
             long timeoutMillis
     ) throws InterruptedException {
+        Set<String> expectedValueNames =
+                expectedValue == null ? null : 
expectedValue.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
         boolean success = waitForCondition(() -> {
             // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
             Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
 
-            return Objects.equals(dataNodes, expectedValue);
+            return Objects.equals(dataNodes, expectedValueNames);
         }, timeoutMillis);
 
         // We do a second check simply to print a nice error message in case 
the condition above is not achieved.
         if (!success) {
             Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
 
-            assertThat(dataNodes, is(expectedValue));
+            assertThat(dataNodes, is(expectedValueNames));
         }
     }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 502af7de66..d744178a3a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
@@ -31,6 +32,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
@@ -80,6 +82,7 @@ import 
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkAddress;
@@ -275,12 +278,7 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                1,
-                Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, 1, Set.of(A, B, 
C), TIMEOUT_MILLIS);
 
         Map<String, Map<String, String>> nodeAttributesBeforeRestart = 
distributionZoneManager.nodesAttributes();
 
@@ -335,12 +333,7 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         partialNode.logicalTopology().putNode(C);
 
         // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change 
this to the causality versioned call to dataNodes.
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                DEFAULT_ZONE_ID,
-                Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, DEFAULT_ZONE_ID, 
Set.of(A, B, C), TIMEOUT_MILLIS);
 
         MetaStorageManager metaStorageManager = 
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
 
@@ -362,13 +355,13 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
-        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName, 
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
 
         partialNode.logicalTopology().putNode(A);
         partialNode.logicalTopology().putNode(B);
         partialNode.logicalTopology().removeNodes(Set.of(B));
 
-        assertDataNodesFromManager(distributionZoneManager, zoneId, 
Set.of(A.name()), TIMEOUT_MILLIS);
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
 
         partialNode.stop();
 
@@ -376,7 +369,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
         distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
-        assertDataNodesFromManager(distributionZoneManager, zoneId, 
Set.of(A.name()), TIMEOUT_MILLIS);
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
     }
 
     @ParameterizedTest
@@ -386,7 +379,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
         DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
-        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName, 
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
 
         partialNode.logicalTopology().putNode(A);
         partialNode.logicalTopology().putNode(B);
@@ -394,19 +387,14 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         assertDataNodesFromManager(
                 distributionZoneManager,
                 zoneId,
-                Set.of(A, 
B).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+                Set.of(A, B),
                 TIMEOUT_MILLIS
         );
 
         MetaStorageManager metaStorageManager = 
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
 
-        when(metaStorageManager.invoke(argThat(iif -> {
-            If iif1 = MetaStorageWriteHandler.toIf(iif);
-
-            byte[] keyScaleUp = zoneScaleUpChangeTriggerKey(zoneId).bytes();
-
-            return iif1.andThen().update().operations().stream().anyMatch(op 
-> Arrays.equals(keyScaleUp, op.key()));
-        }))).thenThrow(new RuntimeException("Expected"));
+        // Block scale up
+        blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
 
         partialNode.logicalTopology().putNode(C);
         partialNode.logicalTopology().removeNodes(Set.of(B));
@@ -414,7 +402,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
         assertDataNodesFromManager(
                 distributionZoneManager,
                 zoneId,
-                
Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+                Set.of(A),
                 TIMEOUT_MILLIS
         );
 
@@ -424,32 +412,139 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(A, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A, 
C), TIMEOUT_MILLIS);
     }
 
     @ParameterizedTest
     @MethodSource("provideArgumentsRestartTests")
-    public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String 
zoneName) throws Exception {
+    public void testScaleUpTriggeredByFilterUpdateIsRestoredAfterRestart(int 
zoneId, String zoneName) throws Exception {
         PartialNode partialNode = startPartialNode(0);
 
         DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName, 
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+        partialNode.logicalTopology().putNode(A);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
+
+        distributionZoneManager.alterZone(
+                zoneName,
+                new DistributionZoneConfigurationParameters.Builder(zoneName)
+                        .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+                        .build()
+        ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+        partialNode.logicalTopology().putNode(B);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
+
         MetaStorageManager metaStorageManager = 
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
 
-        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName);
+        blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
 
-        when(metaStorageManager.invoke(argThat(iif -> {
-            If iif1 = MetaStorageWriteHandler.toIf(iif);
+        // Only Node B passes the filter
+        String filter = "$[?(@.dataRegionSize > 10)]";
 
-            byte[] keyScaleDown = 
zoneScaleDownChangeTriggerKey(zoneId).bytes();
+        distributionZoneManager.alterZone(
+                zoneName,
+                new DistributionZoneConfigurationParameters.Builder(zoneName)
+                        .filter(filter)
+                        .build()
+        ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
-            return iif1.andThen().update().operations().stream().anyMatch(op 
-> Arrays.equals(keyScaleDown, op.key()));
-        }))).thenThrow(new RuntimeException("Expected"));
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B), 
TIMEOUT_MILLIS);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void 
testScaleUpsTriggeredByFilterUpdateAndNodeJoinAreRestoredAfterRestart(int 
zoneId, String zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName, 
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+        partialNode.logicalTopology().putNode(A);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
+
+        MetaStorageManager metaStorageManager = 
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        blockUpdate(metaStorageManager, zoneScaleUpChangeTriggerKey(zoneId));
+
+        distributionZoneManager.alterZone(
+                zoneName,
+                new DistributionZoneConfigurationParameters.Builder(zoneName)
+                        .dataNodesAutoAdjustScaleUp(100)
+                        .build()
+        ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+        partialNode.logicalTopology().putNode(B);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A), 
TIMEOUT_MILLIS);
+
+        // Only nodes B and C pass the filter
+        String filter = "$[?(@.dataRegionSize > 10)]";
+
+        distributionZoneManager.alterZone(
+                zoneName,
+                new DistributionZoneConfigurationParameters.Builder(zoneName)
+                        .filter(filter)
+                        .build()
+        ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+        partialNode.logicalTopology().putNode(C);
+
+        partialNode.logicalTopology().removeNodes(Set.of(A));
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(), 
TIMEOUT_MILLIS);
+
+        partialNode.stop();
+
+        partialNode = startPartialNode(0);
+
+        distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        // Immediate timer triggered by filter update
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B), 
TIMEOUT_MILLIS);
+
+        ZoneState zoneState = distributionZoneManager.zonesState().get(zoneId);
+
+        // Timer scheduled after join of the node C
+        assertNotNull(zoneState.scaleUpTask());
+
+        distributionZoneManager.alterZone(
+                zoneName,
+                new DistributionZoneConfigurationParameters.Builder(zoneName)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .build()
+        ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(B, 
C), TIMEOUT_MILLIS);
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideArgumentsRestartTests")
+    public void testScaleDownTimerIsRestoredAfterRestart(int zoneId, String 
zoneName) throws Exception {
+        PartialNode partialNode = startPartialNode(0);
+
+        DistributionZoneManager distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
+
+        createZoneOrAlterDefaultZone(distributionZoneManager, zoneName, 
IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE);
+
+        MetaStorageManager metaStorageManager = 
findComponent(partialNode.startedComponents(), MetaStorageManager.class);
+
+        // Block scale down
+        blockUpdate(metaStorageManager, zoneScaleDownChangeTriggerKey(zoneId));
 
         partialNode.logicalTopology().putNode(A);
         partialNode.logicalTopology().putNode(B);
@@ -459,7 +554,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
         assertDataNodesFromManager(
                 distributionZoneManager,
                 zoneId,
-                Set.of(A, B, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+                Set.of(A, B, C),
                 TIMEOUT_MILLIS
         );
 
@@ -469,12 +564,7 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
         distributionZoneManager = 
findComponent(partialNode.startedComponents(), DistributionZoneManager.class);
 
-        assertDataNodesFromManager(
-                distributionZoneManager,
-                zoneId,
-                Set.of(A, 
C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
-                TIMEOUT_MILLIS
-        );
+        assertDataNodesFromManager(distributionZoneManager, zoneId, Set.of(A, 
C), TIMEOUT_MILLIS);
     }
 
     private static Stream<Arguments> provideArgumentsRestartTests() {
@@ -488,14 +578,16 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
 
     private static void createZoneOrAlterDefaultZone(
             DistributionZoneManager distributionZoneManager,
-            String zoneName
+            String zoneName,
+            int scaleUp,
+            int scaleDown
     ) throws Exception {
         if (zoneName.equals(DEFAULT_ZONE_NAME)) {
             distributionZoneManager.alterZone(
                     DEFAULT_ZONE_NAME,
                     new 
DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
-                            .dataNodesAutoAdjustScaleUp(0)
-                            .dataNodesAutoAdjustScaleDown(0)
+                            .dataNodesAutoAdjustScaleUp(scaleUp)
+                            .dataNodesAutoAdjustScaleDown(scaleDown)
                             .build()
             ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 
@@ -512,10 +604,20 @@ public class 
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
         } else {
             distributionZoneManager.createZone(
                     new 
DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
-                            .dataNodesAutoAdjustScaleUp(0)
-                            .dataNodesAutoAdjustScaleDown(0)
+                            .dataNodesAutoAdjustScaleUp(scaleUp)
+                            .dataNodesAutoAdjustScaleDown(scaleDown)
                             .build()
             ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
         }
     }
+
+    private static void blockUpdate(MetaStorageManager metaStorageManager, 
ByteArray key) {
+        when(metaStorageManager.invoke(argThat(iif -> {
+            If iif1 = MetaStorageWriteHandler.toIf(iif);
+
+            byte[] keyScaleUp = key.bytes();
+
+            return iif1.andThen().update().operations().stream().anyMatch(op 
-> Arrays.equals(keyScaleUp, op.key()));
+        }))).thenThrow(new RuntimeException("Expected"));
+    }
 }


Reply via email to