This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26849 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 5bb65e76f85771082591d6c5e4390719c8d41595 Author: Kirill Tkalenko <[email protected]> AuthorDate: Mon Nov 24 18:28:32 2025 +0300 IGNITE-26849 wip --- .../replicator/raft/snapshot/LogStorageAccess.java | 13 +++- .../raft/snapshot/LogStorageAccessImpl.java | 8 +-- .../replicator/raft/snapshot/PartitionKey.java | 4 ++ .../raft/snapshot/PartitionMvStorageAccess.java | 3 + .../replicator/raft/snapshot/ZonePartitionKey.java | 7 ++ .../incoming/DestroyReplicationLogStorageKey.java | 76 ++++++++++++++++++++++ .../snapshot/incoming/IncomingSnapshotCopier.java | 41 +++++++++--- .../incoming/IncomingSnapshotCopierTest.java | 5 +- .../raftsnapshot/ItTableRaftSnapshotsTest.java | 2 - .../snapshot/PartitionMvStorageAccessImpl.java | 5 ++ .../raft/snapshot/TablePartitionKey.java | 7 ++ 11 files changed, 152 insertions(+), 19 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java index 1d05876a491..e3b78cd0e89 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java @@ -17,7 +17,16 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot; -/** No doc yet. */ -// TODO: IGNITE-26849 Добаить документацию и методы +import org.apache.ignite.internal.replicator.ReplicationGroupId; + +/** Small abstraction for accessing the replication log. */ public interface LogStorageAccess { + /** + * Destroys the replication log. + * + * @param replicationGroupId Replication group ID. + * @param isVolatile Is storage volatile. + * @throws Exception If there was an error during destruction. + */ + void destroy(ReplicationGroupId replicationGroupId, boolean isVolatile) throws Exception; } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java index 81307c3f74b..6161094d684 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java @@ -20,8 +20,7 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot; import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicationGroupId; -/** No doc yet. */ -// TODO: IGNITE-26849 Добаить документацию и методы +/** {@link LogStorageAccess} implementation. */ public class LogStorageAccessImpl implements LogStorageAccess { private final ReplicaManager replicaManager; @@ -30,7 +29,8 @@ public class LogStorageAccessImpl implements LogStorageAccess { this.replicaManager = replicaManager; } - public void destroy(ReplicationGroupId replicationGroupId) throws Exception { - replicaManager.destroyReplicationProtocolStorages(replicationGroupId, false); + @Override + public void destroy(ReplicationGroupId replicationGroupId, boolean isVolatile) throws Exception { + replicaManager.destroyReplicationProtocolStorages(replicationGroupId, isVolatile); } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java index ae5c8aac731..37afbbed07a 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java @@ -17,9 +17,13 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot; +import org.apache.ignite.internal.replicator.ReplicationGroupId; + /** * Uniquely identifies a partition. */ public interface PartitionKey { int partitionId(); + + ReplicationGroupId toReplicationGroupId(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java index 83924f03fee..cae0a756feb 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java @@ -200,4 +200,7 @@ public interface PartitionMvStorageAccess { * @param newLowWatermark Candidate for update. */ void updateLowWatermark(HybridTimestamp newLowWatermark); + + /** Returns {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent. */ + boolean isVolatile(); } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java index 43bfe476c0e..b85947e65fb 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot; import java.util.Objects; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.tostring.S; /** @@ -72,4 +74,9 @@ public class ZonePartitionKey implements PartitionKey { public String toString() { return S.toString(ZonePartitionKey.class, this); } + + @Override + public ReplicationGroupId toReplicationGroupId() { + return new ZonePartitionId(zoneId, partitionId); + } } diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java new file mode 100644 index 00000000000..93da54b01f9 --- /dev/null +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/DestroyReplicationLogStorageKey.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming; + +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; + +/** Helper class for destroying the replication log. */ +class DestroyReplicationLogStorageKey { + @IgniteToStringInclude + private final ReplicationGroupId replicationGroupId; + + private final boolean isVolatile; + + private DestroyReplicationLogStorageKey(ReplicationGroupId replicationGroupId, boolean isVolatile) { + this.replicationGroupId = replicationGroupId; + this.isVolatile = isVolatile; + } + + static DestroyReplicationLogStorageKey create(PartitionSnapshotStorage snapshotStorage, PartitionMvStorageAccess mvStorage) { + return new DestroyReplicationLogStorageKey(snapshotStorage.partitionKey().toReplicationGroupId(), mvStorage.isVolatile()); + } + + ReplicationGroupId replicationGroupId() { + return replicationGroupId; + } + + boolean isVolatile() { + return isVolatile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + DestroyReplicationLogStorageKey that = (DestroyReplicationLogStorageKey) o; + + return isVolatile == that.isVolatile && replicationGroupId.equals(that.replicationGroupId); + } + + @Override + public int hashCode() { + int result = replicationGroupId.hashCode(); + result = 31 * result + (isVolatile ? 1 : 0); + return result; + } + + @Override + public String toString() { + return S.toString(DestroyReplicationLogStorageKey.class, this); + } +} diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java index f423452b75e..2baa7569fc2 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java @@ -22,10 +22,13 @@ import static java.util.concurrent.CompletableFuture.anyOf; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -33,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -46,6 +50,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.catalog.Catalog; import org.apache.ignite.internal.failure.FailureContext; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.IgniteThrottledLogger; import org.apache.ignite.internal.logger.Loggers; @@ -61,16 +66,11 @@ import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa import org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage; import org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo; import org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage; import org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri; -import org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey; import org.apache.ignite.internal.raft.RaftGroupConfiguration; import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer; -import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.ReadResult; @@ -690,7 +690,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier { return allOf( aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance, snapshotContext), partitionSnapshotStorage.txState().startRebalance() - ); + ).thenComposeAsync(unused -> destroyReplicationLogStorages(snapshotContext), executor); } finally { busyLock.leaveBusy(); } @@ -739,8 +739,31 @@ public class IncomingSnapshotCopier extends SnapshotCopier { } } - // TODO: IGNITE-26849 Реализовать и по нормальному - private CompletableFuture<Void> destroyReplicationLogStorage(SnapshotContext snapshotContext) { - return nullCompletedFuture(); + private CompletableFuture<Void> destroyReplicationLogStorages(SnapshotContext snapshotContext) { + if (!busyLock.enterBusy()) { + return nullCompletedFuture(); + } + + try { + Set<DestroyReplicationLogStorageKey> keys = collectDestroyReplicationLogStorageKeys(snapshotContext); + + return runAsync(() -> inBusyLockSafe(busyLock, () -> keys.forEach(this::destroyReplicationLogStorage)), executor); + } finally { + busyLock.leaveBusy(); + } + } + + private Set<DestroyReplicationLogStorageKey> collectDestroyReplicationLogStorageKeys(SnapshotContext snapshotContext) { + return snapshotContext.partitionsByTableId.values().stream() + .map(partitionMvStorage -> DestroyReplicationLogStorageKey.create(partitionSnapshotStorage, partitionMvStorage)) + .collect(toSet()); + } + + private void destroyReplicationLogStorage(DestroyReplicationLogStorageKey key) { + try { + partitionSnapshotStorage.logStorage().destroy(key.replicationGroupId(), key.isVolatile()); + } catch (Exception e) { + throw new IgniteInternalException(INTERNAL_ERR, "Failed to destroy replication log storage: {}", e, key); + } } } diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 27b32b04f38..44e63b64024 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -37,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -223,7 +224,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { } @Test - void test() { + void test() throws Exception { fillOriginalStorages(); createTargetStorages(); @@ -273,6 +274,7 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { verify(incomingMvTableStorage).startRebalancePartition(PARTITION_ID); verify(incomingTxStatePartitionStorage).startRebalance(); + verify(replicaManager).destroyReplicationProtocolStorages(any(), anyBoolean()); var expSnapshotInfo = new PartitionSnapshotInfo( expLastAppliedIndex, @@ -401,7 +403,6 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { mock(FailureProcessor.class), executorService, 0, - // TODO: IGNITE-26849 Добавить проверку что вызывается метод в каком-то тесте new LogStorageAccessImpl(replicaManager) ); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index 280c7f736c9..5f12e28d7e5 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -108,7 +108,6 @@ import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -763,7 +762,6 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest { assertThat(getFromNode(2, 1), is("one")); } - @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849") @Test void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws Exception { createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java index 9736c7a0416..77dd0133e81 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java @@ -261,6 +261,11 @@ public class PartitionMvStorageAccessImpl implements PartitionMvStorageAccess { lowWatermark.updateLowWatermark(newLowWatermark); } + @Override + public boolean isVolatile() { + return mvTableStorage.isVolatile(); + } + private MvPartitionStorage getMvPartitionStorage() { int partitionId = partitionId(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java index 1c2cd841ee5..83e7f2dc988 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.table.distributed.raft.snapshot; import java.util.Objects; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tostring.S; /** @@ -73,4 +75,9 @@ public class TablePartitionKey implements PartitionKey { public String toString() { return S.toString(TablePartitionKey.class, this); } + + @Override + public ReplicationGroupId toReplicationGroupId() { + return new TablePartitionId(tableId, partitionId); + } }
