This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a00da3d757 IGNITE-22629 Fix
ItReplicaStateManagerTest.testReplicaStatesManagement (#4040)
a00da3d757 is described below
commit a00da3d757231c139da501b7bf8ef6e33d6b089e
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jul 9 17:03:59 2024 +0300
IGNITE-22629 Fix ItReplicaStateManagerTest.testReplicaStatesManagement
(#4040)
---
.../RebalanceRaftGroupEventsListener.java | 9 ++-
.../ignite/internal/replicator/ReplicaManager.java | 2 +
.../runner/app/ItReplicaStateManagerTest.java | 65 +++++++++++++++-------
.../ignite/internal/BaseIgniteRestartTest.java | 2 +-
4 files changed, 55 insertions(+), 23 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index ed074f5a38..fb6335597e 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -261,11 +261,11 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
Set<Integer> counter = fromBytes(counterEntry.value());
- assert !counter.isEmpty();
-
if (!counter.contains(tablePartitionId.tableId())) {
// Count down for this table has already been processed, just
skip.
// For example, this can happen when leader re-election
happened during the rebalance process.
+ LOG.info("Counter count down skipped, because the counter
doesn't contain the tableId=" + tablePartitionId.tableId());
+
return;
}
@@ -312,6 +312,11 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
} catch (InterruptedException | ExecutionException e) {
// TODO: IGNITE-14693
LOG.warn("Unable to count down partitions counter in metastore: "
+ tablePartitionId, e);
+ } catch (Throwable e) {
+ // TODO: IGNITE-14693
+ LOG.error("Unable to count down partitions counter in metastore: "
+ tablePartitionId, e);
+
+ throw e;
}
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index bdf06f50bd..7b8f0a72e3 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1350,6 +1350,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
} else if (state == ReplicaState.PRIMARY_ONLY) {
context.replicaState = ReplicaState.ASSIGNED;
+ LOG.debug("Weak replica start complete [state={}].",
context.replicaState);
+
return trueCompletedFuture();
} // else no-op.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
index f0039611f6..7b90f79329 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItReplicaStateManagerTest.java
@@ -18,10 +18,8 @@
package org.apache.ignite.internal.runner.app;
import static java.util.Collections.emptySet;
-import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -53,6 +51,19 @@ import org.junit.jupiter.api.Test;
* Test for replica lifecycle.
*/
public class ItReplicaStateManagerTest extends BaseIgniteRestartTest {
+ private static final String[] ATTRIBUTES = {
+ "{region:{attribute:\"REG0\"}}",
+ "{region:{attribute:\"REG1\"}}",
+ "{region:{attribute:\"REG2\"}}"
+ };
+
+ private static final String ZONE_NAME = "TEST_ZONE";
+
+ @Override
+ protected String configurationString(int idx) {
+ return configurationString(idx, ATTRIBUTES[idx]);
+ }
+
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22629")
public void testReplicaStatesManagement() throws InterruptedException {
@@ -61,15 +72,17 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
IgniteImpl node0 = nodes.get(0);
- String zone = "TEST_ZONE";
String tableName = "TEST";
node0.sql().execute(null,
String.format("CREATE ZONE IF NOT EXISTS %s WITH REPLICAS=%d,
PARTITIONS=%d, STORAGE_PROFILES='%s'",
- zone, 3, 1, DEFAULT_STORAGE_PROFILE));
+ ZONE_NAME, 3, 1, DEFAULT_STORAGE_PROFILE));
node0.sql().execute(null,
- String.format("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY
KEY, name VARCHAR) WITH PRIMARY_ZONE='%s'", tableName, zone));
+ String.format("CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY
KEY, name VARCHAR) WITH PRIMARY_ZONE='%s'", tableName,
+ ZONE_NAME
+ )
+ );
TableImpl tbl = unwrapTableImpl(node0.tables().table("TEST"));
int tableId = tbl.tableId();
@@ -84,18 +97,10 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
log.info("Test: primary replica is " + replicaMeta);
- // This will be the pending assignments excluding the primary replica
node.
- Set<Assignment> newPendingAssignments = nodes.stream()
- .filter(n -> !n.id().equals(replicaMeta.getLeaseholderId()))
- .map(n -> Assignment.forPeer(n.name()))
- .collect(toSet());
-
- ByteArray pendingAssignmentsKey = pendingPartAssignmentsKey(partId);
-
log.info("Test: Excluding the current primary from assignments. The
replica should stay alive.");
// Excluding the current primary from assignments. The replica should
stay alive.
- node0.metaStorageManager().put(pendingAssignmentsKey,
Assignments.toBytes(newPendingAssignments));
+ node0.sql().execute(null, alterZoneSql(filterForNodes(nodes,
replicaMeta.getLeaseholderId())));
ByteArray stableAssignmentsKey = stablePartAssignmentsKey(partId);
@@ -106,11 +111,7 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
log.info("Test: Including it back.");
// Including it back.
- Set<Assignment> pendingAssignmentsAllNodes = nodes.stream()
- .map(n -> Assignment.forPeer(n.name()))
- .collect(toSet());
-
- node0.metaStorageManager().put(pendingAssignmentsKey,
Assignments.toBytes(pendingAssignmentsAllNodes));
+ node0.sql().execute(null, alterZoneSql(filterForNodes(nodes, null)));
waitForStableAssignments(node0.metaStorageManager(),
stableAssignmentsKey.bytes(), nodesCount);
@@ -119,7 +120,7 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
log.info("Test: Excluding again.");
// Excluding again.
- node0.metaStorageManager().put(pendingAssignmentsKey,
Assignments.toBytes(newPendingAssignments));
+ node0.sql().execute(null, alterZoneSql(filterForNodes(nodes,
replicaMeta.getLeaseholderId())));
waitForStableAssignments(node0.metaStorageManager(),
stableAssignmentsKey.bytes(), nodesCount - 1);
@@ -149,6 +150,30 @@ public class ItReplicaStateManagerTest extends
BaseIgniteRestartTest {
assertTrue(success);
}
+ private static String alterZoneSql(String filter) {
+ return String.format("ALTER ZONE \"%s\" SET \"DATA_NODES_FILTER\" =
'%s'", ZONE_NAME, filter);
+ }
+
+ private static String filterForNodes(List<IgniteImpl> nodes, @Nullable
String excludeId) {
+ StringBuilder attrs = new StringBuilder();
+
+ for (int idx = 0; idx < nodes.size(); idx++) {
+ IgniteImpl node = nodes.get(idx);
+
+ if (excludeId != null && node.id().equals(excludeId)) {
+ continue;
+ }
+
+ if (!attrs.toString().isEmpty()) {
+ attrs.append(" || ");
+ }
+
+ attrs.append("@.region == \"REG" + idx + "\"");
+ }
+
+ return "$[?(" + attrs + ")]";
+ }
+
@Nullable
private static MvPartitionStorage storage(IgniteImpl node) {
TableImpl t = unwrapTableImpl(node.tables().table("TEST"));
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
index 96ea800175..7167f8a1a9 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/BaseIgniteRestartTest.java
@@ -225,7 +225,7 @@ public abstract class BaseIgniteRestartTest extends
IgniteAbstractTest {
* @param idx Node index.
* @return Configuration string.
*/
- protected static String configurationString(int idx) {
+ protected String configurationString(int idx) {
return configurationString(idx, "{}");
}