This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new db73d706343 IGNITE-27097 Improve recovery from interrupted Raft
snapshot install (#7207)
db73d706343 is described below
commit db73d7063439b130b15f320b5cdd38ceefa4bfe1
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Dec 11 17:15:29 2025 +0400
IGNITE-27097 Improve recovery from interrupted Raft snapshot install (#7207)
---
.../LocalBeforeReplicaStartEventParameters.java | 79 ++++++
.../PartitionReplicaLifecycleManager.java | 32 ++-
.../partition/replicator/ZoneResourcesManager.java | 4 +
.../raft/snapshot/PartitionSnapshotStorage.java | 37 ++-
.../incoming/IncomingSnapshotCopierTest.java | 4 +-
modules/raft/build.gradle | 2 +
...InterruptedRaftSnapshotStorageRecoveryTest.java | 274 +++++++++++++++++++++
.../java/org/apache/ignite/internal/Cluster.java | 8 +-
.../internal/table/distributed/TableManager.java | 155 ++++++------
9 files changed, 501 insertions(+), 94 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
new file mode 100644
index 00000000000..8fcbe8229b9
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalBeforeReplicaStartEventParameters.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+
+/**
+ * Parameters for the {@link LocalPartitionReplicaEvent#BEFORE_REPLICA_STARTED} event.
+ *
+ * <p>Used as a container to pass information from event listeners back to the partition replica lifecycle manager: whether any
+ * storage has been seen in the rebalance state, and which cleanup actions should be executed in that case.
+ */
+public class LocalBeforeReplicaStartEventParameters extends LocalPartitionReplicaEventParameters {
+    /** Whether at least one storage is in the rebalance state. Once raised to {@code true}, it is never reset by this class. */
+    private volatile boolean anyStorageIsInRebalanceState;
+
+    /** Cleanup actions registered by event listeners, in registration order. */
+    private final List<Supplier<CompletableFuture<Void>>> cleanupActions = new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param zonePartitionId Zone partition id.
+     * @param revision Event's revision.
+     * @param onRecovery Flag indicating if this event was produced on node recovery.
+     * @param anyStorageIsInRebalanceState Initial value of the "at least one storage is in rebalance state" flag; listeners may
+     *     later raise it via {@link #registerStorageInRebalanceState()}.
+     */
+    public LocalBeforeReplicaStartEventParameters(
+            ZonePartitionId zonePartitionId,
+            long revision,
+            boolean onRecovery,
+            boolean anyStorageIsInRebalanceState
+    ) {
+        super(zonePartitionId, revision, onRecovery);
+
+        this.anyStorageIsInRebalanceState = anyStorageIsInRebalanceState;
+    }
+
+    /** Returns whether at least one storage is in rebalance state. */
+    public boolean anyStorageIsInRebalanceState() {
+        return anyStorageIsInRebalanceState;
+    }
+
+    /** Registers that at least one storage is in rebalance state. */
+    public void registerStorageInRebalanceState() {
+        this.anyStorageIsInRebalanceState = true;
+    }
+
+    /**
+     * Adds a cleanup action to be executed after this event has been handled by all event handlers, IF at least one storage is
+     * in the rebalance state.
+     *
+     * @param action Cleanup action to execute; must not be {@code null}.
+     * @throws NullPointerException If {@code action} is {@code null}.
+     */
+    public void addCleanupAction(Supplier<CompletableFuture<Void>> action) {
+        // Fail fast: a null element would otherwise only surface later, as an NPE thrown by List.copyOf() in cleanupActions().
+        cleanupActions.add(Objects.requireNonNull(action, "action"));
+    }
+
+    /** Returns an unmodifiable snapshot of the registered cleanup actions, in registration order. */
+    public List<Supplier<CompletableFuture<Void>>> cleanupActions() {
+        return List.copyOf(cleanupActions);
+    }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 1617b1f9a78..396976ef41b 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -700,7 +700,6 @@ public class PartitionReplicaLifecycleManager extends
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
- var eventParams = new
LocalPartitionReplicaEventParameters(zonePartitionId, revision, onRecovery);
ZonePartitionResources zoneResources =
zoneResourcesManager.allocateZonePartitionResources(
zonePartitionId,
@@ -708,9 +707,40 @@ public class PartitionReplicaLifecycleManager extends
storageIndexTracker
);
+ var eventParams = new LocalBeforeReplicaStartEventParameters(
+ zonePartitionId,
+ revision,
+ onRecovery,
+ zoneResources.txStatePartitionStorageIsInRebalanceState()
+ );
+
startedReplicationGroups.beforeStartingGroup(zonePartitionId);
return
fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
+ .thenCompose(v -> {
+ if (eventParams.anyStorageIsInRebalanceState()) {
+ // We must destroy protocol storages first. If we
do so, then, as MV and TX state storages sync Raft log
+ // before being flushed, there is a guarantee
that, after a possible crash, we will either see some storage
+ // still in the rebalance state (and hence we'll
repeat the destruction on the next start), or the Raft log
+ // destruction will be persisted (and we'll just
recover normally).
+ try {
+
replicaMgr.destroyReplicationProtocolStorages(zonePartitionId, isVolatileZone);
+ } catch (NodeStoppingException e) {
+ return failedFuture(e);
+ }
+
+ CompletableFuture<Void> clearTxStateStorage =
zoneResources.txStatePartitionStorage().clear();
+
+ CompletableFuture<?>[] registeredCleanupFutures =
eventParams.cleanupActions().stream()
+ .map(Supplier::get)
+ .toArray(CompletableFuture[]::new);
+ CompletableFuture<Void> clearMvStorages =
allOf(registeredCleanupFutures);
+
+ return allOf(clearTxStateStorage, clearMvStorages);
+ } else {
+ return nullCompletedFuture();
+ }
+ })
.thenCompose(v -> {
try {
// TODO
https://issues.apache.org/jira/browse/IGNITE-24654 Properly close
storageIndexTracker.
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index a4e314c7913..51c8efe2cea 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -267,6 +267,10 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
return txStatePartitionStorage;
}
+ public boolean txStatePartitionStorageIsInRebalanceState() {
+ return txStatePartitionStorage.lastAppliedIndex() ==
TxStatePartitionStorage.REBALANCE_IN_PROGRESS;
+ }
+
public ZonePartitionRaftListener raftListener() {
return raftListener;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index f1b390bacab..a28d4bcd2a3 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -48,9 +48,9 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.jetbrains.annotations.Nullable;
/**
- * Snapshot storage for a partition.
+ * Snapshot storage for a zone partition.
*
- * <p>In case of zone partitions manages all table storages of a zone
partition.
+ * <p>Manages all table storages of a zone partition.
*
* <p>Utilizes the fact that every partition already stores its latest applied
index and thus can itself be used as its own snapshot.
*
@@ -66,7 +66,7 @@ public class PartitionSnapshotStorage {
/** Default number of milliseconds that the follower is allowed to try to
catch up the required catalog version. */
private static final int DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS = 3000;
- private final PartitionKey partitionKey;
+ private final ZonePartitionKey partitionKey;
private final TopologyService topologyService;
@@ -102,7 +102,7 @@ public class PartitionSnapshotStorage {
/** Constructor. */
public PartitionSnapshotStorage(
- PartitionKey partitionKey,
+ ZonePartitionKey partitionKey,
TopologyService topologyService,
OutgoingSnapshotsManager outgoingSnapshotsManager,
PartitionTxStateAccess txState,
@@ -126,7 +126,7 @@ public class PartitionSnapshotStorage {
/** Constructor. */
public PartitionSnapshotStorage(
- PartitionKey partitionKey,
+ ZonePartitionKey partitionKey,
TopologyService topologyService,
OutgoingSnapshotsManager outgoingSnapshotsManager,
PartitionTxStateAccess txState,
@@ -302,12 +302,15 @@ public class PartitionSnapshotStorage {
for (PartitionMvStorageAccess partitionStorage :
partitionsByTableId.values()) {
long lastAppliedIndex = partitionStorage.lastAppliedIndex();
- assert lastAppliedIndex >= 0 :
- String.format("Partition storage [tableId=%d,
partitionId=%d] contains an unexpected applied index value: %d.",
- partitionStorage.tableId(),
- partitionStorage.partitionId(),
- lastAppliedIndex
- );
+ if (lastAppliedIndex < 0) {
+ throw new IllegalStateException(String.format(
+ "MV partition storage [tableId=%d, zoneId=%d,
partitionId=%d] contains an unexpected applied index value: %d.",
+ partitionStorage.tableId(),
+ partitionKey.zoneId(),
+ partitionStorage.partitionId(),
+ lastAppliedIndex
+ ));
+ }
if (lastAppliedIndex == 0) {
return null;
@@ -319,7 +322,17 @@ public class PartitionSnapshotStorage {
}
}
- if (txState.lastAppliedIndex() < minLastAppliedIndex) {
+ long txStateLastAppliedIndex = txState.lastAppliedIndex();
+ if (txStateLastAppliedIndex < 0) {
+ throw new IllegalStateException(
+ String.format("Tx state partition storage [key=%s]
contains an unexpected applied index value: %d.",
+ partitionKey,
+ txStateLastAppliedIndex
+ )
+ );
+ }
+
+ if (txStateLastAppliedIndex < minLastAppliedIndex) {
return startupSnapshotMetaFromTxStorage();
} else {
assert storageWithMinLastAppliedIndex != null;
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index d06514b039f..00aa96c7ddb 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -117,7 +117,6 @@ import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.FullStateTransferIndexChooser;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionMvStorageAccessImpl;
-import
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TransactionIds;
@@ -146,6 +145,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
/** For {@link IncomingSnapshotCopier} testing. */
@ExtendWith(MockitoExtension.class)
public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest {
+ private static final int ZONE_ID = 0;
private static final int TABLE_ID = 1;
private static final String NODE_NAME = "node";
@@ -396,7 +396,7 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
when(outgoingSnapshotsManager.messagingService()).thenReturn(messagingService);
var storage = new PartitionSnapshotStorage(
- new TablePartitionKey(TABLE_ID, PARTITION_ID),
+ new ZonePartitionKey(ZONE_ID, PARTITION_ID),
topologyService,
outgoingSnapshotsManager,
new
PartitionTxStateAccessImpl(incomingTxStateStorage.getPartitionStorage(PARTITION_ID)),
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index a315d2f74ce..de2a3a40451 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -113,6 +113,8 @@ dependencies {
integrationTestImplementation project(':ignite-low-watermark')
integrationTestImplementation project(':ignite-schema')
integrationTestImplementation project(':ignite-configuration-root')
+ integrationTestImplementation project(':ignite-page-memory')
+ integrationTestImplementation project(':ignite-transactions')
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.dropwizard.metrics
integrationTestImplementation libs.disruptor
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
new file mode 100644
index 00000000000..73d0472dafe
--- /dev/null
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItInterruptedRaftSnapshotStorageRecoveryTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.components.LogSyncer;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.engine.StorageEngine;
+import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
+import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage;
+import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbStorage;
+import org.apache.ignite.internal.util.Constants;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.table.KeyValueView;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests checking that a node whose MV or TX state storage was left in the rebalance state while the node was down
+ * (as after a Raft snapshot installation that did not finish) starts again and gets the table data back.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
+class ItInterruptedRaftSnapshotStorageRecoveryTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+    private static final String ZONE_NAME = "TEST_ZONE";
+
+    /** Configuration of the standalone aipersist engine that is used to modify the MV storage of the stopped node. */
+    @InjectConfiguration("mock.profiles.default {engine = aipersist, sizeBytes = " + Constants.GiB + "}")
+    private StorageConfiguration storageConfig;
+
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @InjectExecutorService
+    private ScheduledExecutorService scheduler;
+
+    @Test
+    void raftSnapshotWorksAfterNonFinishedRaftSnapshotInstallOnMvStorage() {
+        testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(this::simulateNonFinishedRaftSnapshotInstallInMvStorage);
+    }
+
+    @Test
+    void raftSnapshotWorksAfterNonFinishedRaftSnapshotInstallOnTxStateStorage() {
+        testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(this::simulateNonFinishedRaftSnapshotInstallInTxStateStorage);
+    }
+
+    /**
+     * Common scenario.
+     *
+     * <p>It writes a row, truncates the Raft log on all nodes and stops node 2. It then applies the given action to node 2's
+     * partitions DB path, restarts node 2 and waits until the row is visible in node 2's partition storage again.
+     *
+     * @param executeOnStoppedNode2StoragePath Action applied to node 2's partitions DB path while the node is stopped.
+     */
+    private void testRaftSnapshotWorksAfterNonFinishedRaftSnapshotInstall(Consumer<Path> executeOnStoppedNode2StoragePath) {
+        createTableWithOnePartitionAnd3Replicas(TABLE_NAME);
+
+        // Using explicit transaction to be sure that TxStateStorage gets written.
+        putOneRowInExplicitTransaction();
+
+        waitForAllNodesToHave1RowInTable(TABLE_NAME);
+
+        // Truncate log prefix to force snapshot installation to node 2 when its storages will be cleared on startup.
+        // This also causes flushes of both MV and TxState storages, so, after we simulate non-finished rebalance in either MV or
+        // TX state storage and restart node 2, the corresponding storage data will not be rewritten by reapplying the log.
+        truncateLogPrefixOnAllNodes(cluster.solePartitionId(ZONE_NAME, TABLE_NAME));
+
+        // The path is resolved through the running node, so it must be obtained before the node is stopped.
+        Path node2PartitionsDbPath = unwrapIgniteImpl(cluster.node(2)).partitionsWorkDir().dbPath();
+
+        cluster.stopNode(2);
+
+        executeOnStoppedNode2StoragePath.accept(node2PartitionsDbPath);
+
+        Ignite node2 = cluster.startNode(2);
+
+        waitForNodeToHave1RowInTable(TABLE_NAME, node2);
+    }
+
+    /** Creates zone {@link #ZONE_NAME} with 1 partition and 3 replicas, and a table with the given name in it. */
+    private void createTableWithOnePartitionAnd3Replicas(String tableName) {
+        cluster.aliveNode().sql().executeScript("CREATE ZONE " + ZONE_NAME + " (PARTITIONS 1, REPLICAS 3) STORAGE PROFILES ['"
+                + CatalogService.DEFAULT_STORAGE_PROFILE + "'];"
+                + "CREATE TABLE " + tableName + " (ID INT PRIMARY KEY, VAL VARCHAR) ZONE " + ZONE_NAME + ";");
+    }
+
+    /** Puts a single row (key 1, value "one") into {@link #TABLE_NAME} within an explicit transaction. */
+    private void putOneRowInExplicitTransaction() {
+        KeyValueView<Integer, String> kvView = cluster.aliveNode().tables().table(TABLE_NAME).keyValueView(Integer.class, String.class);
+        cluster.aliveNode().transactions().runInTransaction(tx -> {
+            kvView.put(tx, 1, "one");
+        });
+    }
+
+    /** Waits, on every cluster node, for the table's partition 0 to report an estimated size of 1. */
+    private void waitForAllNodesToHave1RowInTable(String tableName) {
+        cluster.nodes().parallelStream().forEach(node -> waitForNodeToHave1RowInTable(tableName, node));
+    }
+
+    /** Waits until partition 0 of the given table on the given node reports an estimated size of 1. */
+    private static void waitForNodeToHave1RowInTable(String tableName, Ignite node) {
+        TableViewInternal table = unwrapTableViewInternal(node.tables().table(tableName));
+
+        MvPartitionStorage partition = table.internalTable().storage().getMvPartition(0);
+        assertThat(partition, is(notNullValue()));
+
+        await().until(partition::estimatedSize, is(1L));
+    }
+
+    /** Takes a Raft snapshot of the given group on every cluster node (see {@link #doSnapshotOn}). */
+    private void truncateLogPrefixOnAllNodes(ReplicationGroupId replicationGroupId) {
+        cluster.nodes().parallelStream().forEach(node -> truncateLogPrefix(replicationGroupId, node));
+    }
+
+    /** Takes a Raft snapshot of the given group on the given node; fails if the group's Raft service is not found there. */
+    private static void truncateLogPrefix(ReplicationGroupId replicationGroupId, Ignite node) {
+        RaftGroupService raftGroupService = Cluster.raftGroupService(unwrapIgniteImpl(node), replicationGroupId);
+        assertThat(raftGroupService, is(notNullValue()));
+
+        doSnapshotOn(raftGroupService);
+    }
+
+    /** Triggers a Raft snapshot on the given group service and asserts that it completes with {@link RaftError#SUCCESS}. */
+    private static void doSnapshotOn(RaftGroupService raftGroupService) {
+        var fut = new CompletableFuture<Status>();
+        // NOTE(review): the boolean argument presumably forces the snapshot even without new log entries; confirm against Node#snapshot.
+        raftGroupService.getRaftNode().snapshot(fut::complete, true);
+
+        assertThat(fut, willCompleteSuccessfully());
+        assertEquals(RaftError.SUCCESS, fut.join().getRaftError());
+    }
+
+    /** Returns the storage descriptor of {@link #TABLE_NAME}, taken from any alive node. */
+    private StorageTableDescriptor storageTableDescriptor() {
+        TableViewInternal table = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+        MvTableStorage mvTableStorage = table.internalTable().storage();
+        return mvTableStorage.getTableDescriptor();
+    }
+
+    /** Returns the ID of zone {@link #ZONE_NAME} in the latest catalog version; fails if the zone is absent. */
+    private int zoneId() {
+        CatalogManager catalogManager = unwrapIgniteImpl(cluster.aliveNode()).catalogManager();
+        Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        CatalogZoneDescriptor zone = catalog.zone(ZONE_NAME);
+        assertThat(zone, is(notNullValue()));
+
+        return zone.id();
+    }
+
+    /**
+     * Opens the stopped node's persistent page memory storage directly and leaves partition 0 of the test table in the rebalance
+     * state. This imitates a Raft snapshot installation into the MV storage that did not finish.
+     *
+     * @param storagePath Partitions DB path of the stopped node.
+     */
+    private void simulateNonFinishedRaftSnapshotInstallInMvStorage(Path storagePath) {
+        assertTrue(Files.exists(storagePath));
+        assertTrue(Files.isDirectory(storagePath));
+
+        var metricManager = new TestMetricManager();
+
+        StorageEngine engine = createPersistentPageMemoryEngine(storagePath, metricManager);
+
+        engine.start();
+
+        try {
+            // NOTE(review): metricManager is started here but never stopped; consider stopping it in the finally block.
+            assertThat(metricManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+            StorageTableDescriptor tableDescriptor = storageTableDescriptor();
+
+            // No index descriptors are provided: the supplier returns null for every index ID.
+            MvTableStorage tableStorage = engine.createMvTable(tableDescriptor, new StorageIndexDescriptorSupplier() {
+                @Override
+                public @Nullable StorageIndexDescriptor get(int indexId) {
+                    return null;
+                }
+            });
+            assertThat(tableStorage.createMvPartition(0), willCompleteSuccessfully());
+            assertThat(tableStorage.startRebalancePartition(0), willCompleteSuccessfully());
+
+            // Flush to make sure it gets written to disk.
+            flushPartition0(tableStorage);
+        } finally {
+            engine.stop();
+        }
+    }
+
+    /** Creates a {@link PersistentPageMemoryStorageEngine} working on the given path, with mocked failure manager and log syncer. */
+    private StorageEngine createPersistentPageMemoryEngine(Path storagePath, MetricManager metricManager) {
+        var ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        return new PersistentPageMemoryStorageEngine(
+                "test",
+                metricManager,
+                storageConfig,
+                null,
+                ioRegistry,
+                storagePath,
+                null,
+                mock(FailureManager.class),
+                mock(LogSyncer.class),
+                scheduler,
+                new HybridClockImpl()
+        );
+    }
+
+    /** Flushes MV partition 0 of the given table storage and waits for the flush to complete. */
+    private static void flushPartition0(MvTableStorage tableStorage) {
+        MvPartitionStorage partitionStorage = tableStorage.getMvPartition(0);
+        assertThat(partitionStorage, is(notNullValue()));
+        assertThat(partitionStorage.flush(), willCompleteSuccessfully());
+    }
+
+    /**
+     * Opens the stopped node's RocksDB-based TX state storage directly and leaves partition 0 of the test zone in the rebalance
+     * state. This imitates a Raft snapshot installation into the TX state storage that did not finish.
+     *
+     * @param storagePath Partitions DB path of the stopped node; the TX state storage is expected in its {@code tx-state}
+     *     subdirectory.
+     */
+    private void simulateNonFinishedRaftSnapshotInstallInTxStateStorage(Path storagePath) {
+        Path txStateStoragePath = storagePath.resolve("tx-state");
+        assertTrue(Files.exists(txStateStoragePath));
+        assertTrue(Files.isDirectory(txStateStoragePath));
+
+        var sharedStorage = new TxStateRocksDbSharedStorage(
+                "test",
+                txStateStoragePath,
+                scheduler,
+                executor,
+                mock(LogSyncer.class),
+                mock(FailureProcessor.class),
+                () -> 0
+        );
+        assertThat(sharedStorage.startAsync(new ComponentContext()), willCompleteSuccessfully());
+
+        try {
+            // NOTE(review): zoneTxStateStorage is started but never closed before the shared storage is stopped; confirm
+            // whether it should be closed explicitly.
+            var zoneTxStateStorage = new TxStateRocksDbStorage(zoneId(), storageTableDescriptor().getPartitions(), sharedStorage);
+            zoneTxStateStorage.start();
+
+            TxStateRocksDbPartitionStorage txStatePartitionStorage = zoneTxStateStorage.getOrCreatePartitionStorage(0);
+            assertThat(txStatePartitionStorage.startRebalance(), willCompleteSuccessfully());
+
+            // Flush to make sure it gets written to disk.
+            // NOTE(review): the result of flush() is not awaited; if it is asynchronous, confirm it completes before stopAsync().
+            sharedStorage.flush();
+        } finally {
+            assertThat(sharedStorage.stopAsync(), willCompleteSuccessfully());
+        }
+    }
+}
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 8225608e18b..1cd5d6cae92 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -620,7 +620,13 @@ public class Cluster {
.orElse(null);
}
- private static @Nullable RaftGroupService raftGroupService(IgniteImpl
igniteImpl, ReplicationGroupId groupId) {
+ /**
+ * Finds a Raft group service for the given group ID on the given node.
+ *
+ * @param igniteImpl Ignite instance.
+ * @param groupId Replication group ID.
+ */
+ public static @Nullable RaftGroupService raftGroupService(IgniteImpl
igniteImpl, ReplicationGroupId groupId) {
JraftServerImpl server = (JraftServerImpl)
igniteImpl.raftManager().server();
return server.localNodes().stream()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 36d7895ff8b..b6c672be1f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
@@ -122,6 +123,7 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
+import
org.apache.ignite.internal.partition.replicator.LocalBeforeReplicaStartEventParameters;
import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import org.apache.ignite.internal.partition.replicator.NaiveAsyncReadWriteLock;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
@@ -138,7 +140,6 @@ import
org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
import org.apache.ignite.internal.raft.service.RaftCommandRunner;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
@@ -184,6 +185,7 @@ import
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
@@ -361,11 +363,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
/** Configuration of rebalance retries delay. */
private final SystemDistributedConfigurationPropertyHolder<Integer>
rebalanceRetryDelayConfiguration;
- private final EventListener<LocalPartitionReplicaEventParameters>
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
+ private final EventListener<LocalBeforeReplicaStartEventParameters>
onBeforeZoneReplicaStartedListener = this::beforeZoneReplicaStarted;
private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaStoppedListener = this::onZoneReplicaStopped;
private final EventListener<LocalPartitionReplicaEventParameters>
onZoneReplicaDestroyedListener = this::onZoneReplicaDestroyed;
- private final EventListener<CreateTableEventParameters>
onTableCreateWithColocationListener = this::loadTableToZoneOnTableCreate;
+ private final EventListener<CreateTableEventParameters>
onTableCreateListener = this::loadTableToZoneOnTableCreate;
private final EventListener<DropTableEventParameters> onTableDropListener
= fromConsumer(this::onTableDrop);
private final EventListener<CatalogEventParameters> onTableAlterListener =
this::onTableAlter;
@@ -556,7 +558,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
cleanUpResourcesForDroppedTablesOnRecoveryBusy();
- catalogService.listen(CatalogEvent.TABLE_CREATE,
onTableCreateWithColocationListener);
+ catalogService.listen(CatalogEvent.TABLE_CREATE,
onTableCreateListener);
catalogService.listen(CatalogEvent.TABLE_DROP,
onTableDropListener);
catalogService.listen(CatalogEvent.TABLE_ALTER,
onTableAlterListener);
@@ -574,14 +576,14 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
});
}
- private CompletableFuture<Boolean>
beforeZoneReplicaStarted(LocalPartitionReplicaEventParameters parameters) {
+ private CompletableFuture<Boolean>
beforeZoneReplicaStarted(LocalBeforeReplicaStartEventParameters parameters) {
return inBusyLockAsync(busyLock, () -> readyToProcessReplicaStarts
.thenCompose(v -> beforeZoneReplicaStartedImpl(parameters))
.thenApply(unused -> false)
);
}
- private CompletableFuture<Void>
beforeZoneReplicaStartedImpl(LocalPartitionReplicaEventParameters parameters) {
+ private CompletableFuture<Void>
beforeZoneReplicaStartedImpl(LocalBeforeReplicaStartEventParameters parameters)
{
return inBusyLockAsync(busyLock, () -> {
ZonePartitionId zonePartitionId = parameters.zonePartitionId();
@@ -595,7 +597,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return readLockAcquisitionFuture.thenCompose(stamp -> {
Set<TableViewInternal> zoneTables =
zoneTablesRawSet(zonePartitionId.zoneId());
- return
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables,
parameters.onRecovery());
+ return
createPartitionsAndLoadResourcesToZoneReplica(zonePartitionId, zoneTables,
parameters);
}).whenComplete((unused, t) ->
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead));
} catch (Throwable t) {
readLockAcquisitionFuture.thenAccept(zoneLock::unlockRead);
@@ -608,42 +610,78 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private CompletableFuture<Void>
createPartitionsAndLoadResourcesToZoneReplica(
ZonePartitionId zonePartitionId,
Set<TableViewInternal> zoneTables,
- boolean onRecovery
+ LocalBeforeReplicaStartEventParameters event
) {
int partitionIndex = zonePartitionId.partitionId();
PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex);
- CompletableFuture<?>[] futures = zoneTables.stream()
+ List<CompletableFuture<?>> storageCreationFutures = zoneTables.stream()
.map(tbl -> inBusyLockAsync(busyLock, () -> {
- return getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
- .thenRunAsync(() -> inBusyLock(busyLock, () -> {
- localPartsByTableId.compute(
- tbl.tableId(),
- (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionIndex)
- );
-
- lowWatermark.getLowWatermarkSafe(lwm ->
- registerIndexesToTable(
- tbl,
- catalogService,
- singlePartitionIdSet,
- tbl.schemaView(),
- lwm
- )
- );
-
-
preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId,
onRecovery);
- }), ioExecutor)
+ return createPartitionStoragesIfAbsent(tbl,
singlePartitionIdSet)
// If the table is already closed, it's not a
problem (probably the node is stopping).
.exceptionally(ignoreTableClosedException());
}))
- .toArray(CompletableFuture[]::new);
+ .collect(toList());
+
+ return CompletableFutures.allOf(storageCreationFutures)
+ .thenRunAsync(() ->
scheduleMvPartitionsCleanupIfNeeded(zoneTables, partitionIndex, event),
ioExecutor)
+ // If a table is already closed, it's not a problem (probably
the node is stopping).
+ .exceptionally(ignoreTableClosedException())
+ .thenCompose(unused -> {
+ CompletableFuture<?>[] futures = zoneTables.stream()
+ .map(tbl -> inBusyLockAsync(busyLock, () -> {
+ return runAsync(() -> inBusyLock(busyLock, ()
-> {
+ localPartsByTableId.compute(
+ tbl.tableId(),
+ (tableId, oldPartitionSet) ->
extendPartitionSet(oldPartitionSet, partitionIndex)
+ );
+
+ lowWatermark.getLowWatermarkSafe(lwm ->
+ registerIndexesToTable(
+ tbl,
+ catalogService,
+ singlePartitionIdSet,
+ tbl.schemaView(),
+ lwm
+ )
+ );
+
+
preparePartitionResourcesAndLoadToZoneReplicaBusy(tbl, zonePartitionId,
event.onRecovery());
+ }), ioExecutor)
+ // If the table is already closed, it's not a
problem (probably the node is stopping).
+ .exceptionally(ignoreTableClosedException());
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures);
+ });
+ }
+
+ private static void scheduleMvPartitionsCleanupIfNeeded(
+ Set<TableViewInternal> zoneTables,
+ int partitionIndex,
+ LocalBeforeReplicaStartEventParameters event
+ ) {
+ boolean anyMvPartitionStorageIsInRebalanceState = zoneTables.stream()
+ .map(table ->
table.internalTable().storage().getMvPartition(partitionIndex))
+ .filter(Objects::nonNull)
+ .anyMatch(partitionStorage ->
partitionStorage.lastAppliedIndex() ==
MvPartitionStorage.REBALANCE_IN_PROGRESS);
+
+ if (anyMvPartitionStorageIsInRebalanceState) {
+ event.registerStorageInRebalanceState();
+ }
- return allOf(futures);
+ // Adding the cleanup action even if no MV partition storage is in
rebalance state as it might be that the TX state storage is.
+ event.addCleanupAction(() -> {
+ CompletableFuture<?>[] clearFutures = zoneTables.stream()
+ .map(table ->
table.internalTable().storage().clearPartition(partitionIndex))
+ .toArray(CompletableFuture[]::new);
+ return allOf(clearFutures);
+ });
}
- private static Function<Throwable, Void> ignoreTableClosedException() {
+ private static <T> Function<Throwable, T> ignoreTableClosedException() {
return ex -> {
if (hasCause(ex, TableClosedException.class)) {
return null;
@@ -799,7 +837,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
}
- return getOrCreatePartitionStorages(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
+ return createPartitionStoragesIfAbsent(table,
parts).thenRun(() -> localPartsByTableId.put(tableId, parts));
}, ioExecutor))
// If the table is already closed, it's not a problem
(probably the node is stopping).
.exceptionally(ignoreTableClosedException())
@@ -1066,7 +1104,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
onLowWatermarkChangedListener);
- catalogService.removeListener(CatalogEvent.TABLE_CREATE,
onTableCreateWithColocationListener);
+ catalogService.removeListener(CatalogEvent.TABLE_CREATE,
onTableCreateListener);
catalogService.removeListener(CatalogEvent.TABLE_DROP,
onTableDropListener);
catalogService.removeListener(CatalogEvent.TABLE_ALTER,
onTableAlterListener);
}
@@ -1488,34 +1526,21 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
// TODO: https://issues.apache.org/jira/browse/IGNITE-19739 Create
storages only once.
- private CompletableFuture<Void>
getOrCreatePartitionStorages(TableViewInternal table, PartitionSet partitions) {
+ private CompletableFuture<Void>
createPartitionStoragesIfAbsent(TableViewInternal table, PartitionSet
partitions) {
InternalTable internalTable = table.internalTable();
- CompletableFuture<?>[] storageFuts =
partitions.stream().mapToObj(partitionId -> {
+ List<CompletableFuture<MvPartitionStorage>> storageFuts =
partitions.stream().mapToObj(partitionId -> {
MvPartitionStorage mvPartition;
try {
mvPartition =
internalTable.storage().getMvPartition(partitionId);
} catch (StorageClosedException e) {
- return failedFuture(new TableClosedException(table.tableId(),
e));
+ return CompletableFuture.<MvPartitionStorage>failedFuture(new
TableClosedException(table.tableId(), e));
}
- return (mvPartition != null ? completedFuture(mvPartition) :
internalTable.storage().createMvPartition(partitionId))
- .thenComposeAsync(mvPartitionStorage -> {
- if (mvPartitionStorage.lastAppliedIndex() ==
MvPartitionStorage.REBALANCE_IN_PROGRESS) {
- destroyReplicationProtocolStorages(
- new ZonePartitionId(table.zoneId(),
partitionId),
- table,
- false
- );
-
- return
internalTable.storage().clearPartition(partitionId);
- } else {
- return nullCompletedFuture();
- }
- }, ioExecutor);
- }).toArray(CompletableFuture[]::new);
+ return mvPartition != null ? completedFuture(mvPartition) :
internalTable.storage().createMvPartition(partitionId);
+ }).collect(toList());
- return allOf(storageFuts);
+ return CompletableFutures.allOf(storageFuts);
}
private CompletableFuture<Void>
stopAndDestroyTablePartitions(TableViewInternal table) {
@@ -1657,32 +1682,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return allOf(destroyFutures.toArray(new CompletableFuture[]{}));
}
- private void destroyReplicationProtocolStorages(
- ReplicationGroupId replicationGroupId,
- TableViewInternal table,
- boolean destroyDurably
- ) {
- var internalTbl = (InternalTableImpl) table.internalTable();
-
- destroyReplicationProtocolStorages(replicationGroupId,
internalTbl.storage().isVolatile(), destroyDurably);
- }
-
- private void destroyReplicationProtocolStorages(
- ReplicationGroupId replicationGroupId,
- boolean isVolatileStorage,
- boolean destroyDurably
- ) {
- try {
- if (destroyDurably) {
-
replicaMgr.destroyReplicationProtocolStoragesDurably(replicationGroupId,
isVolatileStorage);
- } else {
-
replicaMgr.destroyReplicationProtocolStorages(replicationGroupId,
isVolatileStorage);
- }
- } catch (NodeStoppingException e) {
- throw new IgniteInternalException(NODE_STOPPING_ERR, e);
- }
- }
-
private static void closePartitionTrackers(InternalTable internalTable,
int partitionId) {
closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));