This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7a338a04f7 IGNITE-19443 Skipping empty data nodes propagation to
pending assignments after filtering (#2089)
7a338a04f7 is described below
commit 7a338a04f7dc4e7ad0aeee9a35977bc764dfb5d4
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri May 19 00:12:41 2023 +0400
IGNITE-19443 Skipping empty data nodes propagation to pending assignments
after filtering (#2089)
---
.../rebalance/DistributionZoneRebalanceEngine.java | 15 +-
.../DistributionZoneRebalanceEngineTest.java | 8 +-
.../internal/metastorage/MetaStorageManager.java | 5 +
.../metastorage/impl/MetaStorageManagerImpl.java | 1 +
.../zones/ItDistributionZonesFilterTest.java | 151 ++++++++++++++++-----
.../sql/engine/exec/ddl/DdlCommandHandler.java | 4 +
6 files changed, 147 insertions(+), 37 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index e7c116e951..04c06b93d5 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -181,6 +181,10 @@ public class DistributionZoneRebalanceEngine {
distributionZoneManager.nodesAttributes()
);
+ if (filteredDataNodes.isEmpty()) {
+ continue;
+ }
+
if (zoneId == tableZoneId) {
TableConfiguration tableCfg =
tables.get(tableView.name());
@@ -261,7 +265,6 @@ public class DistributionZoneRebalanceEngine {
int furCur = 0;
for (TableConfiguration tblCfg : tblsCfg) {
-
LOG.info("Received update for replicas number [table={},
oldNumber={}, newNumber={}]",
tblCfg.name().value(), replicasCtx.oldValue(),
replicasCtx.newValue());
@@ -273,13 +276,21 @@ public class DistributionZoneRebalanceEngine {
List<Set<Assignment>> tableAssignments =
ByteUtils.fromBytes(assignmentsBytes);
+ Set<String> dataNodes =
distributionZoneManager.dataNodes(zoneCfg.zoneId());
+
+ if (dataNodes.isEmpty()) {
+ futs[furCur++] = completedFuture(null);
+
+ continue;
+ }
+
for (int i = 0; i < partCnt; i++) {
TablePartitionId replicaGrpId = new
TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
futs[furCur++] = updatePendingAssignmentsKeys(
tblCfg.name().value(),
replicaGrpId,
-
distributionZoneManager.dataNodes(zoneCfg.zoneId()),
+ dataNodes,
newReplicas,
replicasCtx.storageRevision(),
metaStorageManager,
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 969b05edcb..d51c123669 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -319,16 +319,16 @@ public class DistributionZoneRebalanceEngineTest extends
IgniteAbstractTest {
verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
- nodes = emptySet();
+ Set<String> emptyNodes = emptySet();
- watchListenerOnUpdate(1, nodes, 3);
+ watchListenerOnUpdate(1, emptyNodes, 3);
zoneNodes.clear();
- zoneNodes.put(1, nodes);
+ zoneNodes.put(1, null);
checkAssignments(tablesConfiguration, zoneNodes,
RebalanceUtil::plannedPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(1)).invoke(any(), any());
}
@Test
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 804cfd3a89..4bf608e61a 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -64,6 +64,11 @@ public interface MetaStorageManager extends IgniteComponent {
*/
CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys);
+ /**
+ * Inserts or updates an entry with the given key and the given value.
+ */
+ CompletableFuture<Void> put(ByteArray key, byte[] val);
+
/**
* Retrieves entries for the given key prefix in lexicographic order.
Shortcut for {@link #prefix(ByteArray, long)} where
* {@code revUpperBound = LATEST_REVISION}.
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 0a11931f7c..1e83e3c057 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -350,6 +350,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
*
* @see MetaStorageService#put(ByteArray, byte[])
*/
+ @Override
public CompletableFuture<Void> put(ByteArray key, byte[] val) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
index 522a6e07b3..57ade2725a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static
org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,20 +31,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.Node;
+import
org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.sql.Session;
import org.intellij.lang.annotations.Language;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -147,7 +152,6 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
>= dataNodesEntry1.revision(), TIMEOUT_MILLIS));
// We check that two nodes that pass the filter are presented in the
stable key.
-
assertValueInStorage(
metaStorageManager,
stablePartAssignmentsKey(partId),
@@ -176,14 +180,13 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
* @throws Exception If failed.
*/
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-19443")
void testFilteredEmptyDataNodesDoNotTriggerRebalance() throws Exception {
String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
// This node do not pass the filter.
IgniteImpl node0 = node(0);
- // This node pass the filter
+ // This node passes the filter
@Language("JSON") String firstNodeAttributes =
"{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
IgniteImpl node1 = startNode(1,
createStartConfig(firstNodeAttributes));
@@ -197,13 +200,70 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
+ "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+ "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+ MetaStorageManager metaStorageManager = (MetaStorageManager)
IgniteTestUtils.getFieldValue(
+ node0,
+ IgniteImpl.class,
+ "metaStorageMgr"
+ );
+
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 2);
+
String tableName = "table1";
session.execute(null, "CREATE TABLE " + tableName + "("
+ COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR)
WITH PRIMARY_ZONE='TEST_ZONE'");
- MetaStorageManager metaStorageManager = (MetaStorageManager)
IgniteTestUtils
- .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+ TableManager tableManager = (TableManager)
IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+ TableImpl table = (TableImpl) tableManager.table(tableName);
+
+ TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+ // Table was created after both nodes was up, so there wasn't any
rebalance.
+ assertPendingStableAreNull(metaStorageManager, partId);
+
+ // Stop node, that was only one, that passed the filter, so data nodes
after filtering will be empty.
+ stopNode(1);
+
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+ //Check that stable and pending are null, so there wasn't any
rebalance.
+ assertPendingStableAreNull(metaStorageManager, partId);
+ }
+
+ @Test
+ void testFilteredEmptyDataNodesDoNotTriggerRebalanceOnReplicaUpdate()
throws Exception {
+ String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
+
+ // This node do not pass the filter.
+ IgniteImpl node0 = node(0);
+
+ // This node passes the filter
+ @Language("JSON") String firstNodeAttributes =
"{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
+
+ startNode(1, createStartConfig(firstNodeAttributes));
+
+ Session session = node0.sql().createSession();
+
+ session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+ + "\"REPLICAS\" = 1, "
+ + "\"PARTITIONS\" = 1, "
+ + "\"DATA_NODES_FILTER\" = " + filter + ", "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+ MetaStorageManager metaStorageManager = (MetaStorageManager)
IgniteTestUtils.getFieldValue(
+ node0,
+ IgniteImpl.class,
+ "metaStorageMgr"
+ );
+
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 2);
+
+ String tableName = "table1";
+
+ session.execute(null, "CREATE TABLE " + tableName + "("
+ + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR)
WITH PRIMARY_ZONE='TEST_ZONE'");
TableManager tableManager = (TableManager)
IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
@@ -211,48 +271,77 @@ public class ItDistributionZonesFilterTest extends
ClusterPerTestIntegrationTest
TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
- assertValueInStorage(
- metaStorageManager,
- stablePartAssignmentsKey(partId),
- (v) -> ((Set<Assignment>) fromBytes(v)).size(),
- null,
- TIMEOUT_MILLIS
+ // Table was created after both nodes was up, so there wasn't any
rebalance.
+ assertPendingStableAreNull(metaStorageManager, partId);
+
+ // Stop node, that was only one, that passed the filter, so data nodes
after filtering will be empty.
+ stopNode(1);
+
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+ //Check that stable and pending are null, so there wasn't any
rebalance.
+ assertPendingStableAreNull(metaStorageManager, partId);
+
+ session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET "
+ + "\"REPLICAS\" = 2");
+
+ DistributionZoneManager distributionZoneManager =
(DistributionZoneManager) IgniteTestUtils.getFieldValue(
+ node0,
+ IgniteImpl.class,
+ "distributionZoneManager"
);
- assertValueInStorage(
- metaStorageManager,
- zoneDataNodesKey(1),
- (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
- 2,
- TIMEOUT_MILLIS
+ DistributionZonesConfiguration distributionZonesConfiguration =
(DistributionZonesConfiguration) IgniteTestUtils.getFieldValue(
+ distributionZoneManager,
+ DistributionZoneManager.class,
+ "zonesConfiguration"
);
- Entry dataNodesEntry =
metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+ CountDownLatch latch = new CountDownLatch(1);
- assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
>= dataNodesEntry.revision(), 5_000));
+ // We need to be sure, that the first asynchronous configuration
change of replica was handled,
+ // so we create a listener with a latch, and change replica again and
wait for latch, so we can be sure that the first
+ // replica was handled.
+
distributionZonesConfiguration.distributionZones().any().replicas().listen(ctx
-> {
+ latch.countDown();
- assertValueInStorage(
- metaStorageManager,
- stablePartAssignmentsKey(partId),
- (v) -> ((Set<Assignment>) fromBytes(v)).size(),
- null,
- TIMEOUT_MILLIS
- );
+ return CompletableFuture.completedFuture(null);
+ });
- stopNode(1);
+ session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET \"REPLICAS\" = 3");
+ latch.await(10_000, MILLISECONDS);
+
+ //Check that stable and pending are null, so there wasn't any
rebalance.
+ assertPendingStableAreNull(metaStorageManager, partId);
+ }
+
+ private static void waitDataNodeAndListenersAreHandled(
+ MetaStorageManager metaStorageManager,
+ int expectedDataNodesSize
+ ) throws Exception {
assertValueInStorage(
metaStorageManager,
zoneDataNodesKey(1),
(v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
- 1,
+ expectedDataNodesSize,
TIMEOUT_MILLIS
);
- Entry newDataNodesEntry =
metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+ ByteArray fakeKey = new ByteArray("Foo");
+
+ metaStorageManager.put(fakeKey, toBytes("Bar")).get(5_000,
MILLISECONDS);
- assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
>= newDataNodesEntry.revision(), 5_000));
+ Entry fakeEntry = metaStorageManager.get(fakeKey).get(5_000,
MILLISECONDS);
+
+ // We wait for all data nodes listeners are triggered and all their
meta storage activity is done.
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
>= fakeEntry.revision(), 5_000));
+ }
+ private static void assertPendingStableAreNull(
+ MetaStorageManager metaStorageManager,
+ TablePartitionId partId
+ ) throws InterruptedException {
assertValueInStorage(
metaStorageManager,
pendingPartAssignmentsKey(partId),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 63b3e4d66b..4a7cae5df4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -230,6 +230,10 @@ public class DdlCommandHandler {
zoneCfgBuilder.dataNodesAutoAdjustScaleUp(cmd.dataNodesAutoAdjustScaleUp());
}
+ if (cmd.replicas() != null) {
+ zoneCfgBuilder.replicas(cmd.replicas());
+ }
+
return distributionZoneManager.alterZone(cmd.zoneName(),
zoneCfgBuilder.build())
.handle(handleModificationResult(cmd.ifExists(),
DistributionZoneNotFoundException.class));
}