This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26849
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 5bb65e76f85771082591d6c5e4390719c8d41595
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Nov 24 18:28:32 2025 +0300

    IGNITE-26849 wip
---
 .../replicator/raft/snapshot/LogStorageAccess.java | 13 +++-
 .../raft/snapshot/LogStorageAccessImpl.java        |  8 +--
 .../replicator/raft/snapshot/PartitionKey.java     |  4 ++
 .../raft/snapshot/PartitionMvStorageAccess.java    |  3 +
 .../replicator/raft/snapshot/ZonePartitionKey.java |  7 ++
 .../incoming/DestroyReplicationLogStorageKey.java  | 76 ++++++++++++++++++++++
 .../snapshot/incoming/IncomingSnapshotCopier.java  | 41 +++++++++---
 .../incoming/IncomingSnapshotCopierTest.java       |  5 +-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  2 -
 .../snapshot/PartitionMvStorageAccessImpl.java     |  5 ++
 .../raft/snapshot/TablePartitionKey.java           |  7 ++
 11 files changed, 152 insertions(+), 19 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
index 1d05876a491..e3b78cd0e89 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
@@ -17,7 +17,16 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
-/** No doc yet. */
-// TODO: IGNITE-26849 Добаить документацию и методы
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/** Small abstraction for accessing the replication log. */
 public interface LogStorageAccess {
+    /**
+     * Destroys the replication log.
+     *
+     * @param replicationGroupId Replication group ID.
+     * @param isVolatile Is storage volatile.
+     * @throws Exception If there was an error during destruction.
+     */
+    void destroy(ReplicationGroupId replicationGroupId, boolean isVolatile) 
throws Exception;
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
index 81307c3f74b..6161094d684 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
@@ -20,8 +20,7 @@ package 
org.apache.ignite.internal.partition.replicator.raft.snapshot;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 
-/** No doc yet. */
-// TODO: IGNITE-26849 Добаить документацию и методы
+/** {@link LogStorageAccess} implementation. */
 public class LogStorageAccessImpl implements LogStorageAccess {
     private final ReplicaManager replicaManager;
 
@@ -30,7 +29,8 @@ public class LogStorageAccessImpl implements LogStorageAccess 
{
         this.replicaManager = replicaManager;
     }
 
-    public void destroy(ReplicationGroupId replicationGroupId) throws 
Exception {
-        replicaManager.destroyReplicationProtocolStorages(replicationGroupId, 
false);
+    @Override
+    public void destroy(ReplicationGroupId replicationGroupId, boolean 
isVolatile) throws Exception {
+        replicaManager.destroyReplicationProtocolStorages(replicationGroupId, 
isVolatile);
     }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
index ae5c8aac731..37afbbed07a 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
@@ -17,9 +17,13 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
 /**
  * Uniquely identifies a partition.
  */
 public interface PartitionKey {
     int partitionId();
+
+    ReplicationGroupId toReplicationGroupId();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
index 83924f03fee..cae0a756feb 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
@@ -200,4 +200,7 @@ public interface PartitionMvStorageAccess {
      * @param newLowWatermark Candidate for update.
      */
     void updateLowWatermark(HybridTimestamp newLowWatermark);
+
+    /** Returns {@code true} if this storage is volatile (i.e. stores its data 
in memory), or {@code false} if it's persistent. */
+    boolean isVolatile();
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
index 43bfe476c0e..b85947e65fb 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
 import java.util.Objects;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -72,4 +74,9 @@ public class ZonePartitionKey implements PartitionKey {
     public String toString() {
         return S.toString(ZonePartitionKey.class, this);
     }
+
+    @Override
+    public ReplicationGroupId toReplicationGroupId() {
+        return new ZonePartitionId(zoneId, partitionId);
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java
new file mode 100644
index 00000000000..93da54b01f9
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
+
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Helper class for destroying the replication log. */
+class DestroyReplicationLogStorageKey {
+    @IgniteToStringInclude
+    private final ReplicationGroupId replicationGroupId;
+
+    private final boolean isVolatile;
+
+    private DestroyReplicationLogStorageKey(ReplicationGroupId 
replicationGroupId, boolean isVolatile) {
+        this.replicationGroupId = replicationGroupId;
+        this.isVolatile = isVolatile;
+    }
+
+    static DestroyReplicationLogStorageKey create(PartitionSnapshotStorage 
snapshotStorage, PartitionMvStorageAccess mvStorage) {
+        return new 
DestroyReplicationLogStorageKey(snapshotStorage.partitionKey().toReplicationGroupId(),
 mvStorage.isVolatile());
+    }
+
+    ReplicationGroupId replicationGroupId() {
+        return replicationGroupId;
+    }
+
+    boolean isVolatile() {
+        return isVolatile;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DestroyReplicationLogStorageKey that = 
(DestroyReplicationLogStorageKey) o;
+
+        return isVolatile == that.isVolatile && 
replicationGroupId.equals(that.replicationGroupId);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = replicationGroupId.hashCode();
+        result = 31 * result + (isVolatile ? 1 : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(DestroyReplicationLogStorageKey.class, this);
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f423452b75e..2baa7569fc2 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -22,10 +22,13 @@ import static java.util.concurrent.CompletableFuture.anyOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -33,6 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -46,6 +50,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.IgniteThrottledLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -61,16 +66,11 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
-import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
-import org.apache.ignite.internal.replicator.TablePartitionId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -690,7 +690,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
             return allOf(
                     
aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance, 
snapshotContext),
                     partitionSnapshotStorage.txState().startRebalance()
-            );
+            ).thenComposeAsync(unused -> 
destroyReplicationLogStorages(snapshotContext), executor);
         } finally {
             busyLock.leaveBusy();
         }
@@ -739,8 +739,31 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
         }
     }
 
-    // TODO: IGNITE-26849 Реализовать и по нормальному
-    private CompletableFuture<Void> 
destroyReplicationLogStorage(SnapshotContext snapshotContext) {
-        return nullCompletedFuture();
+    private CompletableFuture<Void> 
destroyReplicationLogStorages(SnapshotContext snapshotContext) {
+        if (!busyLock.enterBusy()) {
+            return nullCompletedFuture();
+        }
+
+        try {
+            Set<DestroyReplicationLogStorageKey> keys = 
collectDestroyReplicationLogStorageKeys(snapshotContext);
+
+            return runAsync(() -> inBusyLockSafe(busyLock, () -> 
keys.forEach(this::destroyReplicationLogStorage)), executor);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private Set<DestroyReplicationLogStorageKey> 
collectDestroyReplicationLogStorageKeys(SnapshotContext snapshotContext) {
+        return snapshotContext.partitionsByTableId.values().stream()
+                .map(partitionMvStorage -> 
DestroyReplicationLogStorageKey.create(partitionSnapshotStorage, 
partitionMvStorage))
+                .collect(toSet());
+    }
+
+    private void destroyReplicationLogStorage(DestroyReplicationLogStorageKey 
key) {
+        try {
+            
partitionSnapshotStorage.logStorage().destroy(key.replicationGroupId(), 
key.isVolatile());
+        } catch (Exception e) {
+            throw new IgniteInternalException(INTERNAL_ERR, "Failed to destroy 
replication log storage: {}", e, key);
+        }
     }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 27b32b04f38..44e63b64024 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -37,6 +37,7 @@ import static 
org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -223,7 +224,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    void test() {
+    void test() throws Exception {
         fillOriginalStorages();
 
         createTargetStorages();
@@ -273,6 +274,7 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
 
         verify(incomingMvTableStorage).startRebalancePartition(PARTITION_ID);
         verify(incomingTxStatePartitionStorage).startRebalance();
+        verify(replicaManager).destroyReplicationProtocolStorages(any(), 
anyBoolean());
 
         var expSnapshotInfo = new PartitionSnapshotInfo(
                 expLastAppliedIndex,
@@ -401,7 +403,6 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 mock(FailureProcessor.class),
                 executorService,
                 0,
-                // TODO: IGNITE-26849 Добавить проверку что вызывается метод в 
каком-то тесте
                 new LogStorageAccessImpl(replicaManager)
         );
 
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 280c7f736c9..5f12e28d7e5 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -108,7 +108,6 @@ import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -763,7 +762,6 @@ class ItTableRaftSnapshotsTest extends 
ClusterPerTestIntegrationTest {
         assertThat(getFromNode(2, 1), is("one"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849";)
     @Test
     void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws 
Exception {
         createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
index 9736c7a0416..77dd0133e81 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
@@ -261,6 +261,11 @@ public class PartitionMvStorageAccessImpl implements 
PartitionMvStorageAccess {
         lowWatermark.updateLowWatermark(newLowWatermark);
     }
 
+    @Override
+    public boolean isVolatile() {
+        return mvTableStorage.isVolatile();
+    }
+
     private MvPartitionStorage getMvPartitionStorage() {
         int partitionId = partitionId();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
index 1c2cd841ee5..83e7f2dc988 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
@@ -19,6 +19,8 @@ package 
org.apache.ignite.internal.table.distributed.raft.snapshot;
 
 import java.util.Objects;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -73,4 +75,9 @@ public class TablePartitionKey implements PartitionKey {
     public String toString() {
         return S.toString(TablePartitionKey.class, this);
     }
+
+    @Override
+    public ReplicationGroupId toReplicationGroupId() {
+        return new TablePartitionId(tableId, partitionId);
+    }
 }

Reply via email to