This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new db73d706343 IGNITE-27097 Improve recovery from interrupted Raft snapshot install (#7207)
db73d706343 is described below

commit db73d7063439b130b15f320b5cdd38ceefa4bfe1
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Dec 11 17:15:29 2025 +0400

    IGNITE-27097 Improve recovery from interrupted Raft snapshot install (#7207)
---
 .../LocalBeforeReplicaStartEventParameters.java    |  79 ++++++
 .../PartitionReplicaLifecycleManager.java          |  32 ++-
 .../partition/replicator/ZoneResourcesManager.java |   4 +
 .../raft/snapshot/PartitionSnapshotStorage.java    |  37 ++-
 .../incoming/IncomingSnapshotCopierTest.java       |   4 +-
 modules/raft/build.gradle                          |   2 +
 ...InterruptedRaftSnapshotStorageRecoveryTest.java | 274 +++++++++++++++++++++
 .../java/org/apache/ignite/internal/Cluster.java   |   8 +-
 .../internal/table/distributed/TableManager.java   | 155 ++++++------
 9 files changed, 501 insertions(+), 94 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
new file mode 100644
index 00000000000..8fcbe8229b9
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+
+/**
+ * Parameters for the {@link LocalPartitionReplicaEvent#BEFORE_REPLICA_STARTED} event.
+ *
+ * <p>Used as a container to pass information from event listeners to the partition replica lifecycle manager about whether any storage
+ * is seen in the rebalance state, and to collect cleanup actions that listeners want to be executed in that case.
+ *
+ * <p>The rebalance flag is {@code volatile} and cleanup actions are stored in a {@link CopyOnWriteArrayList}, so both may be updated
+ * from different threads.
+ */
+public class LocalBeforeReplicaStartEventParameters extends LocalPartitionReplicaEventParameters {
+    private volatile boolean anyStorageIsInRebalanceState;
+
+    private final List<Supplier<CompletableFuture<Void>>> cleanupActions = new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param zonePartitionId Zone partition id.
+     * @param revision Event's revision.
+     * @param onRecovery Flag indicating if this event was produced on node recovery.
+     * @param anyStorageIsInRebalanceState Initial value of the flag telling whether at least one storage is in the rebalance state;
+     *      it may later be set to {@code true} via {@link #registerStorageInRebalanceState()}.
+     */
+    public LocalBeforeReplicaStartEventParameters(
+            ZonePartitionId zonePartitionId,
+            long revision,
+            boolean onRecovery,
+            boolean anyStorageIsInRebalanceState
+    ) {
+        super(zonePartitionId, revision, onRecovery);
+
+        this.anyStorageIsInRebalanceState = anyStorageIsInRebalanceState;
+    }
+
+    /** Returns whether at least one storage is in rebalance state. */
+    public boolean anyStorageIsInRebalanceState() {
+        return anyStorageIsInRebalanceState;
+    }
+
+    /** Registers that at least one storage is in rebalance state. The flag is never reset back to {@code false}. */
+    public void registerStorageInRebalanceState() {
+        this.anyStorageIsInRebalanceState = true;
+    }
+
+    /**
+     * Adds a cleanup action to be executed after this event has been handled by all event handlers, IF at least one storage is
+     * in the rebalance state.
+     *
+     * @param action Cleanup action to execute.
+     */
+    public void addCleanupAction(Supplier<CompletableFuture<Void>> action) {
+        cleanupActions.add(action);
+    }
+
+    /** Returns an unmodifiable snapshot of the cleanup actions registered so far. */
+    public List<Supplier<CompletableFuture<Void>>> cleanupActions() {
+        return List.copyOf(cleanupActions);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1617b1f9a78..396976ef41b 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -700,7 +700,6 @@ public class PartitionReplicaLifecycleManager extends
 
         Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
             var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
-            var eventParams = new 
LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
 
             ZonePartitionResources zoneResources = 
zoneResourcesManager.allocateZonePartitionResources(
                     zonePartitionId,
@@ -708,9 +707,40 @@ public class PartitionReplicaLifecycleManager extends
                     storageIndexTracker
             );
 
+            var eventParams = new LocalBeforeReplicaStartEventParameters(
+                    zonePartitionId,
+                    revision,
+                    onRecovery,
+                    zoneResources.txStatePartitionStorageIsInRebalanceState()
+            );
+
             startedReplicationGroups.beforeStartingGroup(zonePartitionId);
 
             return 
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
+                    .thenCompose(v -> {
+                        if (eventParams.anyStorageIsInRebalanceState()) {
+                            // We must destroy protocol storages first. If we 
do so, then, as MV and TX state storages sync Raft log
+                            // before being flushed, there is a guarantee 
that, after a possible crash, we will either see some storage
+                            // still in the rebalance state (and hence we'll 
repeat the destruction on the next start), or the Raft log
+                            // destruction will be persisted (and we'll just 
recover normally).
+                            try {
+                                
replicaMgr.destroyReplicationProtocolStorages(zonePartitionId, isVolatileZone);
+                            } catch (NodeStoppingException e) {
+                                return failedFuture(e);
+                            }
+
+                            CompletableFuture<Void> clearTxStateStorage = 
zoneResources.txStatePartitionStorage().clear();
+
+                            CompletableFuture<?>[] registeredCleanupFutures = 
eventParams.cleanupActions().stream()
+                                    .map(Supplier::get)
+                                    .toArray(CompletableFuture[]::new);
+                            CompletableFuture<Void> clearMvStorages = 
allOf(registeredCleanupFutures);
+
+                            return allOf(clearTxStateStorage, clearMvStorages);
+                        } else {
+                            return nullCompletedFuture();
+                        }
+                    })
                     .thenCompose(v -> {
                         try {
                             // TODO 
https://issues.apache.org/jira/browse/IGNITE-24654 Properly close 
storageIndexTracker.
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index a4e314c7913..51c8efe2cea 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -267,6 +267,10 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
             return txStatePartitionStorage;
         }
 
+        /**
+         * Returns {@code true} if the TX state partition storage is in the rebalance state, that is, if its last applied index equals
+         * {@link TxStatePartitionStorage#REBALANCE_IN_PROGRESS}.
+         */
+        public boolean txStatePartitionStorageIsInRebalanceState() {
+            return txStatePartitionStorage.lastAppliedIndex() == TxStatePartitionStorage.REBALANCE_IN_PROGRESS;
+        }
+
         public ZonePartitionRaftListener raftListener() {
             return raftListener;
         }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index f1b390bacab..a28d4bcd2a3 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -48,9 +48,9 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Snapshot storage for a partition.
+ * Snapshot storage for a zone partition.
  *
- * <p>In case of zone partitions manages all table storages of a zone 
partition.
+ * <p>Manages all table storages of a zone partition.
  *
  * <p>Utilizes the fact that every partition already stores its latest applied 
index and thus can itself be used as its own snapshot.
  *
@@ -66,7 +66,7 @@ public class PartitionSnapshotStorage {
     /** Default number of milliseconds that the follower is allowed to try to 
catch up the required catalog version. */
     private static final int DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS = 3000;
 
-    private final PartitionKey partitionKey;
+    private final ZonePartitionKey partitionKey;
 
     private final TopologyService topologyService;
 
@@ -102,7 +102,7 @@ public class PartitionSnapshotStorage {
 
     /** Constructor. */
     public PartitionSnapshotStorage(
-            PartitionKey partitionKey,
+            ZonePartitionKey partitionKey,
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionTxStateAccess txState,
@@ -126,7 +126,7 @@ public class PartitionSnapshotStorage {
 
     /** Constructor. */
     public PartitionSnapshotStorage(
-            PartitionKey partitionKey,
+            ZonePartitionKey partitionKey,
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionTxStateAccess txState,
@@ -302,12 +302,15 @@ public class PartitionSnapshotStorage {
         for (PartitionMvStorageAccess partitionStorage : 
partitionsByTableId.values()) {
             long lastAppliedIndex = partitionStorage.lastAppliedIndex();
 
-            assert lastAppliedIndex >= 0 :
-                    String.format("Partition storage [tableId=%d, 
partitionId=%d] contains an unexpected applied index value: %d.",
-                            partitionStorage.tableId(),
-                            partitionStorage.partitionId(),
-                            lastAppliedIndex
-                    );
+            if (lastAppliedIndex < 0) {
+                throw new IllegalStateException(String.format(
+                        "MV partition storage [tableId=%d, zoneId=%d, 
partitionId=%d] contains an unexpected applied index value: %d.",
+                        partitionStorage.tableId(),
+                        partitionKey.zoneId(),
+                        partitionStorage.partitionId(),
+                        lastAppliedIndex
+                ));
+            }
 
             if (lastAppliedIndex == 0) {
                 return null;
@@ -319,7 +322,17 @@ public class PartitionSnapshotStorage {
             }
         }
 
-        if (txState.lastAppliedIndex() < minLastAppliedIndex) {
+        long txStateLastAppliedIndex = txState.lastAppliedIndex();
+        if (txStateLastAppliedIndex < 0) {
+            throw new IllegalStateException(
+                    String.format("Tx state partition storage [key=%s] 
contains an unexpected applied index value: %d.",
+                            partitionKey,
+                            txStateLastAppliedIndex
+                    )
+            );
+        }
+
+        if (txStateLastAppliedIndex < minLastAppliedIndex) {
             return startupSnapshotMetaFromTxStorage();
         } else {
             assert storageWithMinLastAppliedIndex != null;
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index d06514b039f..00aa96c7ddb 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -117,7 +117,6 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
-import 
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.TransactionIds;
@@ -146,6 +145,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 /** For {@link IncomingSnapshotCopier} testing. */
 @ExtendWith(MockitoExtension.class)
 public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
+    private static final int ZONE_ID = 0;
     private static final int TABLE_ID = 1;
 
     private static final String NODE_NAME = "node";
@@ -396,7 +396,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
         
when(outgoingSnapshotsManager.messagingService()).thenReturn(messagingService);
 
         var storage = new PartitionSnapshotStorage(
-                new TablePartitionKey(TABLE_ID, PARTITION_ID),
+                new ZonePartitionKey(ZONE_ID, PARTITION_ID),
                 topologyService,
                 outgoingSnapshotsManager,
                 new 
PartitionTxStateAccessImpl(incomingTxStateStorage.getPartitionStorage(PARTITION_ID)),
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index a315d2f74ce..de2a3a40451 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -113,6 +113,8 @@ dependencies {
     integrationTestImplementation project(':ignite-low-watermark')
     integrationTestImplementation project(':ignite-schema')
     integrationTestImplementation project(':ignite-configuration-root')
+    integrationTestImplementation project(':ignite-page-memory')
+    integrationTestImplementation project(':ignite-transactions')
     integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.dropwizard.metrics
     integrationTestImplementation libs.disruptor
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
new file mode 100644
index 00000000000..73d0472dafe
--- /dev/null
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.components.LogSyncer;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
+import org.apache.ignite.internal.util.Constants;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.table.KeyValueView;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Checks that a node whose MV or TX state storage was left in the rebalance state (as after a Raft snapshot install that did not
+ * finish) still gets the data after a restart.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class ItInterruptedRaftSnapshotStorageRecoveryTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+    private static final String ZONE_NAME = "TEST_ZONE";
+
+    @InjectConfiguration("mock.profiles.default {engine = aipersist, sizeBytes = " + Constants.GiB + "}")
+    private StorageConfiguration storageConfig;
+
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @InjectExecutorService
+    private ScheduledExecutorService scheduler;
+
+    @Test
+    void raftSnapshotWorksAfterNonFinishedRaftSnapshotInstallOnMvStorage() {
+        testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(this::simulateNonFinishedRaftSnapshotInstallInMvStorage);
+    }
+
+    @Test
+    void raftSnapshotWorksAfterNonFinishedRaftSnapshotInstallOnTxStateStorage() {
+        testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(this::simulateNonFinishedRaftSnapshotInstallInTxStateStorage);
+    }
+
+    /**
+     * Writes a row, truncates the Raft log prefix on all nodes, stops node 2, applies the given action to node 2's partitions
+     * DB path, restarts node 2 and waits until it has the row.
+     *
+     * @param executeOnStoppedNode2StoragePath Action to run against node 2's partitions DB path while the node is stopped.
+     */
+    private void testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(Consumer<Path> executeOnStoppedNode2StoragePath) {
+        createTableWithOnePartitionAnd3Replicas(TABLE_NAME);
+
+        // Using explicit transaction to be sure that TxStateStorage gets written.
+        putOneRowInExplicitTransaction();
+
+        waitForAllNodesToHave1RowInTable(TABLE_NAME);
+
+        // Truncate log prefix to force snapshot installation to node 2 when its storages will be cleared on startup.
+        // This also causes flushes of both MV and TxState storages, so, after we simulate non-finished rebalance in either MV or
+        // TX state storage and restart node 2, the corresponding storage data will not be rewritten by reapplying the log.
+        truncateLogPrefixOnAllNodes(cluster.solePartitionId(ZONE_NAME, TABLE_NAME));
+
+        Path node2PartitionsDbPath = unwrapIgniteImpl(cluster.node(2)).partitionsWorkDir().dbPath();
+
+        cluster.stopNode(2);
+
+        executeOnStoppedNode2StoragePath.accept(node2PartitionsDbPath);
+
+        Ignite node2 = cluster.startNode(2);
+
+        waitForNodeToHave1RowInTable(TABLE_NAME, node2);
+    }
+
+    private void createTableWithOnePartitionAnd3Replicas(String tableName) {
+        cluster.aliveNode().sql().executeScript("CREATE ZONE " + ZONE_NAME + " (PARTITIONS 1, REPLICAS 3) STORAGE PROFILES ['"
+                + CatalogService.DEFAULT_STORAGE_PROFILE + "'];"
+                + "CREATE TABLE " + tableName + " (ID INT PRIMARY KEY, VAL VARCHAR) ZONE " + ZONE_NAME + ";");
+    }
+
+    private void putOneRowInExplicitTransaction() {
+        KeyValueView<Integer, String> kvView = cluster.aliveNode().tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+        cluster.aliveNode().transactions().runInTransaction(tx -> {
+            kvView.put(tx, 1, "one");
+        });
+    }
+
+    private void waitForAllNodesToHave1RowInTable(String tableName) {
+        cluster.nodes().parallelStream().forEach(node -> waitForNodeToHave1RowInTable(tableName, node));
+    }
+
+    private static void waitForNodeToHave1RowInTable(String tableName, Ignite node) {
+        TableViewInternal table = unwrapTableViewInternal(node.tables().table(tableName));
+
+        MvPartitionStorage partition = table.internalTable().storage().getMvPartition(0);
+        assertThat(partition, is(notNullValue()));
+
+        await().until(partition::estimatedSize, is(1L));
+    }
+
+    private void truncateLogPrefixOnAllNodes(ReplicationGroupId replicationGroupId) {
+        cluster.nodes().parallelStream().forEach(node -> truncateLogPrefix(replicationGroupId, node));
+    }
+
+    private static void truncateLogPrefix(ReplicationGroupId replicationGroupId, Ignite node) {
+        RaftGroupService raftGroupService = Cluster.raftGroupService(unwrapIgniteImpl(node), replicationGroupId);
+        assertThat(raftGroupService, is(notNullValue()));
+
+        doSnapshotOn(raftGroupService);
+    }
+
+    private static void doSnapshotOn(RaftGroupService raftGroupService) {
+        var fut = new CompletableFuture<Status>();
+        raftGroupService.getRaftNode().snapshot(fut::complete, true);
+
+        assertThat(fut, willCompleteSuccessfully());
+        assertEquals(RaftError.SUCCESS, fut.join().getRaftError());
+    }
+
+    private StorageTableDescriptor storageTableDescriptor() {
+        TableViewInternal table = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+        MvTableStorage mvTableStorage = table.internalTable().storage();
+        return mvTableStorage.getTableDescriptor();
+    }
+
+    private int zoneId() {
+        CatalogManager catalogManager = unwrapIgniteImpl(cluster.aliveNode()).catalogManager();
+        Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        CatalogZoneDescriptor zone = catalog.zone(ZONE_NAME);
+        assertThat(zone, is(notNullValue()));
+
+        return zone.id();
+    }
+
+    /**
+     * Opens the aipersist storage under the given path, puts MV partition 0 of the test table into the rebalance state and
+     * flushes it.
+     *
+     * @param storagePath Partitions DB path of a stopped node.
+     */
+    private void simulateNonFinishedRaftSnapshotInstallInMvStorage(Path storagePath) {
+        assertTrue(Files.exists(storagePath));
+        assertTrue(Files.isDirectory(storagePath));
+
+        var metricManager = new TestMetricManager();
+
+        StorageEngine engine = createPersistentPageMemoryEngine(storagePath, metricManager);
+
+        engine.start();
+
+        try {
+            assertThat(metricManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+            StorageTableDescriptor tableDescriptor = storageTableDescriptor();
+
+            MvTableStorage tableStorage = engine.createMvTable(tableDescriptor, new StorageIndexDescriptorSupplier() {
+                @Override
+                public @Nullable StorageIndexDescriptor get(int indexId) {
+                    return null;
+                }
+            });
+            assertThat(tableStorage.createMvPartition(0), willCompleteSuccessfully());
+            assertThat(tableStorage.startRebalancePartition(0), willCompleteSuccessfully());
+
+            // Flush to make sure it gets written to disk.
+            flushPartition0(tableStorage);
+        } finally {
+            engine.stop();
+
+            // The metric manager was started above; stop it as well so that it is not leaked.
+            assertThat(metricManager.stopAsync(new ComponentContext()), willCompleteSuccessfully());
+        }
+    }
+
+    private StorageEngine createPersistentPageMemoryEngine(Path storagePath, MetricManager metricManager) {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        return new PersistentPageMemoryStorageEngine(
+                "test",
+                metricManager,
+                storageConfig,
+                null,
+                ioRegistry,
+                storagePath,
+                null,
+                mock(FailureManager.class),
+                mock(LogSyncer.class),
+                scheduler,
+                new HybridClockImpl()
+        );
+    }
+
+    private static void flushPartition0(MvTableStorage tableStorage) {
+        MvPartitionStorage partitionStorage = tableStorage.getMvPartition(0);
+        assertThat(partitionStorage, is(notNullValue()));
+        assertThat(partitionStorage.flush(), willCompleteSuccessfully());
+    }
+
+    /**
+     * Opens the TX state RocksDB storage under {@code <storagePath>/tx-state}, puts TX state partition 0 of the test zone into the
+     * rebalance state and flushes the shared storage.
+     *
+     * @param storagePath Partitions DB path of a stopped node.
+     */
+    private void simulateNonFinishedRaftSnapshotInstallInTxStateStorage(Path storagePath) {
+        Path txStateStoragePath = storagePath.resolve("tx-state");
+        assertTrue(Files.exists(txStateStoragePath));
+        assertTrue(Files.isDirectory(txStateStoragePath));
+
+        var sharedStorage = new TxStateRocksDbSharedStorage(
+                "test",
+                txStateStoragePath,
+                scheduler,
+                executor,
+                mock(LogSyncer.class),
+                mock(FailureProcessor.class),
+                () -> 0
+        );
+        assertThat(sharedStorage.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        try {
+            var zoneTxStateStorage = new TxStateRocksDbStorage(zoneId(), storageTableDescriptor().getPartitions(), sharedStorage);
+            zoneTxStateStorage.start();
+
+            TxStateRocksDbPartitionStorage txStatePartitionStorage = zoneTxStateStorage.getOrCreatePartitionStorage(0);
+            assertThat(txStatePartitionStorage.startRebalance(), willCompleteSuccessfully());
+
+            // Flush to make sure it gets written to disk.
+            sharedStorage.flush();
+        } finally {
+            assertThat(sharedStorage.stopAsync(), willCompleteSuccessfully());
+        }
+    }
+}
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 8225608e18b..1cd5d6cae92 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -620,7 +620,13 @@ public class Cluster {
                 .orElse(null);
     }
 
-    private static @Nullable RaftGroupService raftGroupService(IgniteImpl 
igniteImpl, ReplicationGroupId groupId) {
+    /**
+     * Finds a Raft group service for the given group ID on the given node.
+     *
+     * @param igniteImpl Ignite instance.
+     * @param groupId Replication group ID.
+     */
+    public static @Nullable RaftGroupService raftGroupService(IgniteImpl 
igniteImpl, ReplicationGroupId groupId) {
         JraftServerImpl server = (JraftServerImpl) 
igniteImpl.raftManager().server();
 
         return server.localNodes().stream()
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 36d7895ff8b..b6c672be1f0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.groupingBy;
@@ -122,6 +123,7 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import 
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
 import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
 import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
@@ -138,7 +140,6 @@ import 
org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -184,6 +185,7 @@ import 
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
@@ -361,11 +363,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     /** Configuration of rebalance retries delay. */
     private final SystemDistributedConfigurationPropertyHolder<Integer> 
rebalanceRetryDelayConfiguration;
 
-    private final EventListener<LocalPartitionReplicaEventParameters> 
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
+    private final EventListener<LocalBeforeReplicaStartEventParameters> 
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
     private final EventListener<LocalPartitionReplicaEventParameters> 
onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
     private final EventListener<LocalPartitionReplicaEventParameters> 
onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
 
-    private final EventListener<CreateTableEventParameters> 
onTableCreateWithColocationListener = this::loadTableToZoneOnTableCreate;
+    private final EventListener<CreateTableEventParameters> 
onTableCreateListener = this::loadTableToZoneOnTableCreate;
     private final EventListener<DropTableEventParameters> onTableDropListener 
= fromConsumer(this::onTableDrop);
     private final EventListener<CatalogEventParameters> onTableAlterListener = 
this::onTableAlter;
 
@@ -556,7 +558,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             cleanUpResourcesForDroppedTablesOnRecoveryBusy();
 
-            catalogService.listen(CatalogEvent.TABLE_CREATE, 
onTableCreateWithColocationListener);
+            catalogService.listen(CatalogEvent.TABLE_CREATE, 
onTableCreateListener);
             catalogService.listen(CatalogEvent.TABLE_DROP, 
onTableDropListener);
             catalogService.listen(CatalogEvent.TABLE_ALTER, 
onTableAlterListener);
 
@@ -574,14 +576,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         });
     }
 
-    private CompletableFuture<Boolean> 
beforeZoneReplicaStarted(LocalPartitionReplicaEventParameters parameters) {
+    private CompletableFuture<Boolean> 
beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> readyToProcessReplicaStarts
                 .thenCompose(v -> beforeZoneReplicaStartedImpl(parameters))
                 .thenApply(unused -> false)
         );
     }
 
-    private CompletableFuture<Void> 
beforeZoneReplicaStartedImpl(LocalPartitionReplicaEventParameters parameters) {
+    private CompletableFuture<Void> 
beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters) 
{
         return inBusyLockAsync(busyLock, () -> {
             ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
@@ -595,7 +597,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 return readLockAcquisitionFuture.thenCompose(stamp -> {
                     Set<TableViewInternal> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
 
-                    return 
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, 
parameters.onRecovery());
+                    return 
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables, 
parameters);
                 }).whenComplete((unused, t) -> 
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
             } catch (Throwable t) {
                 readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
@@ -608,42 +610,78 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     private CompletableFuture<Void> 
createPartitionsAndLoadResourcesToZoneReplica(
             ZonePartitionId zonePartitionId,
             Set<TableViewInternal> zoneTables,
-            boolean onRecovery
+            LocalBeforeReplicaStartEventParameters event
     ) {
         int partitionIndex = zonePartitionId.partitionId();
 
         PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
 
-        CompletableFuture<?>[] futures = zoneTables.stream()
+        List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
                 .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                    return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
-                            .thenRunAsync(() -> inBusyLock(busyLock, () -> {
-                                localPartsByTableId.compute(
-                                        tbl.tableId(),
-                                        (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionIndex)
-                                );
-
-                                lowWatermark.getLowWatermarkSafe(lwm ->
-                                        registerIndexesToTable(
-                                                tbl,
-                                                catalogService,
-                                                singlePartitionIdSet,
-                                                tbl.schemaView(),
-                                                lwm
-                                        )
-                                );
-
-                                
preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId, 
onRecovery);
-                            }), ioExecutor)
+                    return createPartitionStoragesIfAbsent(tbl, 
singlePartitionIdSet)
                             // If the table is already closed, it's not a 
problem (probably the node is stopping).
                             .exceptionally(ignoreTableClosedException());
                 }))
-                .toArray(CompletableFuture[]::new);
+                .collect(toList());
+
+        return CompletableFutures.allOf(storageCreationFutures)
+                .thenRunAsync(() -> 
scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event), 
ioExecutor)
+                // If a table is already closed, it's not a problem (probably 
the node is stopping).
+                .exceptionally(ignoreTableClosedException())
+                .thenCompose(unused -> {
+                    CompletableFuture<?>[] futures = zoneTables.stream()
+                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                return runAsync(() -> inBusyLock(busyLock, () 
-> {
+                                    localPartsByTableId.compute(
+                                            tbl.tableId(),
+                                            (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionIndex)
+                                    );
+
+                                    lowWatermark.getLowWatermarkSafe(lwm ->
+                                            registerIndexesToTable(
+                                                    tbl,
+                                                    catalogService,
+                                                    singlePartitionIdSet,
+                                                    tbl.schemaView(),
+                                                    lwm
+                                            )
+                                    );
+
+                                    
preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId, 
event.onRecovery());
+                                }), ioExecutor)
+                                // If the table is already closed, it's not a 
problem (probably the node is stopping).
+                                .exceptionally(ignoreTableClosedException());
+                            }))
+                            .toArray(CompletableFuture[]::new);
+
+                    return allOf(futures);
+                });
+    }
+
+    private static void scheduleMvPartitionsCleanupIfNeeded(
+            Set<TableViewInternal> zoneTables,
+            int partitionIndex,
+            LocalBeforeReplicaStartEventParameters event
+    ) {
+        boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
+                .map(table -> 
table.internalTable().storage().getMvPartition(partitionIndex))
+                .filter(Objects::nonNull)
+                .anyMatch(partitionStorage -> 
partitionStorage.lastAppliedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS);
+
+        if (anyMvPartitionStorageIsInRebalanceState) {
+            event.registerStorageInRebalanceState();
+        }
 
-        return allOf(futures);
+        // Adding the cleanup action even if no MV partition storage is in 
rebalance state as it might be that the TX state storage is.
+        event.addCleanupAction(() -> {
+            CompletableFuture<?>[] clearFutures = zoneTables.stream()
+                    .map(table -> 
table.internalTable().storage().clearPartition(partitionIndex))
+                    .toArray(CompletableFuture[]::new);
+            return allOf(clearFutures);
+        });
     }
 
-    private static Function<Throwable, Void> ignoreTableClosedException() {
+    private static <T> Function<Throwable, T> ignoreTableClosedException() {
         return ex -> {
             if (hasCause(ex, TableClosedException.class)) {
                 return null;
@@ -799,7 +837,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         }
                     }
 
-                    return getOrCreatePartitionStorages(table, 
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
+                    return createPartitionStoragesIfAbsent(table, 
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
                 }, ioExecutor))
                 // If the table is already closed, it's not a problem 
(probably the node is stopping).
                 .exceptionally(ignoreTableClosedException())
@@ -1066,7 +1104,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, 
onLowWatermarkChangedListener);
 
-        catalogService.removeListener(CatalogEvent.TABLE_CREATE, 
onTableCreateWithColocationListener);
+        catalogService.removeListener(CatalogEvent.TABLE_CREATE, 
onTableCreateListener);
         catalogService.removeListener(CatalogEvent.TABLE_DROP, 
onTableDropListener);
         catalogService.removeListener(CatalogEvent.TABLE_ALTER, 
onTableAlterListener);
     }
@@ -1488,34 +1526,21 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     // TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create 
storages only once.
-    private CompletableFuture<Void> 
getOrCreatePartitionStorages(TableViewInternal table, PartitionSet partitions) {
+    private CompletableFuture<Void> 
createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet 
partitions) {
         InternalTable internalTable = table.internalTable();
 
-        CompletableFuture<?>[] storageFuts = 
partitions.stream().mapToObj(partitionId -> {
+        List<CompletableFuture<MvPartitionStorage>> storageFuts = 
partitions.stream().mapToObj(partitionId -> {
             MvPartitionStorage mvPartition;
             try {
                 mvPartition = 
internalTable.storage().getMvPartition(partitionId);
             } catch (StorageClosedException e) {
-                return failedFuture(new TableClosedException(table.tableId(), 
e));
+                return CompletableFuture.<MvPartitionStorage>failedFuture(new 
TableClosedException(table.tableId(), e));
             }
 
-            return (mvPartition != null ? completedFuture(mvPartition) : 
internalTable.storage().createMvPartition(partitionId))
-                    .thenComposeAsync(mvPartitionStorage -> {
-                        if (mvPartitionStorage.lastAppliedIndex() == 
MvPartitionStorage.REBALANCE_IN_PROGRESS) {
-                            destroyReplicationProtocolStorages(
-                                    new ZonePartitionId(table.zoneId(), 
partitionId),
-                                    table,
-                                    false
-                            );
-
-                            return 
internalTable.storage().clearPartition(partitionId);
-                        } else {
-                            return nullCompletedFuture();
-                        }
-                    }, ioExecutor);
-        }).toArray(CompletableFuture[]::new);
+            return mvPartition != null ? completedFuture(mvPartition) : 
internalTable.storage().createMvPartition(partitionId);
+        }).collect(toList());
 
-        return allOf(storageFuts);
+        return CompletableFutures.allOf(storageFuts);
     }
 
     private CompletableFuture<Void> 
stopAndDestroyTablePartitions(TableViewInternal table) {
@@ -1657,32 +1682,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
     }
 
-    private void destroyReplicationProtocolStorages(
-            ReplicationGroupId replicationGroupId,
-            TableViewInternal table,
-            boolean destroyDurably
-    ) {
-        var internalTbl = (InternalTableImpl) table.internalTable();
-
-        destroyReplicationProtocolStorages(replicationGroupId, 
internalTbl.storage().isVolatile(), destroyDurably);
-    }
-
-    private void destroyReplicationProtocolStorages(
-            ReplicationGroupId replicationGroupId,
-            boolean isVolatileStorage,
-            boolean destroyDurably
-    ) {
-        try {
-            if (destroyDurably) {
-                
replicaMgr.destroyReplicationProtocolStoragesDurably(replicationGroupId, 
isVolatileStorage);
-            } else {
-                
replicaMgr.destroyReplicationProtocolStorages(replicationGroupId, 
isVolatileStorage);
-            }
-        } catch (NodeStoppingException e) {
-            throw new IgniteInternalException(NODE_STOPPING_ERR, e);
-        }
-    }
-
     private static void closePartitionTrackers(InternalTable internalTable, 
int partitionId) {
         closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));
 


Reply via email to