This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e74127db340 IGNITE-23373 OutdatedTokenException when running ItDisasterRecoveryManagerTest#testRestartTablePartitionsWithCleanUp (#6829)
e74127db340 is described below

commit e74127db340f1f3c31866690ddfbbb6971baff7e
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Oct 23 21:48:50 2025 +0300

    IGNITE-23373 OutdatedTokenException when running ItDisasterRecoveryManagerTest#testRestartTablePartitionsWithCleanUp (#6829)
---
 .../internal/table/distributed/TableManager.java   | 91 ++++++++++------------
 .../disaster/ItDisasterRecoveryManagerTest.java    | 32 ++++++++
 2 files changed, 73 insertions(+), 50 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index fc654e51d65..7a9f2f176c2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2464,15 +2464,15 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         CompletableFuture<Void> localServicesStartFuture;
 
         if (shouldStartLocalGroupNode) {
-            localServicesStartFuture = createPartitionAndStartClient(
-                    replicaGrpId,
-                    tbl,
-                    revision,
-                    isRecovery,
-                    assignmentsTimestamp,
-                    localAssignmentInPending,
-                    computedStableAssignments
-            );
+            localServicesStartFuture = localPartitionsVv.get(revision)
+                    .thenComposeAsync(unused -> createPartitionAndStartClient(
+                            replicaGrpId,
+                            tbl,
+                            isRecovery,
+                            assignmentsTimestamp,
+                            localAssignmentInPending,
+                            computedStableAssignments
+                    ), ioExecutor);
         } else if (pendingAssignmentsAreForced && localAssignmentInPending != 
null) {
             localServicesStartFuture = runAsync(() -> inBusyLock(busyLock, () 
-> {
                 assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local 
node is outside of the replication group: " + replicaGrpId;
@@ -2530,7 +2530,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     private CompletableFuture<Void> createPartitionAndStartClient(
             TablePartitionId replicaGrpId,
             TableViewInternal tbl,
-            long revision,
             boolean isRecovery,
             long assignmentsTimestamp,
             Assignment localAssignmentInPending,
@@ -2540,39 +2539,34 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
 
-        return localPartitionsVv.get(revision)
-                // TODO https://issues.apache.org/jira/browse/IGNITE-20957 
Revisit this code
-                .thenComposeAsync(
-                        unused -> inBusyLock(
-                                busyLock,
-                                () -> getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
-                                        .thenRun(() -> 
localPartsByTableId.compute(
-                                                replicaGrpId.tableId(),
-                                                (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionId)
-                                        ))
-                                        // If the table is already closed, 
it's not a problem (probably the node is stopping).
-                                        
.exceptionally(ignoreTableClosedException())
-                        ),
-                        ioExecutor
-                )
-                .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
-                    lowWatermark.getLowWatermarkSafe(lwm ->
-                            registerIndexesToTable(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView(), lwm)
-                    );
+        // TODO https://issues.apache.org/jira/browse/IGNITE-20957 Revisit 
this code
+        return inBusyLock(
+                busyLock,
+                () -> getOrCreatePartitionStorages(tbl, singlePartitionIdSet)
+                        .thenRun(() -> localPartsByTableId.compute(
+                                replicaGrpId.tableId(),
+                                (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionId)
+                        ))
+                        // If the table is already closed, it's not a problem 
(probably the node is stopping).
+                        .exceptionally(ignoreTableClosedException())
+        ).thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
+            lowWatermark.getLowWatermarkSafe(lwm ->
+                    registerIndexesToTable(tbl, catalogService, 
singlePartitionIdSet, tbl.schemaView(), lwm)
+            );
 
-                    return 
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored -> 
inBusyLock(busyLock, () -> {
-                        assert localAssignmentInPending != null : "Local 
member assignment";
+            return 
waitForMetadataCompleteness(assignmentsTimestamp).thenCompose(ignored -> 
inBusyLock(busyLock, () -> {
+                assert localAssignmentInPending != null : "Local member 
assignment";
 
-                        return startPartitionAndStartClient(
-                                tbl,
-                                replicaGrpId.partitionId(),
-                                localAssignmentInPending,
-                                computedStableAssignments,
-                                isRecovery,
-                                assignmentsTimestamp
-                        );
-                    }));
-                }), ioExecutor);
+                return startPartitionAndStartClient(
+                        tbl,
+                        replicaGrpId.partitionId(),
+                        localAssignmentInPending,
+                        computedStableAssignments,
+                        isRecovery,
+                        assignmentsTimestamp
+                );
+            }));
+        }), ioExecutor);
     }
 
     /**
@@ -3544,28 +3538,25 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             long revision,
             long assignmentsTimestamp
     ) {
-        return inBusyLockAsync(busyLock, () -> 
tablesVv.get(revision).thenComposeAsync(unused -> inBusyLockAsync(busyLock, () 
-> {
-            TableViewInternal table = tables.get(tablePartitionId.tableId());
-
+        return tableAsync(tablePartitionId.tableId()).thenComposeAsync(table 
-> inBusyLockAsync(busyLock, () -> {
             assert table != null : tablePartitionId;
 
             Assignments stableAssignments = 
stableAssignmentsGetLocally(metaStorageMgr, tablePartitionId, revision);
 
             Assignment localAssignment = localAssignment(stableAssignments);
 
-            return stopPartitionAndDestroyForRestart(tablePartitionId, table)
-                    .thenComposeAsync(unused1 ->
+            return stopPartitionAndDestroyForRestart(tablePartitionId, 
table).thenComposeAsync(unused1 ->
                             createPartitionAndStartClient(
                                     tablePartitionId,
-                                    table, revision,
+                                    table,
                                     false,
                                     assignmentsTimestamp,
                                     localAssignment,
                                     stableAssignments
                             ),
-                            ioExecutor
-                    );
-        }), ioExecutor));
+                    ioExecutor
+            );
+        }), ioExecutor);
     }
 
     @Override
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 0ff373b2242..77f6517c6a1 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +59,9 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
+import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateEnum;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -81,6 +84,7 @@ import 
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartition
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
@@ -275,6 +279,30 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         IgniteImpl nodeToCleanup = findNodeConformingOptions(tableName, 
primaryReplica, raftLeader);
 
+        AtomicBoolean stop = new AtomicBoolean();
+
+        CompletableFuture<Void> msLoadFut =  CompletableFuture.runAsync(() -> {
+            MetaStorageManager msMng = node.metaStorageManager();
+
+            var msPutFuts = new ArrayList<CompletableFuture<Void>>();
+
+            int i = 0;
+
+            while (!stop.get()) {
+                msPutFuts.add(msMng.put(ByteArray.fromString("test_key_" + 
i++), new byte[0]));
+
+                if (i % 1000 == 0) {
+                    assertThat(CompletableFutures.allOf(msPutFuts), 
willCompleteSuccessfully());
+
+                    msPutFuts.clear();
+
+                    log.info("MS entries loaded {}", i);
+                }
+            }
+
+            assertThat(CompletableFutures.allOf(msPutFuts), 
willCompleteSuccessfully());
+        });
+
         CompletableFuture<Void> restartPartitionsWithCleanupFuture = 
node.disasterRecoveryManager().restartTablePartitionsWithCleanup(
                 Set.of(nodeToCleanup.name()),
                 testZone,
@@ -285,6 +313,10 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         assertThat(restartPartitionsWithCleanupFuture, 
willCompleteSuccessfully());
 
+        stop.set(true);
+
+        assertThat(msLoadFut, willCompleteSuccessfully());
+
         insert(1, 1, tableName);
 
         assertValueOnSpecificNodes(tableName, runningNodes, 0, 0);

Reply via email to