This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a338a04f7 IGNITE-19443 Skipping empty data nodes propagation to 
pending assignments after filtering (#2089)
7a338a04f7 is described below

commit 7a338a04f7dc4e7ad0aeee9a35977bc764dfb5d4
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri May 19 00:12:41 2023 +0400

    IGNITE-19443 Skipping empty data nodes propagation to pending assignments 
after filtering (#2089)
---
 .../rebalance/DistributionZoneRebalanceEngine.java |  15 +-
 .../DistributionZoneRebalanceEngineTest.java       |   8 +-
 .../internal/metastorage/MetaStorageManager.java   |   5 +
 .../metastorage/impl/MetaStorageManagerImpl.java   |   1 +
 .../zones/ItDistributionZonesFilterTest.java       | 151 ++++++++++++++++-----
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |   4 +
 6 files changed, 147 insertions(+), 37 deletions(-)

diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index e7c116e951..04c06b93d5 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -181,6 +181,10 @@ public class DistributionZoneRebalanceEngine {
                                 distributionZoneManager.nodesAttributes()
                         );
 
+                        if (filteredDataNodes.isEmpty()) {
+                            continue;
+                        }
+
                         if (zoneId == tableZoneId) {
                             TableConfiguration tableCfg = 
tables.get(tableView.name());
 
@@ -261,7 +265,6 @@ public class DistributionZoneRebalanceEngine {
                 int furCur = 0;
 
                 for (TableConfiguration tblCfg : tblsCfg) {
-
                     LOG.info("Received update for replicas number [table={}, 
oldNumber={}, newNumber={}]",
                             tblCfg.name().value(), replicasCtx.oldValue(), 
replicasCtx.newValue());
 
@@ -273,13 +276,21 @@ public class DistributionZoneRebalanceEngine {
 
                     List<Set<Assignment>> tableAssignments = 
ByteUtils.fromBytes(assignmentsBytes);
 
+                    Set<String> dataNodes = 
distributionZoneManager.dataNodes(zoneCfg.zoneId());
+
+                    if (dataNodes.isEmpty()) {
+                        futs[furCur++] = completedFuture(null);
+
+                        continue;
+                    }
+
                     for (int i = 0; i < partCnt; i++) {
                         TablePartitionId replicaGrpId = new 
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
 
                         futs[furCur++] = updatePendingAssignmentsKeys(
                                 tblCfg.name().value(),
                                 replicaGrpId,
-                                
distributionZoneManager.dataNodes(zoneCfg.zoneId()),
+                                dataNodes,
                                 newReplicas,
                                 replicasCtx.storageRevision(),
                                 metaStorageManager,
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 969b05edcb..d51c123669 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -319,16 +319,16 @@ public class DistributionZoneRebalanceEngineTest extends 
IgniteAbstractTest {
 
         verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
 
-        nodes = emptySet();
+        Set<String> emptyNodes = emptySet();
 
-        watchListenerOnUpdate(1, nodes, 3);
+        watchListenerOnUpdate(1, emptyNodes, 3);
 
         zoneNodes.clear();
-        zoneNodes.put(1, nodes);
+        zoneNodes.put(1, null);
 
         checkAssignments(tablesConfiguration, zoneNodes, 
RebalanceUtil::plannedPartAssignmentsKey);
 
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
     }
 
     @Test
diff --git 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 804cfd3a89..4bf608e61a 100644
--- 
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ 
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -64,6 +64,11 @@ public interface MetaStorageManager extends IgniteComponent {
      */
     CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys);
 
+    /**
+     * Inserts or updates an entry with the given key and the given value.
+     */
+    CompletableFuture<Void> put(ByteArray key, byte[] val);
+
     /**
      * Retrieves entries for the given key prefix in lexicographic order. 
Shortcut for {@link #prefix(ByteArray, long)} where
      * {@code revUpperBound = LATEST_REVISION}.
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 0a11931f7c..1e83e3c057 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -350,6 +350,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager {
      *
      * @see MetaStorageService#put(ByteArray, byte[])
      */
+    @Override
     public CompletableFuture<Void> put(ByteArray key, byte[] val) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
index 522a6e07b3..57ade2725a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,20 +31,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.Node;
+import 
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.sql.Session;
 import org.intellij.lang.annotations.Language;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -147,7 +152,6 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
         assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
>= dataNodesEntry1.revision(), TIMEOUT_MILLIS));
 
         // We check that two nodes that pass the filter are presented in the 
stable key.
-
         assertValueInStorage(
                 metaStorageManager,
                 stablePartAssignmentsKey(partId),
@@ -176,14 +180,13 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
      * @throws Exception If failed.
      */
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19443")
     void testFilteredEmptyDataNodesDoNotTriggerRebalance() throws Exception {
         String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
 
         // This node do not pass the filter.
         IgniteImpl node0 = node(0);
 
-        // This node pass the filter
+        // This node passes the filter
         @Language("JSON") String firstNodeAttributes = 
"{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
 
         IgniteImpl node1 = startNode(1, 
createStartConfig(firstNodeAttributes));
@@ -197,13 +200,70 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
                 + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
                 + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
 
+        MetaStorageManager metaStorageManager = (MetaStorageManager) 
IgniteTestUtils.getFieldValue(
+                node0,
+                IgniteImpl.class,
+                "metaStorageMgr"
+        );
+
+        waitDataNodeAndListenersAreHandled(metaStorageManager, 2);
+
         String tableName = "table1";
 
         session.execute(null, "CREATE TABLE " + tableName + "("
                 + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) 
WITH PRIMARY_ZONE='TEST_ZONE'");
 
-        MetaStorageManager metaStorageManager = (MetaStorageManager) 
IgniteTestUtils
-                .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+        TableManager tableManager = (TableManager) 
IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+        TableImpl table = (TableImpl) tableManager.table(tableName);
+
+        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+        // Table was created after both nodes were up, so there wasn't any 
rebalance.
+        assertPendingStableAreNull(metaStorageManager, partId);
+
+        // Stop the only node that passed the filter, so data nodes 
after filtering will be empty.
+        stopNode(1);
+
+        waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+        // Check that stable and pending are null, so there wasn't any 
rebalance.
+        assertPendingStableAreNull(metaStorageManager, partId);
+    }
+
+    @Test
+    void testFilteredEmptyDataNodesDoNotTriggerRebalanceOnReplicaUpdate() 
throws Exception {
+        String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
+
+        // This node does not pass the filter.
+        IgniteImpl node0 = node(0);
+
+        // This node passes the filter
+        @Language("JSON") String firstNodeAttributes = 
"{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
+
+        startNode(1, createStartConfig(firstNodeAttributes));
+
+        Session session = node0.sql().createSession();
+
+        session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+                + "\"REPLICAS\" = 1, "
+                + "\"PARTITIONS\" = 1, "
+                + "\"DATA_NODES_FILTER\" = " + filter + ", "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+        MetaStorageManager metaStorageManager = (MetaStorageManager) 
IgniteTestUtils.getFieldValue(
+                node0,
+                IgniteImpl.class,
+                "metaStorageMgr"
+        );
+
+        waitDataNodeAndListenersAreHandled(metaStorageManager, 2);
+
+        String tableName = "table1";
+
+        session.execute(null, "CREATE TABLE " + tableName + "("
+                + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) 
WITH PRIMARY_ZONE='TEST_ZONE'");
 
         TableManager tableManager = (TableManager) 
IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
 
@@ -211,48 +271,77 @@ public class ItDistributionZonesFilterTest extends 
ClusterPerTestIntegrationTest
 
         TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
 
-        assertValueInStorage(
-                metaStorageManager,
-                stablePartAssignmentsKey(partId),
-                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
-                null,
-                TIMEOUT_MILLIS
+        // Table was created after both nodes were up, so there wasn't any 
rebalance.
+        assertPendingStableAreNull(metaStorageManager, partId);
+
+        // Stop the only node that passed the filter, so data nodes 
after filtering will be empty.
+        stopNode(1);
+
+        waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+        // Check that stable and pending are null, so there wasn't any 
rebalance.
+        assertPendingStableAreNull(metaStorageManager, partId);
+
+        session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET "
+                + "\"REPLICAS\" = 2");
+
+        DistributionZoneManager distributionZoneManager = 
(DistributionZoneManager) IgniteTestUtils.getFieldValue(
+                node0,
+                IgniteImpl.class,
+                "distributionZoneManager"
         );
 
-        assertValueInStorage(
-                metaStorageManager,
-                zoneDataNodesKey(1),
-                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
-                2,
-                TIMEOUT_MILLIS
+        DistributionZonesConfiguration distributionZonesConfiguration = 
(DistributionZonesConfiguration) IgniteTestUtils.getFieldValue(
+                distributionZoneManager,
+                DistributionZoneManager.class,
+                "zonesConfiguration"
         );
 
-        Entry dataNodesEntry = 
metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+        CountDownLatch latch = new CountDownLatch(1);
 
-        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
>= dataNodesEntry.revision(), 5_000));
+        // We need to be sure that the first asynchronous configuration 
change of replicas was handled,
+        // so we register a listener with a latch, change replicas again and 
wait for the latch; once it fires, we know the first
+        // replicas change was handled.
+        
distributionZonesConfiguration.distributionZones().any().replicas().listen(ctx 
-> {
+            latch.countDown();
 
-        assertValueInStorage(
-                metaStorageManager,
-                stablePartAssignmentsKey(partId),
-                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
-                null,
-                TIMEOUT_MILLIS
-        );
+            return CompletableFuture.completedFuture(null);
+        });
 
-        stopNode(1);
+        session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET \"REPLICAS\" = 3");
 
+        latch.await(10_000, MILLISECONDS);
+
+        // Check that stable and pending are null, so there wasn't any 
rebalance.
+        assertPendingStableAreNull(metaStorageManager, partId);
+    }
+
+    private static void waitDataNodeAndListenersAreHandled(
+            MetaStorageManager metaStorageManager,
+            int expectedDataNodesSize
+    ) throws Exception {
         assertValueInStorage(
                 metaStorageManager,
                 zoneDataNodesKey(1),
                 (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
-                1,
+                expectedDataNodesSize,
                 TIMEOUT_MILLIS
         );
 
-        Entry newDataNodesEntry = 
metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+        ByteArray fakeKey = new ByteArray("Foo");
+
+        metaStorageManager.put(fakeKey, toBytes("Bar")).get(5_000, 
MILLISECONDS);
 
-        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
>= newDataNodesEntry.revision(), 5_000));
+        Entry fakeEntry = metaStorageManager.get(fakeKey).get(5_000, 
MILLISECONDS);
+
+        // Wait until all data nodes listeners are triggered and all their 
meta storage activity is done.
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() 
>= fakeEntry.revision(), 5_000));
+    }
 
+    private static void assertPendingStableAreNull(
+            MetaStorageManager metaStorageManager,
+            TablePartitionId partId
+    ) throws InterruptedException {
         assertValueInStorage(
                 metaStorageManager,
                 pendingPartAssignmentsKey(partId),
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 63b3e4d66b..4a7cae5df4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -230,6 +230,10 @@ public class DdlCommandHandler {
             
zoneCfgBuilder.dataNodesAutoAdjustScaleUp(cmd.dataNodesAutoAdjustScaleUp());
         }
 
+        if (cmd.replicas() != null) {
+            zoneCfgBuilder.replicas(cmd.replicas());
+        }
+
         return distributionZoneManager.alterZone(cmd.zoneName(), 
zoneCfgBuilder.build())
                 .handle(handleModificationResult(cmd.ifExists(), 
DistributionZoneNotFoundException.class));
     }

Reply via email to