This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 22e2e77d7c7 IGNITE-24467 Fixed reset and filter rebalance race (#7445)
22e2e77d7c7 is described below
commit 22e2e77d7c72bcbf9617be96f9625c3098ef1890
Author: Egor <[email protected]>
AuthorDate: Wed Feb 4 12:24:15 2026 +0400
IGNITE-24467 Fixed reset and filter rebalance race (#7445)
Co-authored-by: Egor Kuts <[email protected]>
---
.../partitiondistribution/Assignments.java | 2 +-
.../partitiondistribution/AssignmentsQueue.java | 9 ++++++
...ilablePartitionsRecoveryByFilterUpdateTest.java | 24 ++++++++++++++--
.../disaster/GroupUpdateRequestHandler.java | 32 +++++++++++++++++++++-
.../ItDisasterRecoveryReconfigurationTest.java | 18 +++++++++---
5 files changed, 76 insertions(+), 9 deletions(-)
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index 1c2c606c9ed..06198b3be41 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -63,7 +63,7 @@ public class Assignments {
/**
* Constructor.
*/
- private Assignments(Collection<Assignment> nodes, boolean force, long
timestamp, boolean fromReset) {
+ public Assignments(Collection<Assignment> nodes, boolean force, long
timestamp, boolean fromReset) {
// A set of nodes must be a HashSet in order for serialization to
produce stable results,
// that could be compared as byte arrays.
this.nodes = nodes instanceof HashSet ? ((HashSet<Assignment>) nodes)
: new HashSet<>(nodes);
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
index c5d667c1fd7..1968ee975c7 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java
@@ -41,6 +41,15 @@ public class AssignmentsQueue implements
Iterable<Assignments> {
@IgniteToStringInclude
private final Deque<Assignments> queue;
+ /** Constructor. */
+ public AssignmentsQueue(AssignmentsQueue... assignmentsQueues) {
+ LinkedList<Assignments> assignments = new LinkedList<>();
+ for (AssignmentsQueue assignmentsQueue : assignmentsQueues) {
+ assignments.addAll(assignmentsQueue.queue);
+ }
+ this.queue = assignments;
+ }
+
/** Constructor. */
public AssignmentsQueue(Assignments... assignments) {
this(Arrays.asList(assignments));
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
index b5d82432680..8470b920920 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryByFilterUpdateTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.disaster;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopologyInMetastorage;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommand;
import org.apache.ignite.internal.catalog.commands.AlterZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.table.Table;
import org.intellij.lang.annotations.Language;
@@ -328,7 +330,7 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
}
private void alterZoneSql(String filter, String zoneName) {
- executeSql(String.format("ALTER ZONE \"%s\" SET (\"DATA_NODES_FILTER\"
'%s')", zoneName, filter));
+ executeSql(String.format("ALTER ZONE \"%s\" SET (NODES FILTER '%s')",
zoneName, filter));
}
/**
@@ -344,7 +346,6 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* <li>No data should be lost</li>
* </ol>
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24467")
@Test
void testResetAfterChangeFilters() throws InterruptedException {
startNode(1, EU_ONLY_NODES_CONFIG);
@@ -376,6 +377,15 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
stopNodes(1, 2);
+ Set<LogicalNode> expectedNodes = Set.of(
+ getLogicalNode(igniteImpl(0)),
+ getLogicalNode(igniteImpl(3)),
+ getLogicalNode(igniteImpl(4)),
+ getLogicalNode(igniteImpl(5))
+ );
+
+ assertLogicalTopologyInMetastorage(expectedNodes,
node.metaStorageManager());
+
String globalFilter = "$[?(@.region == \"US\")]";
alterZoneSql(globalFilter, HA_ZONE_NAME);
@@ -402,7 +412,7 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
* <li>No data should be lost</li>
* </ol>
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24467")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27643")
@Test
void testResetAfterChangeStorageProfiles() throws InterruptedException {
startNode(1, AIPERSIST_NODES_CONFIG);
@@ -519,4 +529,12 @@ public class
ItHighAvailablePartitionsRecoveryByFilterUpdateTest extends Abstrac
+ " rest.port: {}\n"
+ "}";
}
+
+ private static LogicalNode getLogicalNode(IgniteImpl ignite) {
+
+ return
ignite.logicalTopologyService().localLogicalTopology().nodes().stream()
+ .filter(n -> n.name().equals(ignite.name()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Node not found
in logical topology: " + ignite.name()));
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 3bb6bc190ee..90f54e63bcd 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -333,7 +333,17 @@ class GroupUpdateRequestHandler {
.stable(Assignments.of(currentAssignments,
assignmentsTimestamp))
.target(Assignments.forced(Set.of(nextAssignment),
assignmentsTimestamp))
.toQueue();
-
+ if (!manualUpdate) {
+ ByteArray pendingKey =
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(partId);
+ var entry = metaStorageMgr.getLocally(pendingKey);
+ if (entry != null) {
+ AssignmentsQueue pendingQueue =
AssignmentsQueue.fromBytes(entry.value());
+ if (pendingQueue != null && !pendingQueue.isEmpty()) {
+ AssignmentsQueue filteredPendingQueue =
filterAliveNodesOnly(pendingQueue, aliveNodesConsistentIds);
+ assignmentsQueue = new
AssignmentsQueue(assignmentsQueue, filteredPendingQueue);
+ }
+ }
+ }
return invoke(
partId,
revision,
@@ -347,6 +357,26 @@ class GroupUpdateRequestHandler {
});
}
+ private static AssignmentsQueue filterAliveNodesOnly(AssignmentsQueue
queue, Set<String> aliveNodesConsistentIds) {
+ List<Assignments> filteredAssignments = new ArrayList<>();
+
+ for (Assignments assignments : queue) {
+ Set<Assignment> aliveAssignments = assignments.nodes().stream()
+ .filter(assignment ->
aliveNodesConsistentIds.contains(assignment.consistentId()))
+ .collect(toSet());
+
+ if (!aliveAssignments.isEmpty()) {
+ filteredAssignments.add(new Assignments(
+ aliveAssignments,
+ assignments.force(),
+ assignments.timestamp(),
+ assignments.fromReset())
+ );
+ }
+ }
+ return new
AssignmentsQueue(filteredAssignments.toArray(Assignments[]::new));
+ }
+
/**
* Returns an assignment with the most up to date log index, if there are
more than one node with the same index, returns the first one
* in the lexicographic order.
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 5c75fe98eaf..9d2d295e3d8 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -806,9 +806,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
assertRealAssignments(node0, partId, 1, 3, 4);
- stopNodesInParallel(3, 4);
+ stopNodesInParallel(2, 3, 4);
- waitForScale(node0, 3);
+ waitForScale(node0, 2);
DisasterRecoveryManager disasterRecoveryManager =
node0.disasterRecoveryManager();
CompletableFuture<?> updateFuture =
disasterRecoveryManager.resetPartitions(zoneName, emptySet(), false, 1);
@@ -817,18 +817,20 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
awaitPrimaryReplica(node0, partId);
- assertRealAssignments(node0, partId, 1);
+ assertRealAssignments(node0, partId, 0, 1);
List<Throwable> errors = insertValues(table, partId, 0);
assertThat(errors, is(empty()));
+ awaitStableContainsNodes(node0, partId, 0, 1);
// Check that there is no ongoing or planned rebalance.
assertNull(getPendingAssignments(node0, partId));
- assertRealAssignments(node0, partId, 1);
+ assertRealAssignments(node0, partId, 0, 1);
// No fromReset flag is set on stable.
Assignments assignmentsStable = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
Assignment.forPeer(node(1).name())
), timestamp);
@@ -2056,6 +2058,14 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
.until(() -> getStableAssignments(node0,
partId).nodes().size() == 1);
}
+ private void awaitStableContainsNodes(IgniteImpl node0, int partId,
Integer ... expected) {
+ await().atMost(60, SECONDS)
+ .until(() -> requireNonNull(getStableAssignments(node0,
partId)).nodes()
+
.stream().map(Assignment::consistentId).collect(Collectors.toList()).equals(
+ Arrays.stream(expected).map(idx ->
cluster.nodeName(idx)).collect(Collectors.toList())
+ ));
+ }
+
/**
* Return assignments based on states of partitions in the cluster. It is
possible that returned value contains nodes
* from stable and pending, for example, when rebalance is in progress.