This is an automated email from the ASF dual-hosted git repository.
ashapkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2cb34043dbb [IGNITE-27835] Check majority in HA mechanism based on
quorum size no… (#7624)
2cb34043dbb is described below
commit 2cb34043dbb9bd99608f33a20c7b2434fb97f5b8
Author: Egor <[email protected]>
AuthorDate: Mon Mar 2 17:26:51 2026 +0400
[IGNITE-27835] Check majority in HA mechanism based on quorum size no…
(#7624)
Co-authored-by: Egor Kuts <[email protected]>
---
...bstractHighAvailablePartitionsRecoveryTest.java | 4 +-
...ilablePartitionsRecoveryByFilterUpdateTest.java | 2 +-
.../disaster/DisasterRecoveryManager.java | 12 ++---
.../disaster/GroupUpdateRequestHandler.java | 5 +-
.../ItDisasterRecoveryReconfigurationTest.java | 55 +++++++++++++++++++++-
5 files changed, 64 insertions(+), 14 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
index 5483986e55e..1fdbbf9ea16 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
@@ -115,10 +115,10 @@ public abstract class
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
private static final String SC_TABLE_NAME = "SC_TABLE";
- private static final int PARTITIONS_NUMBER = 2;
-
private static final int ENTRIES = 2;
+ static final int PARTITIONS_NUMBER = 2;
+
static Set<Integer> PARTITION_IDS = IntStream
.range(0, PARTITIONS_NUMBER)
.boxed()
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index 9be0b55c60e..83eb69d7a88 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -202,7 +202,7 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
}
String globalFilter = "$[?(@.zone == \"custom\")]";
- createHaZoneWithTable(globalFilter, nodeNames(1, 2, 3, 4, 5, 6, 7));
+ createHaZoneWithTables(HA_ZONE_NAME, PARTITIONS_NUMBER, globalFilter,
4, List.of(HA_TABLE_NAME), nodeNames(1, 2, 3, 4, 5, 6, 7));
IgniteImpl node0 = igniteImpl(0);
Table table = node0.tables().table(HA_TABLE_NAME);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index d7ab9cf511f..1ef6aac79f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -329,7 +329,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return busyLock;
}
- private Set<Assignment>
stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long
revision) {
+ private Set<Assignment>
stableAssignmentsWithOnlyAliveAndVotingNodes(ReplicationGroupId partitionId,
long revision) {
Set<Assignment> stableAssignments;
stableAssignments = ZoneRebalanceUtil.zoneStableAssignmentsGetLocally(
@@ -342,7 +342,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
.stream().map(NodeWithAttributes::nodeName).collect(Collectors.toUnmodifiableSet());
return stableAssignments
- .stream().filter(a ->
logicalTopology.contains(a.consistentId())).collect(Collectors.toUnmodifiableSet());
+ .stream().filter(a ->
logicalTopology.contains(a.consistentId()) &&
a.isPeer()).collect(Collectors.toUnmodifiableSet());
}
private CompletableFuture<Boolean>
onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
@@ -359,7 +359,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
for (int partId = 0; partId < zoneDescriptor.partitions();
partId++) {
ZonePartitionId partitionId = new ZonePartitionId(zoneId,
partId);
- if (stableAssignmentsWithOnlyAliveNodes(partitionId,
revision).size() < calculateQuorum(zoneDescriptor.replicas())) {
+ if (stableAssignmentsWithOnlyAliveAndVotingNodes(partitionId,
revision).size() < zoneDescriptor.quorumSize()) {
partitionsToReset.add(partId);
}
}
@@ -1298,7 +1298,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
CatalogZoneDescriptor zoneDescriptor =
catalog.zone(zonePartitionId.zoneId());
int replicas = zoneDescriptor.replicas();
- int quorum = calculateQuorum(replicas);
+ int quorum = zoneDescriptor.quorumSize();
Map<LocalPartitionStateEnum, List<LocalPartitionState>> groupedStates
= map.values().stream()
.collect(groupingBy(localPartitionState ->
localPartitionState.state));
@@ -1327,10 +1327,6 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}
}
- private static int calculateQuorum(int replicas) {
- return replicas / 2 + 1;
- }
-
private Catalog catalogLatestVersion() {
return catalogManager.latestCatalog();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 6c6813d90a6..6a55893f456 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -257,6 +257,7 @@ class GroupUpdateRequestHandler {
aliveNodesConsistentIds,
zoneDescriptor.partitions(),
zoneDescriptor.replicas(),
+ zoneDescriptor.quorumSize(),
zoneDescriptor.consensusGroupSize(),
revision,
timestamp,
@@ -283,6 +284,7 @@ class GroupUpdateRequestHandler {
Set<String> aliveNodesConsistentIds,
int partitions,
int replicas,
+ int quorumSize,
int consensusGroupSize,
long revision,
HybridTimestamp timestamp,
@@ -296,8 +298,9 @@ class GroupUpdateRequestHandler {
return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentAssignments, partAssignments);
+ long aliveStableVotingNodes =
aliveStableNodes.stream().filter(Assignment::isPeer).count();
- if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
+ if (aliveStableVotingNodes >= quorumSize) {
return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 68567108c5b..0f08ecd06fb 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -196,9 +196,11 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
startNodesInParallel(IntStream.range(INITIAL_NODES,
zoneParams.nodes()).toArray());
executeSql(format("CREATE ZONE %s (replicas %d, partitions %d, "
- + "auto scale down %d, auto scale up %d, consistency
mode '%s') storage profiles ['%s']",
+ + "auto scale down %d, auto scale up %d, consistency
mode '%s', quorum size %d) storage profiles ['%s']",
zoneName, zoneParams.replicas(), zoneParams.partitions(),
SCALE_DOWN_TIMEOUT_SECONDS, 1,
- zoneParams.consistencyMode().name(), DEFAULT_STORAGE_PROFILE
+ zoneParams.consistencyMode().name(),
+ zoneParams.quorumSize() == -1 ? zoneParams.replicas() / 2 + 1
: zoneParams.quorumSize(),
+ DEFAULT_STORAGE_PROFILE
));
CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
@@ -1694,6 +1696,53 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(finalAssignments));
}
+ /**
+ * Tests that reset is not triggered when we still have quorum with custom
quorum size.
+ * Verifies IGNITE-27835: quorum check uses zoneDescriptor.quorumSize()
instead of replicas/2+1.
+ *
+ * <p>With 7 replicas and custom quorum size of 2:
+ * <ul>
+ * <li>Stop 4 nodes, leaving 3 alive</li>
+ * <li>With custom quorum of 2: 3 >= 2, still have quorum, reset should
NOT trigger</li>
+ * <li>With default quorum of 4: 3 < 4, would have lost quorum, reset
would trigger</li>
+ * </ul>
+ */
+ @Test
+ @ZoneParams(nodes = 7, replicas = 7, partitions = 1, quorumSize = 2)
+ void testCustomQuorumSizeRespectedInManualReset() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = igniteImpl(0);
+ CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+
+ assertThat(zone.quorumSize(), is(2)); // Precondition: the custom quorum size from @ZoneParams reached the catalog.
+
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ stopNodesInParallel(3, 4, 5, 6); // Of nodes 0..6 only 0, 1, 2 remain: 3 voting members, below the default quorum of 4.
+ awaitStableContainsNodes(node0, 0, 0, 1, 2);
+ Assignments assignmentsBeforeReset = getStableAssignments(node0,
partId);
+
+ // Trigger reset - should return ASSIGNMENT_NOT_UPDATED because 3 >= 2
(still have quorum).
+ CompletableFuture<Void> resetFuture =
node0.disasterRecoveryManager().resetPartitions(
+ zoneName,
+ emptySet(), // NOTE(review): presumably "all partitions of the zone" - confirm against resetPartitions().
+ true,
+ -1
+ );
+
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ // Verify stable assignments were NOT changed (reset was skipped).
+ Assignments finalStable = getStableAssignments(node0, partId);
+ assertThat(finalStable, is(assignmentsBeforeReset));
+ }
+
/**
* Lease agreement message should be the first message sent after reset.
*/
@@ -2138,6 +2187,8 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
int nodes() default INITIAL_NODES;
+ int quorumSize() default -1;
+
ConsistencyMode consistencyMode() default
ConsistencyMode.STRONG_CONSISTENCY;
}