This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e74127db340 IGNITE-23373 OutdatedTokenException when running
ItDisasterRecoveryManagerTest#testRestartTablePartitionsWithCleanUp (#6829)
e74127db340 is described below
commit e74127db340f1f3c31866690ddfbbb6971baff7e
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Oct 23 21:48:50 2025 +0300
IGNITE-23373 OutdatedTokenException when running
ItDisasterRecoveryManagerTest#testRestartTablePartitionsWithCleanUp (#6829)
---
.../internal/table/distributed/TableManager.java | 91 ++++++++++------------
.../disaster/ItDisasterRecoveryManagerTest.java | 32 ++++++++
2 files changed, 73 insertions(+), 50 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fc654e51d65..7a9f2f176c2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2464,15 +2464,15 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
CompletableFuture<Void> localServicesStartFuture;
if (shouldStartLocalGroupNode) {
- localServicesStartFuture = createPartitionAndStartClient(
- replicaGrpId,
- tbl,
- revision,
- isRecovery,
- assignmentsTimestamp,
- localAssignmentInPending,
- computedStableAssignments
- );
+ localServicesStartFuture = localPartitionsVv.get(revision)
+ .thenComposeAsync(unused -> createPartitionAndStartClient(
+ replicaGrpId,
+ tbl,
+ isRecovery,
+ assignmentsTimestamp,
+ localAssignmentInPending,
+ computedStableAssignments
+ ), ioExecutor);
} else if (pendingAssignmentsAreForced && localAssignmentInPending !=
null) {
localServicesStartFuture = runAsync(() -> inBusyLock(busyLock, ()
-> {
assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local
node is outside of the replication group: " + replicaGrpId;
@@ -2530,7 +2530,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private CompletableFuture<Void> createPartitionAndStartClient(
TablePartitionId replicaGrpId,
TableViewInternal tbl,
- long revision,
boolean isRecovery,
long assignmentsTimestamp,
Assignment localAssignmentInPending,
@@ -2540,39 +2539,34 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
- return localPartitionsVv.get(revision)
- // TODO https://issues.apache.org/jira/browse/IGNITE-20957
Revisit this code
- .thenComposeAsync(
- unused -> inBusyLock(
- busyLock,
- () -> getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
- .thenRun(() ->
localPartsByTableId.compute(
- replicaGrpId.tableId(),
- (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionId)
- ))
- // If the table is already closed,
it's not a problem (probably the node is stopping).
-
.exceptionally(ignoreTableClosedException())
- ),
- ioExecutor
- )
- .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
- lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView(), lwm)
- );
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20957 Revisit
this code
+ return inBusyLock(
+ busyLock,
+ () -> getOrCreatePartitionStorages(tbl, singlePartitionIdSet)
+ .thenRun(() -> localPartsByTableId.compute(
+ replicaGrpId.tableId(),
+ (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionId)
+ ))
+ // If the table is already closed, it's not a problem
(probably the node is stopping).
+ .exceptionally(ignoreTableClosedException())
+ ).thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+ lowWatermark.getLowWatermarkSafe(lwm ->
+ registerIndexesToTable(tbl, catalogService,
singlePartitionIdSet, tbl.schemaView(), lwm)
+ );
- return
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored ->
inBusyLock(busyLock, () -> {
- assert localAssignmentInPending != null : "Local
member assignment";
+ return
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored ->
inBusyLock(busyLock, () -> {
+ assert localAssignmentInPending != null : "Local member
assignment";
- return startPartitionAndStartClient(
- tbl,
- replicaGrpId.partitionId(),
- localAssignmentInPending,
- computedStableAssignments,
- isRecovery,
- assignmentsTimestamp
- );
- }));
- }), ioExecutor);
+ return startPartitionAndStartClient(
+ tbl,
+ replicaGrpId.partitionId(),
+ localAssignmentInPending,
+ computedStableAssignments,
+ isRecovery,
+ assignmentsTimestamp
+ );
+ }));
+ }), ioExecutor);
}
/**
@@ -3544,28 +3538,25 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
long revision,
long assignmentsTimestamp
) {
- return inBusyLockAsync(busyLock, () ->
tablesVv.get(revision).thenComposeAsync(unused -> inBusyLockAsync(busyLock, ()
-> {
- TableViewInternal table = tables.get(tablePartitionId.tableId());
-
+ return tableAsync(tablePartitionId.tableId()).thenComposeAsync(table
-> inBusyLockAsync(busyLock, () -> {
assert table != null : tablePartitionId;
Assignments stableAssignments =
stableAssignmentsGetLocally(metaStorageMgr, tablePartitionId, revision);
Assignment localAssignment = localAssignment(stableAssignments);
- return stopPartitionAndDestroyForRestart(tablePartitionId, table)
- .thenComposeAsync(unused1 ->
+ return stopPartitionAndDestroyForRestart(tablePartitionId,
table).thenComposeAsync(unused1 ->
createPartitionAndStartClient(
tablePartitionId,
- table, revision,
+ table,
false,
assignmentsTimestamp,
localAssignment,
stableAssignments
),
- ioExecutor
- );
- }), ioExecutor));
+ ioExecutor
+ );
+ }), ioExecutor);
}
@Override
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 0ff373b2242..77f6517c6a1 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,7 +59,9 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
+import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -81,6 +84,7 @@ import
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartition
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
@@ -275,6 +279,30 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
IgniteImpl nodeToCleanup = findNodeConformingOptions(tableName,
primaryReplica, raftLeader);
+ AtomicBoolean stop = new AtomicBoolean();
+
+ CompletableFuture<Void> msLoadFut = CompletableFuture.runAsync(() -> {
+ MetaStorageManager msMng = node.metaStorageManager();
+
+ var msPutFuts = new ArrayList<CompletableFuture<Void>>();
+
+ int i = 0;
+
+ while (!stop.get()) {
+ msPutFuts.add(msMng.put(ByteArray.fromString("test_key_" +
i++), new byte[0]));
+
+ if (i % 1000 == 0) {
+ assertThat(CompletableFutures.allOf(msPutFuts),
willCompleteSuccessfully());
+
+ msPutFuts.clear();
+
+ log.info("MS entries loaded {}", i);
+ }
+ }
+
+ assertThat(CompletableFutures.allOf(msPutFuts),
willCompleteSuccessfully());
+ });
+
CompletableFuture<Void> restartPartitionsWithCleanupFuture =
node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
Set.of(nodeToCleanup.name()),
testZone,
@@ -285,6 +313,10 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
assertThat(restartPartitionsWithCleanupFuture,
willCompleteSuccessfully());
+ stop.set(true);
+
+ assertThat(msLoadFut, willCompleteSuccessfully());
+
insert(1, 1, tableName);
assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);