This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f6eaf1254be IGNITE-26849 Fix node recovery after aborting raft snapshot installation and raft log truncation (#7054)
f6eaf1254be is described below

commit f6eaf1254bebef6514ec976e6aa8e307eee801e4
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 25 15:44:35 2025 +0300

    IGNITE-26849 Fix node recovery after aborting raft snapshot installation and raft log truncation (#7054)
---
 .../ItZonePartitionRaftListenerRecoveryTest.java   |  8 ++-
 .../PartitionReplicaLifecycleManager.java          |  3 +-
 .../partition/replicator/ZoneResourcesManager.java | 11 +++-
 .../{PartitionKey.java => LogStorageAccess.java}   | 26 ++++++--
 .../raft/snapshot/LogStorageAccessImpl.java        | 42 ++++++++++++
 .../replicator/raft/snapshot/PartitionKey.java     |  4 ++
 .../raft/snapshot/PartitionMvStorageAccess.java    |  3 +
 .../raft/snapshot/PartitionSnapshotStorage.java    | 17 ++++-
 .../replicator/raft/snapshot/ZonePartitionKey.java |  7 ++
 .../snapshot/incoming/IncomingSnapshotCopier.java  | 50 ++++++++++++--
 .../incoming/ReplicationLogStorageKey.java         | 76 ++++++++++++++++++++++
 .../PartitionReplicaLifecycleManagerTest.java      | 57 ++++++++--------
 .../replicator/ZoneResourcesManagerTest.java       |  5 +-
 .../PartitionSnapshotStorageFactoryTest.java       |  3 +-
 .../snapshot/PartitionSnapshotStorageTest.java     |  3 +-
 .../incoming/IncomingSnapshotCopierTest.java       | 11 +++-
 .../outgoing/OutgoingSnapshotReaderTest.java       |  4 +-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  2 -
 .../java/org/apache/ignite/internal/raft/Loza.java | 18 +++++
 .../internal/raft/server/impl/JraftServerImpl.java | 16 +++++
 .../raft/storage/impl/VolatileRaftMetaStorage.java |  5 ++
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  4 ++
 .../ignite/raft/jraft/storage/RaftMetaStorage.java |  4 +-
 .../jraft/storage/impl/LocalRaftMetaStorage.java   | 12 +++-
 .../ignite/internal/replicator/ReplicaManager.java | 12 ++++
 .../internal/table/distributed/TableManager.java   | 24 +++++--
 .../snapshot/PartitionMvStorageAccessImpl.java     |  5 ++
 .../raft/snapshot/TablePartitionKey.java           |  7 ++
 28 files changed, 374 insertions(+), 65 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index bdcb3605a2c..df3fc9078ea 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
@@ -98,6 +99,7 @@ import 
org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRowImpl;
@@ -236,7 +238,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
             @InjectConfiguration SystemLocalConfiguration 
systemLocalConfiguration,
             @InjectExecutorService ScheduledExecutorService 
scheduledExecutorService,
             @Mock Catalog catalog,
-            @Mock CatalogIndexDescriptor catalogIndexDescriptor
+            @Mock CatalogIndexDescriptor catalogIndexDescriptor,
+            @Mock ReplicaManager replicaManager
     ) {
         when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
         
when(catalog.indexes(anyInt())).thenReturn(List.of(catalogIndexDescriptor));
@@ -311,7 +314,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
                 new 
PartitionTxStateAccessImpl(txStateStorage.getOrCreatePartitionStorage(PARTITION_ID.partitionId())),
                 catalogService,
                 failureProcessor,
-                executor
+                executor,
+                new LogStorageAccessImpl(replicaManager)
         );
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f3fee7e95cc..ac5d5cc02e8 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -334,7 +334,8 @@ public class PartitionReplicaLifecycleManager extends
                         topologyService,
                         catalogService,
                         failureProcessor,
-                        partitionOperationsExecutor
+                        partitionOperationsExecutor,
+                        replicaMgr
                 )
         );
     }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index e2298bba0e0..a4e314c7913 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -30,10 +30,12 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tx.TxManager;
 import 
org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateStorage;
@@ -66,6 +68,8 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
 
     private final Executor partitionOperationsExecutor;
 
+    private final ReplicaManager replicaManager;
+
     /** Map from zone IDs to their resource holders. */
     private final Map<Integer, ZoneResources> resourcesByZoneId = new 
ConcurrentHashMap<>();
 
@@ -78,7 +82,8 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
             TopologyService topologyService,
             CatalogService catalogService,
             FailureProcessor failureProcessor,
-            Executor partitionOperationsExecutor
+            Executor partitionOperationsExecutor,
+            ReplicaManager replicaManager
     ) {
         this.sharedTxStateStorage = sharedTxStateStorage;
         this.txManager = txManager;
@@ -87,6 +92,7 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
         this.catalogService = catalogService;
         this.failureProcessor = failureProcessor;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
+        this.replicaManager = replicaManager;
     }
 
     ZonePartitionResources allocateZonePartitionResources(
@@ -121,7 +127,8 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
                 new PartitionTxStateAccessImpl(txStatePartitionStorage),
                 catalogService,
                 failureProcessor,
-                partitionOperationsExecutor
+                partitionOperationsExecutor,
+                new LogStorageAccessImpl(replicaManager)
         );
 
         var zonePartitionResources = new ZonePartitionResources(
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
similarity index 50%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
index ae5c8aac731..e8e49c2d5ce 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
@@ -17,9 +17,25 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
-/**
- * Uniquely identifies a partition.
- */
-public interface PartitionKey {
-    int partitionId();
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/** Small abstraction for accessing the replication log. */
+public interface LogStorageAccess {
+    /**
+     * Destroys the replication log.
+     *
+     * @param replicationGroupId Replication group ID.
+     * @param isVolatile Is storage volatile.
+     * @throws NodeStoppingException If the node is being stopped.
+     */
+    void destroy(ReplicationGroupId replicationGroupId, boolean isVolatile) 
throws NodeStoppingException;
+
+    /**
+     * Creates a replication log meta storage.
+     *
+     * @param replicationGroupId Replication group ID.
+     * @throws NodeStoppingException If the node is being stopped.
+     */
+    void createMetaStorage(ReplicationGroupId replicationGroupId) throws 
NodeStoppingException;
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
new file mode 100644
index 00000000000..c7ce4fcd1bc
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot;
+
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/** {@link LogStorageAccess} implementation. */
+public class LogStorageAccessImpl implements LogStorageAccess {
+    private final ReplicaManager replicaManager;
+
+    /** Constructor. */
+    public LogStorageAccessImpl(ReplicaManager replicaManager) {
+        this.replicaManager = replicaManager;
+    }
+
+    @Override
+    public void destroy(ReplicationGroupId replicationGroupId, boolean 
isVolatile) throws NodeStoppingException {
+        replicaManager.destroyReplicationProtocolStorages(replicationGroupId, 
isVolatile);
+    }
+
+    @Override
+    public void createMetaStorage(ReplicationGroupId replicationGroupId) 
throws NodeStoppingException {
+        replicaManager.createMetaStorage(replicationGroupId);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
index ae5c8aac731..37afbbed07a 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
 /**
  * Uniquely identifies a partition.
  */
 public interface PartitionKey {
     int partitionId();
+
+    ReplicationGroupId toReplicationGroupId();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
index 83924f03fee..cae0a756feb 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
@@ -200,4 +200,7 @@ public interface PartitionMvStorageAccess {
      * @param newLowWatermark Candidate for update.
      */
     void updateLowWatermark(HybridTimestamp newLowWatermark);
+
+    /** Returns {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent. */
+    boolean isVolatile();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index be57c369751..f1b390bacab 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -98,6 +98,8 @@ public class PartitionSnapshotStorage {
 
     private final long waitForMetadataCatchupMs;
 
+    private final LogStorageAccess logStorage;
+
     /** Constructor. */
     public PartitionSnapshotStorage(
             PartitionKey partitionKey,
@@ -106,7 +108,8 @@ public class PartitionSnapshotStorage {
             PartitionTxStateAccess txState,
             CatalogService catalogService,
             FailureProcessor failureProcessor,
-            Executor incomingSnapshotsExecutor
+            Executor incomingSnapshotsExecutor,
+            LogStorageAccess logStorage
     ) {
         this(
                 partitionKey,
@@ -116,7 +119,8 @@ public class PartitionSnapshotStorage {
                 catalogService,
                 failureProcessor,
                 incomingSnapshotsExecutor,
-                DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS
+                DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS,
+                logStorage
         );
     }
 
@@ -129,7 +133,8 @@ public class PartitionSnapshotStorage {
             CatalogService catalogService,
             FailureProcessor failureProcessor,
             Executor incomingSnapshotsExecutor,
-            long waitForMetadataCatchupMs
+            long waitForMetadataCatchupMs,
+            LogStorageAccess logStorage
     ) {
         this.partitionKey = partitionKey;
         this.topologyService = topologyService;
@@ -139,6 +144,7 @@ public class PartitionSnapshotStorage {
         this.failureProcessor = failureProcessor;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
         this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
+        this.logStorage = logStorage;
     }
 
     public PartitionKey partitionKey() {
@@ -356,4 +362,9 @@ public class PartitionSnapshotStorage {
                 .oldLearnersList(configuration.oldLearners())
                 .build();
     }
+
+    /** Returns the replication log storage. */
+    public LogStorageAccess logStorage() {
+        return logStorage;
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
index 43bfe476c0e..b85947e65fb 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
 import java.util.Objects;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -72,4 +74,9 @@ public class ZonePartitionKey implements PartitionKey {
     public String toString() {
         return S.toString(ZonePartitionKey.class, this);
     }
+
+    @Override
+    public ReplicationGroupId toReplicationGroupId() {
+        return new ZonePartitionId(zoneId, partitionId);
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 768cf64ead4..f41798a6807 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -20,11 +20,16 @@ package 
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -32,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +51,8 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -61,6 +69,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
@@ -236,11 +245,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
             } catch (CancellationException ignored) {
                 // Ignored.
             } catch (ExecutionException e) {
-                Throwable cause = e.getCause();
-
-                if (!(cause instanceof CancellationException)) {
+                if (!hasCause(e, CancellationException.class, 
NodeStoppingException.class)) {
                     // TODO https://issues.apache.org/jira/browse/IGNITE-26811 HandshakeException is thrown when node is stopping.
-                    if (!(cause instanceof HandshakeException)) {
+                    if (!hasCause(e, HandshakeException.class)) {
                         
partitionSnapshotStorage.failureProcessor().process(new FailureContext(e, 
"Error when completing the copier"));
                     }
 
@@ -249,7 +256,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
                     }
 
                     // By analogy with LocalSnapshotCopier#join.
-                    throw new IllegalStateException(cause);
+                    throw new IllegalStateException(e);
                 }
             }
         }
@@ -688,7 +695,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
             return allOf(
                     
aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance, 
snapshotContext),
                     partitionSnapshotStorage.txState().startRebalance()
-            );
+            ).thenComposeAsync(unused -> 
startRebalanceForReplicationLogStorages(snapshotContext), executor);
         } finally {
             busyLock.leaveBusy();
         }
@@ -736,4 +743,35 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
             this.partitionsByTableId = partitionsByTableId;
         }
     }
+
+    private CompletableFuture<Void> 
startRebalanceForReplicationLogStorages(SnapshotContext snapshotContext) {
+        if (!busyLock.enterBusy()) {
+            return nullCompletedFuture();
+        }
+
+        try {
+            Set<ReplicationLogStorageKey> keys = 
collectReplicationLogStorageKeys(snapshotContext);
+
+            return runAsync(() -> inBusyLockSafe(busyLock, () -> 
keys.forEach(this::startRebalanceForReplicationLogStorage)), executor);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private Set<ReplicationLogStorageKey> 
collectReplicationLogStorageKeys(SnapshotContext snapshotContext) {
+        return snapshotContext.partitionsByTableId.values().stream()
+                .map(partitionMvStorage -> 
ReplicationLogStorageKey.create(partitionSnapshotStorage, partitionMvStorage))
+                .collect(toSet());
+    }
+
+    private void 
startRebalanceForReplicationLogStorage(ReplicationLogStorageKey key) throws 
IgniteInternalException {
+        try {
+            LogStorageAccess logStorage = 
partitionSnapshotStorage.logStorage();
+
+            logStorage.destroy(key.replicationGroupId(), key.isVolatile());
+            logStorage.createMetaStorage(key.replicationGroupId());
+        } catch (NodeStoppingException e) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, e);
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
new file mode 100644
index 00000000000..7c05c8bfd6f
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
+
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Helper class for starting rebalancing for the replication log. */
+class ReplicationLogStorageKey {
+    @IgniteToStringInclude
+    private final ReplicationGroupId replicationGroupId;
+
+    private final boolean isVolatile;
+
+    private ReplicationLogStorageKey(ReplicationGroupId replicationGroupId, 
boolean isVolatile) {
+        this.replicationGroupId = replicationGroupId;
+        this.isVolatile = isVolatile;
+    }
+
+    static ReplicationLogStorageKey create(PartitionSnapshotStorage 
snapshotStorage, PartitionMvStorageAccess mvStorage) {
+        return new 
ReplicationLogStorageKey(snapshotStorage.partitionKey().toReplicationGroupId(), 
mvStorage.isVolatile());
+    }
+
+    ReplicationGroupId replicationGroupId() {
+        return replicationGroupId;
+    }
+
+    boolean isVolatile() {
+        return isVolatile;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ReplicationLogStorageKey that = (ReplicationLogStorageKey) o;
+
+        return isVolatile == that.isVolatile && 
replicationGroupId.equals(that.replicationGroupId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = replicationGroupId.hashCode();
+        result = 31 * result + (isVolatile ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(ReplicationLogStorageKey.class, this);
+    }
+}
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index d1cccbc6e64..1bade1611ac 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -202,34 +202,6 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 new PendingComparableValuesTracker<>(0L)
         ));
 
-        zoneResourcesManager = spy(new ZoneResourcesManager(
-                sharedTxStateStorage,
-                txManager,
-                outgoingSnapshotsManager,
-                topologyService,
-                catalogService,
-                failureManager,
-                executorService
-        ) {
-            @Override
-            protected TxStateStorage createTxStateStorage(int zoneId, int 
partitionCount) {
-                TxStateStorage txStateStorage = new 
TxStateRocksDbStorage(zoneId, partitionCount, sharedTxStateStorage) {
-                    @Override
-                    protected TxStateRocksDbPartitionStorage 
createPartitionStorage(int partitionId) {
-                        return txStatePartitionStorage;
-                    }
-                };
-
-                if (ThreadAssertions.enabled()) {
-                    txStateStorage = new 
ThreadAssertingTxStateStorage(txStateStorage);
-                }
-
-                txStateStorage.start();
-
-                return txStateStorage;
-            }
-        });
-
         when(raftManager.startRaftGroupNode(any(), any(), any(), any(), 
any(RaftGroupOptions.class), any()))
                 .thenReturn(topologyAwareRaftGroupService);
 
@@ -264,6 +236,35 @@ class PartitionReplicaLifecycleManagerTest extends BaseIgniteAbstractTest {
                 executorService
         ));
 
+        zoneResourcesManager = spy(new ZoneResourcesManager(
+                sharedTxStateStorage,
+                txManager,
+                outgoingSnapshotsManager,
+                topologyService,
+                catalogService,
+                failureManager,
+                executorService,
+                replicaManager
+        ) {
+            @Override
+            protected TxStateStorage createTxStateStorage(int zoneId, int partitionCount) {
+                TxStateStorage txStateStorage = new TxStateRocksDbStorage(zoneId, partitionCount, sharedTxStateStorage) {
+                    @Override
+                    protected TxStateRocksDbPartitionStorage createPartitionStorage(int partitionId) {
+                        return txStatePartitionStorage;
+                    }
+                };
+
+                if (ThreadAssertions.enabled()) {
+                    txStateStorage = new ThreadAssertingTxStateStorage(txStateStorage);
+                }
+
+                txStateStorage.start();
+
+                return txStateStorage;
+            }
+        });
+
         partitionReplicaLifecycleManager = new PartitionReplicaLifecycleManager(
                 catalogManager,
                 replicaManager,
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
index 991dda265ef..6d2b0c50cf7 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -72,6 +73,7 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
             @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
             @Mock TopologyService topologyService,
             @Mock CatalogService catalogService,
+            @Mock ReplicaManager replicaManager,
             @InjectExecutorService ScheduledExecutorService scheduler,
             @InjectExecutorService ExecutorService executor
     ) {
@@ -92,7 +94,8 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
                 topologyService,
                 catalogService,
                 mock(FailureProcessor.class),
-                executor
+                executor,
+                replicaManager
         );
 
         storageIndexTracker = new PendingComparableValuesTracker<>(0L);
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 4704cf829d1..6373adc63af 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -49,7 +49,8 @@ class PartitionSnapshotStorageFactoryTest extends 
BaseIgniteAbstractTest {
             mock(PartitionTxStateAccess.class),
             mock(CatalogService.class),
             mock(FailureProcessor.class),
-            mock(Executor.class)
+            mock(Executor.class),
+            mock(LogStorageAccess.class)
     );
 
     @Mock
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
index 59219e1648d..014b76f1375 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -59,7 +59,8 @@ public class PartitionSnapshotStorageTest extends 
BaseIgniteAbstractTest {
             mock(PartitionTxStateAccess.class),
             mock(CatalogService.class),
             mock(FailureProcessor.class),
-            mock(Executor.class)
+            mock(Executor.class),
+            mock(LogStorageAccess.class)
     );
 
     @Test
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index dfbdb45a3cf..3d69d271916 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -37,6 +37,7 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -86,6 +87,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
@@ -93,6 +95,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartiti
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -199,6 +202,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
     private final TestLowWatermark lowWatermark = spy(new TestLowWatermark());
 
+    private final ReplicaManager replicaManager = mock(ReplicaManager.class);
+
     @BeforeEach
     void setUp(
             @Mock Catalog catalog,
@@ -219,7 +224,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void test() {
+    void test() throws Exception {
         fillOriginalStorages();
 
         createTargetStorages();
@@ -269,6 +274,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
 
         verify(incomingMvTableStorage).startRebalancePartition(PARTITION_ID);
         verify(incomingTxStatePartitionStorage).startRebalance();
+        verify(replicaManager).destroyReplicationProtocolStorages(any(), anyBoolean());
 
         var expSnapshotInfo = new PartitionSnapshotInfo(
                 expLastAppliedIndex,
@@ -396,7 +402,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 catalogService,
                 mock(FailureProcessor.class),
                 executorService,
-                0
+                0,
+                new LogStorageAccessImpl(replicaManager)
         );
 
         storage.addMvPartition(TABLE_ID, spy(new PartitionMvStorageAccessImpl(
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 47af48813a7..b7283ad58e2 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
@@ -84,7 +85,8 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
                 txStateAccess,
                 catalogService,
                 mock(FailureProcessor.class),
-                mock(Executor.class)
+                mock(Executor.class),
+                mock(LogStorageAccess.class)
         );
 
         snapshotStorage.addMvPartition(TABLE_ID_1, partitionAccess1);
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 280c7f736c9..5f12e28d7e5 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -108,7 +108,6 @@ import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -763,7 +762,6 @@ class ItTableRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
         assertThat(getFromNode(2, 1), is("one"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849")
     @Test
     void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws 
Exception {
         createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 18515206f9f..36f7341b127 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -482,6 +482,24 @@ public class Loza implements RaftManager {
         }
     }
 
+    /**
+     * Creates replication log meta storage for the given group ID.
+     *
+     * @param nodeId ID of the Raft node.
+     * @throws NodeStoppingException If the node is being stopped.
+     */
+    public void createMetaStorage(RaftNodeId nodeId) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            raftServer.createMetaStorage(nodeId);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Returns Raft node IDs for which any storage (log storage or Raft meta storage) is present on disk.
      */
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index f4e375a883b..9a98ab4c0d2 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -92,6 +92,7 @@ import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl.LogEntryAndClosure;
 import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl.ReadIndexEvent;
 import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
@@ -613,6 +614,21 @@ public class JraftServerImpl implements RaftServer {
         destroyRaftNodeStoragesInternal(nodeId, groupOptions, true);
     }
 
+    /**
+     * Creates replication log meta storage for the given group ID.
+     *
+     * @param nodeId ID of the Raft node.
+     */
+    public void createMetaStorage(RaftNodeId nodeId) {
+        RaftGroupService raftGroupService = nodes.get(nodeId);
+
+        if (raftGroupService == null) {
+            return;
+        }
+
+        ((NodeImpl) raftGroupService.getRaftNode()).metaStorage().createAfterDestroy();
+    }
+
     private void destroyRaftNodeStoragesInternal(RaftNodeId nodeId, RaftGroupOptions groupOptions, boolean durable) {
         StorageDestructionIntent intent = groupStoragesContextResolver.getIntent(nodeId, groupOptions.volatileStores());
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
index aed8e8108d6..38657691a26 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
@@ -72,4 +72,9 @@ public class VolatileRaftMetaStorage implements 
RaftMetaStorage, VolatileStorage
 
         return true;
     }
+
+    @Override
+    public void createAfterDestroy() {
+        // no-op
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9be02c8b265..4e6e0bacf88 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -4355,4 +4355,8 @@ public class NodeImpl implements Node, RaftServerService {
     public FSMCaller fsmCaller() {
         return fsmCaller;
     }
+
+    public RaftMetaStorage metaStorage() {
+        return metaStorage;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
index 94244126edf..cc3fd625d4f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
@@ -24,7 +24,6 @@ import 
org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
  * Raft metadata storage service.
  */
 public interface RaftMetaStorage extends Lifecycle<RaftMetaStorageOptions>, Storage {
-
     /**
      * Set current term.
      */
@@ -49,4 +48,7 @@ public interface RaftMetaStorage extends 
Lifecycle<RaftMetaStorageOptions>, Stor
      * Set term and voted for information.
      */
     boolean setTermAndVotedFor(final long term, final PeerId peerId);
+
+    /** Creates a meta storage after it is destroyed. */
+    void createAfterDestroy();
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
index a539df16344..e2a58e75cd3 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
@@ -38,7 +38,6 @@ import org.apache.ignite.raft.jraft.util.Utils;
  * Raft meta storage,it's not thread-safe.
  */
 public class LocalRaftMetaStorage implements RaftMetaStorage {
-
     private static final IgniteLogger LOG = Loggers.forClass(LocalRaftMetaStorage.class);
     private static final String RAFT_META = "raft_meta";
 
@@ -95,7 +94,7 @@ public class LocalRaftMetaStorage implements RaftMetaStorage {
             return true;
         }
         catch (final IOException e) {
-            LOG.error("Fail to load raft meta storage [node={}].", this.node.getNodeId(), e);
+            LOG.error("Fail to load raft meta storage [node={}].", e, this.node.getNodeId());
             return false;
         }
     }
@@ -119,7 +118,7 @@ public class LocalRaftMetaStorage implements RaftMetaStorage {
             return true;
         }
         catch (final Exception e) {
-            LOG.error("Fail to save raft meta [node={}].", this.node.getNodeId(), e);
+            LOG.error("Fail to save raft meta [node={}].", e, this.node.getNodeId());
             reportIOError();
             return false;
         }
@@ -187,6 +186,13 @@ public class LocalRaftMetaStorage implements RaftMetaStorage {
         return save();
     }
 
+    @Override
+    public void createAfterDestroy() {
+        if (!Utils.mkdir(new File(path))) {
+            LOG.error("Fail to mkdir [node={}, path={}].", this.node.getNodeId(), this.path);
+        }
+    }
+
     @Override
     public String toString() {
         return "RaftMetaStorageImpl [path=" + this.path + ", term=" + this.term + ", votedFor=" + this.votedFor + "]";
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ec8b2bbaeb8..340a43deef0 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1380,6 +1380,18 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
         ((Loza) raftManager).destroyRaftNodeStoragesDurably(raftNodeId, groupOptions);
     }
 
+    /**
+     * Creates replication log meta storage for the given group ID.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @throws NodeStoppingException If the node is being stopped.
+     */
+    public void createMetaStorage(ReplicationGroupId replicaGrpId) throws NodeStoppingException {
+        var raftNodeId = new RaftNodeId(replicaGrpId, new Peer(localNodeConsistentId));
+
+        ((Loza) raftManager).createMetaStorage(raftNodeId);
+    }
+
     /**
      * Returns IDs of all partitions of tables for which any storage of replication protocol is present on disk.
      */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 03e9855cf4d..6a15c5872f0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -162,6 +162,7 @@ import 
org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
 import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
@@ -2681,7 +2682,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 txStateAccess,
                 catalogService,
                 failureProcessor,
-                incomingSnapshotsExecutor
+                incomingSnapshotsExecutor,
+                new LogStorageAccessImpl(replicaMgr)
         );
 
         snapshotStorage.addMvPartition(internalTable.tableId(), 
partitionAccess);
@@ -2800,6 +2802,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                                 || !nodeProperties.colocationEnabled()
                                         && txStatePartitionStorage.lastAppliedIndex() == TxStatePartitionStorage.REBALANCE_IN_PROGRESS) {
                             if (nodeProperties.colocationEnabled()) {
+                                destroyReplicationProtocolStorages(
+                                        new ZonePartitionId(table.zoneId(), partitionId),
+                                        table,
+                                        false
+                                );
+
                                 return internalTable.storage().clearPartition(partitionId);
                             } else {
                                 return allOf(
@@ -3141,22 +3149,26 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
         return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
     }
 
-    private void destroyReplicationProtocolStorages(TablePartitionId tablePartitionId, TableViewInternal table, boolean destroyDurably) {
+    private void destroyReplicationProtocolStorages(
+            ReplicationGroupId replicationGroupId,
+            TableViewInternal table,
+            boolean destroyDurably
+    ) {
         var internalTbl = (InternalTableImpl) table.internalTable();
 
-        destroyReplicationProtocolStorages(tablePartitionId, internalTbl.storage().isVolatile(), destroyDurably);
+        destroyReplicationProtocolStorages(replicationGroupId, internalTbl.storage().isVolatile(), destroyDurably);
     }
 
     private void destroyReplicationProtocolStorages(
-            TablePartitionId tablePartitionId,
+            ReplicationGroupId replicationGroupId,
             boolean isVolatileStorage,
             boolean destroyDurably
     ) {
         try {
             if (destroyDurably) {
-                replicaMgr.destroyReplicationProtocolStoragesDurably(tablePartitionId, isVolatileStorage);
+                replicaMgr.destroyReplicationProtocolStoragesDurably(replicationGroupId, isVolatileStorage);
             } else {
-                replicaMgr.destroyReplicationProtocolStorages(tablePartitionId, isVolatileStorage);
+                replicaMgr.destroyReplicationProtocolStorages(replicationGroupId, isVolatileStorage);
             }
         } catch (NodeStoppingException e) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, e);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
index 9736c7a0416..77dd0133e81 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
@@ -261,6 +261,11 @@ public class PartitionMvStorageAccessImpl implements 
PartitionMvStorageAccess {
         lowWatermark.updateLowWatermark(newLowWatermark);
     }
 
+    @Override
+    public boolean isVolatile() {
+        return mvTableStorage.isVolatile();
+    }
+
     private MvPartitionStorage getMvPartitionStorage() {
         int partitionId = partitionId();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
index 1c2cd841ee5..83e7f2dc988 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
@@ -19,6 +19,8 @@ package 
org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import java.util.Objects;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -73,4 +75,9 @@ public class TablePartitionKey implements PartitionKey {
     public String toString() {
         return S.toString(TablePartitionKey.class, this);
     }
+
+    @Override
+    public ReplicationGroupId toReplicationGroupId() {
+        return new TablePartitionId(tableId, partitionId);
+    }
 }

Reply via email to