This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f6eaf1254be IGNITE-26849 Fix node recovery after aborting raft
snapshot installation and raft log truncation (#7054)
f6eaf1254be is described below
commit f6eaf1254bebef6514ec976e6aa8e307eee801e4
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Nov 25 15:44:35 2025 +0300
IGNITE-26849 Fix node recovery after aborting raft snapshot installation
and raft log truncation (#7054)
---
.../ItZonePartitionRaftListenerRecoveryTest.java | 8 ++-
.../PartitionReplicaLifecycleManager.java | 3 +-
.../partition/replicator/ZoneResourcesManager.java | 11 +++-
.../{PartitionKey.java => LogStorageAccess.java} | 26 ++++++--
.../raft/snapshot/LogStorageAccessImpl.java | 42 ++++++++++++
.../replicator/raft/snapshot/PartitionKey.java | 4 ++
.../raft/snapshot/PartitionMvStorageAccess.java | 3 +
.../raft/snapshot/PartitionSnapshotStorage.java | 17 ++++-
.../replicator/raft/snapshot/ZonePartitionKey.java | 7 ++
.../snapshot/incoming/IncomingSnapshotCopier.java | 50 ++++++++++++--
.../incoming/ReplicationLogStorageKey.java | 76 ++++++++++++++++++++++
.../PartitionReplicaLifecycleManagerTest.java | 57 ++++++++--------
.../replicator/ZoneResourcesManagerTest.java | 5 +-
.../PartitionSnapshotStorageFactoryTest.java | 3 +-
.../snapshot/PartitionSnapshotStorageTest.java | 3 +-
.../incoming/IncomingSnapshotCopierTest.java | 11 +++-
.../outgoing/OutgoingSnapshotReaderTest.java | 4 +-
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 2 -
.../java/org/apache/ignite/internal/raft/Loza.java | 18 +++++
.../internal/raft/server/impl/JraftServerImpl.java | 16 +++++
.../raft/storage/impl/VolatileRaftMetaStorage.java | 5 ++
.../apache/ignite/raft/jraft/core/NodeImpl.java | 4 ++
.../ignite/raft/jraft/storage/RaftMetaStorage.java | 4 +-
.../jraft/storage/impl/LocalRaftMetaStorage.java | 12 +++-
.../ignite/internal/replicator/ReplicaManager.java | 12 ++++
.../internal/table/distributed/TableManager.java | 24 +++++--
.../snapshot/PartitionMvStorageAccessImpl.java | 5 ++
.../raft/snapshot/TablePartitionKey.java | 7 ++
28 files changed, 374 insertions(+), 65 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index bdcb3605a2c..df3fc9078ea 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
@@ -98,6 +99,7 @@ import
org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRowImpl;
@@ -236,7 +238,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
@InjectConfiguration SystemLocalConfiguration
systemLocalConfiguration,
@InjectExecutorService ScheduledExecutorService
scheduledExecutorService,
@Mock Catalog catalog,
- @Mock CatalogIndexDescriptor catalogIndexDescriptor
+ @Mock CatalogIndexDescriptor catalogIndexDescriptor,
+ @Mock ReplicaManager replicaManager
) {
when(catalogService.activeCatalog(anyLong())).thenReturn(catalog);
when(catalog.indexes(anyInt())).thenReturn(List.of(catalogIndexDescriptor));
@@ -311,7 +314,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
new
PartitionTxStateAccessImpl(txStateStorage.getOrCreatePartitionStorage(PARTITION_ID.partitionId())),
catalogService,
failureProcessor,
- executor
+ executor,
+ new LogStorageAccessImpl(replicaManager)
);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index f3fee7e95cc..ac5d5cc02e8 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -334,7 +334,8 @@ public class PartitionReplicaLifecycleManager extends
topologyService,
catalogService,
failureProcessor,
- partitionOperationsExecutor
+ partitionOperationsExecutor,
+ replicaMgr
)
);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index e2298bba0e0..a4e314c7913 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -30,10 +30,12 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tx.TxManager;
import
org.apache.ignite.internal.tx.storage.state.ThreadAssertingTxStateStorage;
@@ -66,6 +68,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
private final Executor partitionOperationsExecutor;
+ private final ReplicaManager replicaManager;
+
/** Map from zone IDs to their resource holders. */
private final Map<Integer, ZoneResources> resourcesByZoneId = new
ConcurrentHashMap<>();
@@ -78,7 +82,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
TopologyService topologyService,
CatalogService catalogService,
FailureProcessor failureProcessor,
- Executor partitionOperationsExecutor
+ Executor partitionOperationsExecutor,
+ ReplicaManager replicaManager
) {
this.sharedTxStateStorage = sharedTxStateStorage;
this.txManager = txManager;
@@ -87,6 +92,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
this.catalogService = catalogService;
this.failureProcessor = failureProcessor;
this.partitionOperationsExecutor = partitionOperationsExecutor;
+ this.replicaManager = replicaManager;
}
ZonePartitionResources allocateZonePartitionResources(
@@ -121,7 +127,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
new PartitionTxStateAccessImpl(txStatePartitionStorage),
catalogService,
failureProcessor,
- partitionOperationsExecutor
+ partitionOperationsExecutor,
+ new LogStorageAccessImpl(replicaManager)
);
var zonePartitionResources = new ZonePartitionResources(
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
similarity index 50%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
index ae5c8aac731..e8e49c2d5ce 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccess.java
@@ -17,9 +17,25 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot;
-/**
- * Uniquely identifies a partition.
- */
-public interface PartitionKey {
- int partitionId();
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/** Small abstraction for accessing the replication log. */
+public interface LogStorageAccess {
+ /**
+ * Destroys the replication log.
+ *
+ * @param replicationGroupId Replication group ID.
+ * @param isVolatile Is storage volatile.
+ * @throws NodeStoppingException If the node is being stopped.
+ */
+ void destroy(ReplicationGroupId replicationGroupId, boolean isVolatile)
throws NodeStoppingException;
+
+ /**
+ * Creates a replication log meta storage.
+ *
+ * @param replicationGroupId Replication group ID.
+ * @throws NodeStoppingException If the node is being stopped.
+ */
+ void createMetaStorage(ReplicationGroupId replicationGroupId) throws
NodeStoppingException;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
new file mode 100644
index 00000000000..c7ce4fcd1bc
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/LogStorageAccessImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot;
+
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.replicator.ReplicaManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/** {@link LogStorageAccess} implementation. */
+public class LogStorageAccessImpl implements LogStorageAccess {
+ private final ReplicaManager replicaManager;
+
+ /** Constructor. */
+ public LogStorageAccessImpl(ReplicaManager replicaManager) {
+ this.replicaManager = replicaManager;
+ }
+
+ @Override
+ public void destroy(ReplicationGroupId replicationGroupId, boolean
isVolatile) throws NodeStoppingException {
+ replicaManager.destroyReplicationProtocolStorages(replicationGroupId,
isVolatile);
+ }
+
+ @Override
+ public void createMetaStorage(ReplicationGroupId replicationGroupId)
throws NodeStoppingException {
+ replicaManager.createMetaStorage(replicationGroupId);
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
index ae5c8aac731..37afbbed07a 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionKey.java
@@ -17,9 +17,13 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
/**
* Uniquely identifies a partition.
*/
public interface PartitionKey {
int partitionId();
+
+ ReplicationGroupId toReplicationGroupId();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
index 83924f03fee..cae0a756feb 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionMvStorageAccess.java
@@ -200,4 +200,7 @@ public interface PartitionMvStorageAccess {
* @param newLowWatermark Candidate for update.
*/
void updateLowWatermark(HybridTimestamp newLowWatermark);
+
+ /** Returns {@code true} if this storage is volatile (i.e. stores its data
in memory), or {@code false} if it's persistent. */
+ boolean isVolatile();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index be57c369751..f1b390bacab 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -98,6 +98,8 @@ public class PartitionSnapshotStorage {
private final long waitForMetadataCatchupMs;
+ private final LogStorageAccess logStorage;
+
/** Constructor. */
public PartitionSnapshotStorage(
PartitionKey partitionKey,
@@ -106,7 +108,8 @@ public class PartitionSnapshotStorage {
PartitionTxStateAccess txState,
CatalogService catalogService,
FailureProcessor failureProcessor,
- Executor incomingSnapshotsExecutor
+ Executor incomingSnapshotsExecutor,
+ LogStorageAccess logStorage
) {
this(
partitionKey,
@@ -116,7 +119,8 @@ public class PartitionSnapshotStorage {
catalogService,
failureProcessor,
incomingSnapshotsExecutor,
- DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS
+ DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS,
+ logStorage
);
}
@@ -129,7 +133,8 @@ public class PartitionSnapshotStorage {
CatalogService catalogService,
FailureProcessor failureProcessor,
Executor incomingSnapshotsExecutor,
- long waitForMetadataCatchupMs
+ long waitForMetadataCatchupMs,
+ LogStorageAccess logStorage
) {
this.partitionKey = partitionKey;
this.topologyService = topologyService;
@@ -139,6 +144,7 @@ public class PartitionSnapshotStorage {
this.failureProcessor = failureProcessor;
this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
+ this.logStorage = logStorage;
}
public PartitionKey partitionKey() {
@@ -356,4 +362,9 @@ public class PartitionSnapshotStorage {
.oldLearnersList(configuration.oldLearners())
.build();
}
+
+ /** Returns the replication log storage. */
+ public LogStorageAccess logStorage() {
+ return logStorage;
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
index 43bfe476c0e..b85947e65fb 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/ZonePartitionKey.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot;
import java.util.Objects;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.tostring.S;
/**
@@ -72,4 +74,9 @@ public class ZonePartitionKey implements PartitionKey {
public String toString() {
return S.toString(ZonePartitionKey.class, this);
}
+
+ @Override
+ public ReplicationGroupId toReplicationGroupId() {
+ return new ZonePartitionId(zoneId, partitionId);
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 768cf64ead4..f41798a6807 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -20,11 +20,16 @@ package
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -32,6 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +51,8 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.IgniteThrottledLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -61,6 +69,7 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
@@ -236,11 +245,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
} catch (CancellationException ignored) {
// Ignored.
} catch (ExecutionException e) {
- Throwable cause = e.getCause();
-
- if (!(cause instanceof CancellationException)) {
+ if (!hasCause(e, CancellationException.class,
NodeStoppingException.class)) {
// TODO https://issues.apache.org/jira/browse/IGNITE-26811
HandshakeException is thrown when node is stopping.
- if (!(cause instanceof HandshakeException)) {
+ if (!hasCause(e, HandshakeException.class)) {
partitionSnapshotStorage.failureProcessor().process(new FailureContext(e,
"Error when completing the copier"));
}
@@ -249,7 +256,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
// By analogy with LocalSnapshotCopier#join.
- throw new IllegalStateException(cause);
+ throw new IllegalStateException(e);
}
}
}
@@ -688,7 +695,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
return allOf(
aggregateFutureFromPartitions(PartitionMvStorageAccess::startRebalance,
snapshotContext),
partitionSnapshotStorage.txState().startRebalance()
- );
+ ).thenComposeAsync(unused ->
startRebalanceForReplicationLogStorages(snapshotContext), executor);
} finally {
busyLock.leaveBusy();
}
@@ -736,4 +743,35 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
this.partitionsByTableId = partitionsByTableId;
}
}
+
+ private CompletableFuture<Void>
startRebalanceForReplicationLogStorages(SnapshotContext snapshotContext) {
+ if (!busyLock.enterBusy()) {
+ return nullCompletedFuture();
+ }
+
+ try {
+ Set<ReplicationLogStorageKey> keys =
collectReplicationLogStorageKeys(snapshotContext);
+
+ return runAsync(() -> inBusyLockSafe(busyLock, () ->
keys.forEach(this::startRebalanceForReplicationLogStorage)), executor);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private Set<ReplicationLogStorageKey>
collectReplicationLogStorageKeys(SnapshotContext snapshotContext) {
+ return snapshotContext.partitionsByTableId.values().stream()
+ .map(partitionMvStorage ->
ReplicationLogStorageKey.create(partitionSnapshotStorage, partitionMvStorage))
+ .collect(toSet());
+ }
+
+ private void
startRebalanceForReplicationLogStorage(ReplicationLogStorageKey key) throws
IgniteInternalException {
+ try {
+ LogStorageAccess logStorage =
partitionSnapshotStorage.logStorage();
+
+ logStorage.destroy(key.replicationGroupId(), key.isVolatile());
+ logStorage.createMetaStorage(key.replicationGroupId());
+ } catch (NodeStoppingException e) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, e);
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
new file mode 100644
index 00000000000..7c05c8bfd6f
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/ReplicationLogStorageKey.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
+
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+
+/** Helper class for starting rebalancing for the replication log. */
+class ReplicationLogStorageKey {
+ @IgniteToStringInclude
+ private final ReplicationGroupId replicationGroupId;
+
+ private final boolean isVolatile;
+
+ private ReplicationLogStorageKey(ReplicationGroupId replicationGroupId,
boolean isVolatile) {
+ this.replicationGroupId = replicationGroupId;
+ this.isVolatile = isVolatile;
+ }
+
+ static ReplicationLogStorageKey create(PartitionSnapshotStorage
snapshotStorage, PartitionMvStorageAccess mvStorage) {
+ return new
ReplicationLogStorageKey(snapshotStorage.partitionKey().toReplicationGroupId(),
mvStorage.isVolatile());
+ }
+
+ ReplicationGroupId replicationGroupId() {
+ return replicationGroupId;
+ }
+
+ boolean isVolatile() {
+ return isVolatile;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReplicationLogStorageKey that = (ReplicationLogStorageKey) o;
+
+ return isVolatile == that.isVolatile &&
replicationGroupId.equals(that.replicationGroupId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = replicationGroupId.hashCode();
+ result = 31 * result + (isVolatile ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(ReplicationLogStorageKey.class, this);
+ }
+}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index d1cccbc6e64..1bade1611ac 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -202,34 +202,6 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
new PendingComparableValuesTracker<>(0L)
));
- zoneResourcesManager = spy(new ZoneResourcesManager(
- sharedTxStateStorage,
- txManager,
- outgoingSnapshotsManager,
- topologyService,
- catalogService,
- failureManager,
- executorService
- ) {
- @Override
- protected TxStateStorage createTxStateStorage(int zoneId, int
partitionCount) {
- TxStateStorage txStateStorage = new
TxStateRocksDbStorage(zoneId, partitionCount, sharedTxStateStorage) {
- @Override
- protected TxStateRocksDbPartitionStorage
createPartitionStorage(int partitionId) {
- return txStatePartitionStorage;
- }
- };
-
- if (ThreadAssertions.enabled()) {
- txStateStorage = new
ThreadAssertingTxStateStorage(txStateStorage);
- }
-
- txStateStorage.start();
-
- return txStateStorage;
- }
- });
-
when(raftManager.startRaftGroupNode(any(), any(), any(), any(),
any(RaftGroupOptions.class), any()))
.thenReturn(topologyAwareRaftGroupService);
@@ -264,6 +236,35 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
executorService
));
+ zoneResourcesManager = spy(new ZoneResourcesManager(
+ sharedTxStateStorage,
+ txManager,
+ outgoingSnapshotsManager,
+ topologyService,
+ catalogService,
+ failureManager,
+ executorService,
+ replicaManager
+ ) {
+ @Override
+ protected TxStateStorage createTxStateStorage(int zoneId, int
partitionCount) {
+ TxStateStorage txStateStorage = new
TxStateRocksDbStorage(zoneId, partitionCount, sharedTxStateStorage) {
+ @Override
+ protected TxStateRocksDbPartitionStorage
createPartitionStorage(int partitionId) {
+ return txStatePartitionStorage;
+ }
+ };
+
+ if (ThreadAssertions.enabled()) {
+ txStateStorage = new
ThreadAssertingTxStateStorage(txStateStorage);
+ }
+
+ txStateStorage.start();
+
+ return txStateStorage;
+ }
+ });
+
partitionReplicaLifecycleManager = new
PartitionReplicaLifecycleManager(
catalogManager,
replicaManager,
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
index 991dda265ef..6d2b0c50cf7 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -72,6 +73,7 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
@Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
@Mock TopologyService topologyService,
@Mock CatalogService catalogService,
+ @Mock ReplicaManager replicaManager,
@InjectExecutorService ScheduledExecutorService scheduler,
@InjectExecutorService ExecutorService executor
) {
@@ -92,7 +94,8 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
topologyService,
catalogService,
mock(FailureProcessor.class),
- executor
+ executor,
+ replicaManager
);
storageIndexTracker = new PendingComparableValuesTracker<>(0L);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 4704cf829d1..6373adc63af 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -49,7 +49,8 @@ class PartitionSnapshotStorageFactoryTest extends
BaseIgniteAbstractTest {
mock(PartitionTxStateAccess.class),
mock(CatalogService.class),
mock(FailureProcessor.class),
- mock(Executor.class)
+ mock(Executor.class),
+ mock(LogStorageAccess.class)
);
@Mock
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
index 59219e1648d..014b76f1375 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -59,7 +59,8 @@ public class PartitionSnapshotStorageTest extends
BaseIgniteAbstractTest {
mock(PartitionTxStateAccess.class),
mock(CatalogService.class),
mock(FailureProcessor.class),
- mock(Executor.class)
+ mock(Executor.class),
+ mock(LogStorageAccess.class)
);
@Test
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index dfbdb45a3cf..3d69d271916 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -37,6 +37,7 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
@@ -86,6 +87,7 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
@@ -93,6 +95,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartiti
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
+import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -199,6 +202,8 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
private final TestLowWatermark lowWatermark = spy(new TestLowWatermark());
+ private final ReplicaManager replicaManager = mock(ReplicaManager.class);
+
@BeforeEach
void setUp(
@Mock Catalog catalog,
@@ -219,7 +224,7 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
}
@Test
- void test() {
+ void test() throws Exception {
fillOriginalStorages();
createTargetStorages();
@@ -269,6 +274,7 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
verify(incomingMvTableStorage).startRebalancePartition(PARTITION_ID);
verify(incomingTxStatePartitionStorage).startRebalance();
+ verify(replicaManager).destroyReplicationProtocolStorages(any(),
anyBoolean());
var expSnapshotInfo = new PartitionSnapshotInfo(
expLastAppliedIndex,
@@ -396,7 +402,8 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
catalogService,
mock(FailureProcessor.class),
executorService,
- 0
+ 0,
+ new LogStorageAccessImpl(replicaManager)
);
storage.addMvPartition(TABLE_ID, spy(new PartitionMvStorageAccessImpl(
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 47af48813a7..b7283ad58e2 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
@@ -84,7 +85,8 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
txStateAccess,
catalogService,
mock(FailureProcessor.class),
- mock(Executor.class)
+ mock(Executor.class),
+ mock(LogStorageAccess.class)
);
snapshotStorage.addMvPartition(TABLE_ID_1, partitionAccess1);
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 280c7f736c9..5f12e28d7e5 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -108,7 +108,6 @@ import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@@ -763,7 +762,6 @@ class ItTableRaftSnapshotsTest extends
ClusterPerTestIntegrationTest {
assertThat(getFromNode(2, 1), is("one"));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849")
@Test
void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws
Exception {
createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 18515206f9f..36f7341b127 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -482,6 +482,24 @@ public class Loza implements RaftManager {
}
}
+ /**
+ * Creates replication log meta storage for the given group ID.
+ *
+ * @param nodeId ID of the Raft node.
+ * @throws NodeStoppingException If the node is being stopped.
+ */
+ public void createMetaStorage(RaftNodeId nodeId) throws
NodeStoppingException {
+ if (!busyLock.enterBusy()) {
+ throw new NodeStoppingException();
+ }
+
+ try {
+ raftServer.createMetaStorage(nodeId);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* Returns Raft node IDs for which any storage (log storage or Raft meta
storage) is present on disk.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index f4e375a883b..9a98ab4c0d2 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -92,6 +92,7 @@ import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.core.FSMCallerImpl.ApplyTask;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl.LogEntryAndClosure;
import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl.ReadIndexEvent;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
@@ -613,6 +614,21 @@ public class JraftServerImpl implements RaftServer {
destroyRaftNodeStoragesInternal(nodeId, groupOptions, true);
}
+ /**
+ * Creates replication log meta storage for the given group ID.
+ *
+ * @param nodeId ID of the Raft node.
+ */
+ public void createMetaStorage(RaftNodeId nodeId) {
+ RaftGroupService raftGroupService = nodes.get(nodeId);
+
+ if (raftGroupService == null) {
+ return;
+ }
+
+ ((NodeImpl)
raftGroupService.getRaftNode()).metaStorage().createAfterDestroy();
+ }
+
private void destroyRaftNodeStoragesInternal(RaftNodeId nodeId,
RaftGroupOptions groupOptions, boolean durable) {
StorageDestructionIntent intent =
groupStoragesContextResolver.getIntent(nodeId, groupOptions.volatileStores());
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
index aed8e8108d6..38657691a26 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileRaftMetaStorage.java
@@ -72,4 +72,9 @@ public class VolatileRaftMetaStorage implements
RaftMetaStorage, VolatileStorage
return true;
}
+
+ @Override
+ public void createAfterDestroy() {
+ // no-op
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9be02c8b265..4e6e0bacf88 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -4355,4 +4355,8 @@ public class NodeImpl implements Node, RaftServerService {
public FSMCaller fsmCaller() {
return fsmCaller;
}
+
+ public RaftMetaStorage metaStorage() {
+ return metaStorage;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
index 94244126edf..cc3fd625d4f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/RaftMetaStorage.java
@@ -24,7 +24,6 @@ import
org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
* Raft metadata storage service.
*/
public interface RaftMetaStorage extends Lifecycle<RaftMetaStorageOptions>,
Storage {
-
/**
* Set current term.
*/
@@ -49,4 +48,7 @@ public interface RaftMetaStorage extends
Lifecycle<RaftMetaStorageOptions>, Stor
* Set term and voted for information.
*/
boolean setTermAndVotedFor(final long term, final PeerId peerId);
+
+ /** Creates a meta storage after it is destroyed. */
+ void createAfterDestroy();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
index a539df16344..e2a58e75cd3 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalRaftMetaStorage.java
@@ -38,7 +38,6 @@ import org.apache.ignite.raft.jraft.util.Utils;
* Raft meta storage,it's not thread-safe.
*/
public class LocalRaftMetaStorage implements RaftMetaStorage {
-
private static final IgniteLogger LOG =
Loggers.forClass(LocalRaftMetaStorage.class);
private static final String RAFT_META = "raft_meta";
@@ -95,7 +94,7 @@ public class LocalRaftMetaStorage implements RaftMetaStorage {
return true;
}
catch (final IOException e) {
- LOG.error("Fail to load raft meta storage [node={}].",
this.node.getNodeId(), e);
+ LOG.error("Fail to load raft meta storage [node={}].", e,
this.node.getNodeId());
return false;
}
}
@@ -119,7 +118,7 @@ public class LocalRaftMetaStorage implements
RaftMetaStorage {
return true;
}
catch (final Exception e) {
- LOG.error("Fail to save raft meta [node={}].",
this.node.getNodeId(), e);
+ LOG.error("Fail to save raft meta [node={}].", e,
this.node.getNodeId());
reportIOError();
return false;
}
@@ -187,6 +186,13 @@ public class LocalRaftMetaStorage implements
RaftMetaStorage {
return save();
}
+ @Override
+ public void createAfterDestroy() {
+ if (!Utils.mkdir(new File(path))) {
+ LOG.error("Fail to mkdir [node={}, path={}].",
this.node.getNodeId(), this.path);
+ }
+ }
+
@Override
public String toString() {
return "RaftMetaStorageImpl [path=" + this.path + ", term=" +
this.term + ", votedFor=" + this.votedFor + "]";
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ec8b2bbaeb8..340a43deef0 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -1380,6 +1380,18 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
((Loza) raftManager).destroyRaftNodeStoragesDurably(raftNodeId,
groupOptions);
}
+ /**
+ * Creates replication log meta storage for the given group ID.
+ *
+ * @param replicaGrpId Replication group ID.
+ * @throws NodeStoppingException If the node is being stopped.
+ */
+ public void createMetaStorage(ReplicationGroupId replicaGrpId) throws
NodeStoppingException {
+ var raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
+
+ ((Loza) raftManager).createMetaStorage(raftNodeId);
+ }
+
/**
* Returns IDs of all partitions of tables for which any storage of
replication protocol is present on disk.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 03e9855cf4d..6a15c5872f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -162,6 +162,7 @@ import
org.apache.ignite.internal.partition.replicator.ReliableCatalogVersions;
import org.apache.ignite.internal.partition.replicator.ReplicaTableProcessor;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.ChangePeersAndLearnersAsyncReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
@@ -2681,7 +2682,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
txStateAccess,
catalogService,
failureProcessor,
- incomingSnapshotsExecutor
+ incomingSnapshotsExecutor,
+ new LogStorageAccessImpl(replicaMgr)
);
snapshotStorage.addMvPartition(internalTable.tableId(),
partitionAccess);
@@ -2800,6 +2802,12 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
|| !nodeProperties.colocationEnabled()
&&
txStatePartitionStorage.lastAppliedIndex() ==
TxStatePartitionStorage.REBALANCE_IN_PROGRESS) {
if (nodeProperties.colocationEnabled()) {
+ destroyReplicationProtocolStorages(
+ new ZonePartitionId(table.zoneId(),
partitionId),
+ table,
+ false
+ );
+
return
internalTable.storage().clearPartition(partitionId);
} else {
return allOf(
@@ -3141,22 +3149,26 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
}
- private void destroyReplicationProtocolStorages(TablePartitionId
tablePartitionId, TableViewInternal table, boolean destroyDurably) {
+ private void destroyReplicationProtocolStorages(
+ ReplicationGroupId replicationGroupId,
+ TableViewInternal table,
+ boolean destroyDurably
+ ) {
var internalTbl = (InternalTableImpl) table.internalTable();
- destroyReplicationProtocolStorages(tablePartitionId,
internalTbl.storage().isVolatile(), destroyDurably);
+ destroyReplicationProtocolStorages(replicationGroupId,
internalTbl.storage().isVolatile(), destroyDurably);
}
private void destroyReplicationProtocolStorages(
- TablePartitionId tablePartitionId,
+ ReplicationGroupId replicationGroupId,
boolean isVolatileStorage,
boolean destroyDurably
) {
try {
if (destroyDurably) {
-
replicaMgr.destroyReplicationProtocolStoragesDurably(tablePartitionId,
isVolatileStorage);
+
replicaMgr.destroyReplicationProtocolStoragesDurably(replicationGroupId,
isVolatileStorage);
} else {
-
replicaMgr.destroyReplicationProtocolStorages(tablePartitionId,
isVolatileStorage);
+
replicaMgr.destroyReplicationProtocolStorages(replicationGroupId,
isVolatileStorage);
}
} catch (NodeStoppingException e) {
throw new IgniteInternalException(NODE_STOPPING_ERR, e);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
index 9736c7a0416..77dd0133e81 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionMvStorageAccessImpl.java
@@ -261,6 +261,11 @@ public class PartitionMvStorageAccessImpl implements
PartitionMvStorageAccess {
lowWatermark.updateLowWatermark(newLowWatermark);
}
+ @Override
+ public boolean isVolatile() {
+ return mvTableStorage.isVolatile();
+ }
+
private MvPartitionStorage getMvPartitionStorage() {
int partitionId = partitionId();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
index 1c2cd841ee5..83e7f2dc988 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/TablePartitionKey.java
@@ -19,6 +19,8 @@ package
org.apache.ignite.internal.table.distributed.raft.snapshot;
import java.util.Objects;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tostring.S;
/**
@@ -73,4 +75,9 @@ public class TablePartitionKey implements PartitionKey {
public String toString() {
return S.toString(TablePartitionKey.class, this);
}
+
+ @Override
+ public ReplicationGroupId toReplicationGroupId() {
+ return new TablePartitionId(tableId, partitionId);
+ }
}