This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4406ebf825 IGNITE-20256 Refuse to install Raft snapshots on partitions
when not enough schemas are available (#2473)
4406ebf825 is described below
commit 4406ebf825083e21ebd0a20d2fafed738acdff55
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Sep 4 18:57:10 2023 +0400
IGNITE-20256 Refuse to install Raft snapshots on partitions when not enough
schemas are available (#2473)
---
.../internal/catalog/CatalogManagerSelfTest.java | 2 +
.../internal/testframework/IgniteTestUtils.java | 4 +-
.../ignite/raft/jraft/entity/RaftOutter.java | 3 +
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 202 ++++++++++++++--
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 2 +-
.../internal/table/distributed/TableManager.java | 1 +
.../raft/snapshot/PartitionSnapshotStorage.java | 13 ++
.../snapshot/PartitionSnapshotStorageFactory.java | 13 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 126 ++++++++--
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 8 +-
.../snapshot/outgoing/OutgoingSnapshotReader.java | 2 +-
.../raft/snapshot/outgoing/SnapshotMetaUtils.java | 6 +-
.../schema/CatalogVersionSufficiency.java | 2 +-
.../PartitionSnapshotStorageFactoryTest.java | 3 +
.../snapshot/PartitionSnapshotStorageTest.java | 2 +
.../incoming/IncomingSnapshotCopierTest.java | 254 ++++++++++++++-------
.../outgoing/OutgoingSnapshotCommonTest.java | 11 +-
.../OutgoingSnapshotMvDataStreamingTest.java | 6 +-
.../outgoing/OutgoingSnapshotReaderTest.java | 2 +
.../OutgoingSnapshotTxDataStreamingTest.java | 6 +-
.../outgoing/OutgoingSnapshotsManagerTest.java | 6 +-
.../snapshot/outgoing/SnapshotMetaUtilsTest.java | 5 +-
22 files changed, 536 insertions(+), 143 deletions(-)
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 21f92469f9..23239c547f 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -173,6 +173,8 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertEquals(INFINITE_TIMER_VALUE,
zone.dataNodesAutoAdjustScaleDown());
assertEquals(DEFAULT_STORAGE_ENGINE, zone.dataStorage().engine());
assertEquals(DEFAULT_DATA_REGION, zone.dataStorage().dataRegion());
+
+ assertThat(manager.latestCatalogVersion(), is(0));
}
@Test
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index a8c22d1eeb..c21b6e70e9 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -190,9 +190,9 @@ public final class IgniteTestUtils {
* @param fieldName name of the field
* @return field value
*/
- public static Object getFieldValue(@Nullable Object target, Class<?>
declaredClass, String fieldName) {
+ public static <T> T getFieldValue(@Nullable Object target, Class<?>
declaredClass, String fieldName) {
try {
- return getField(target, declaredClass, fieldName).get(target);
+ return (T) getField(target, declaredClass, fieldName).get(target);
} catch (IllegalAccessException e) {
throw new IgniteInternalException("Cannot get field value", e);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
index 8e36f3ec75..935b4fbda1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
@@ -73,5 +73,8 @@ public final class RaftOutter {
@Nullable
Collection<String> oldLearnersList();
+
+ /** Minimum catalog version that is required for the snapshot to be
accepted by a follower. */
+ int requiredCatalogVersion();
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 88a2ec05e5..37299f5675 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.raftsnapshot;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -33,9 +35,11 @@ import java.net.ConnectException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,15 +55,21 @@ import
org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.ReplicationGroupsUtils;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
@@ -71,14 +81,27 @@ import
org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.Replicator;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.ActionRequest;
+import org.apache.ignite.raft.jraft.rpc.Message;
+import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
+import org.apache.ignite.raft.jraft.rpc.RaftServerService;
+import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
+import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
+import org.apache.ignite.raft.jraft.rpc.RpcServer;
+import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import
org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -126,23 +149,21 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
private LogInspector replicatorLogInspector;
- private @Nullable Handler replicaLoggerHandler;
+ private LogInspector copierLogInspector;
@BeforeEach
void createCluster(TestInfo testInfo) {
cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
replicatorLogInspector = LogInspector.create(Replicator.class, true);
+ copierLogInspector = LogInspector.create(IncomingSnapshotCopier.class,
true);
}
@AfterEach
@Timeout(60)
void shutdownCluster() {
- if (replicaLoggerHandler != null) {
- replicatorLogInspector.removeHandler(replicaLoggerHandler);
- }
-
replicatorLogInspector.stop();
+ copierLogInspector.stop();
cluster.shutdown();
}
@@ -426,21 +447,23 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
* on it for the sole table partition in the cluster.
*/
private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex)
throws InterruptedException {
+ CountDownLatch snapshotInstalledLatch =
snapshotInstalledLatch(nodeIndex);
+
+ reanimateNode(nodeIndex);
+
+ assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did
not install a snapshot in time");
+ }
+
+ private CountDownLatch snapshotInstalledLatch(int nodeIndex) {
CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
- Handler handler = replicatorLogInspector.addHandler(
+ replicatorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().matches(
- "Node .+ received InstallSnapshotResponse from .+_" +
nodeIndex + " .+ success=true"),
- () -> snapshotInstalledLatch.countDown()
+ "Node \\S+ received InstallSnapshotResponse from
\\S+_" + nodeIndex + " .+ success=true"),
+ snapshotInstalledLatch::countDown
);
- try {
- reanimateNode(nodeIndex);
-
- assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS),
"Did not install a snapshot in time");
- } finally {
- replicatorLogInspector.removeHandler(handler);
- }
+ return snapshotInstalledLatch;
}
private void reanimateNode(int nodeIndex) {
@@ -724,11 +747,12 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
int nodeIndexTo,
CompletableFuture<Void> snapshotInstallSuccessfullyFuture
) {
- String regexp = "Node .+" + nodeIndexFrom + " received
InstallSnapshotResponse from .+_" + nodeIndexTo + " .+ success=true";
+ String regexp = "Node \\S+" + nodeIndexFrom + " received
InstallSnapshotResponse from \\S+_" + nodeIndexTo + " .+ success=true";
- replicaLoggerHandler = replicatorLogInspector.addHandler(
+ replicatorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().matches(regexp),
- () -> snapshotInstallSuccessfullyFuture.complete(null));
+ () -> snapshotInstallSuccessfullyFuture.complete(null)
+ );
}
/**
@@ -782,6 +806,115 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
}
+ /**
+ * The replication mechanism must not replicate commands for which schemas
are not yet available on the node
+ * to which replication happens (in Raft, it means that followers/learners
cannot receive commands that they
+ * cannot execute without waiting for schemas). This method tests that
snapshots bringing such commands are
+ * rejected, and that, when metadata catches up, the snapshot gets
successfully installed.
+ */
+ @Test
+ void laggingSchemasOnFollowerPreventSnapshotInstallation() throws
Exception {
+ cluster.startAndInit(3);
+
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ // Prepare the scene: force node 0 to be a leader, and node 2 to be a
follower.
+ final int leaderIndex = 0;
+ final int followerIndex = 2;
+
+ transferLeadershipOnSolePartitionTo(leaderIndex);
+ cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
+
+ // Block AppendEntries from being accepted on the follower so that the
leader will have to use a snapshot.
+ blockIncomingAppendEntriesAt(followerIndex);
+
+ // Inhibit the MetaStorage on the follower to make snapshots not
eligible for installation.
+ WatchListenerInhibitor listenerInhibitor =
inhibitMetastorageListenersAt(followerIndex);
+
+ try {
+ // Add some data in a schema that is not yet available on the
follower
+ updateTableSchemaAt(leaderIndex);
+ putToTableAt(leaderIndex);
+
+ CountDownLatch installationRejected = installationRejectedLatch();
+ CountDownLatch snapshotInstalled =
snapshotInstalledLatch(followerIndex);
+
+ // Force InstallSnapshot to be used.
+ causeLogTruncationOnSolePartitionLeader(leaderIndex);
+
+ assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did
not see snapshot installation rejection");
+
+ assertThat("Snapshot was installed before unblocking",
snapshotInstalled.getCount(), is(not(0L)));
+
+ listenerInhibitor.stopInhibit();
+
+ assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not
see a snapshot installed");
+ } finally {
+ listenerInhibitor.stopInhibit();
+ }
+ }
+
+ private void updateTableSchemaAt(int nodeIndex) {
+ cluster.doInSession(nodeIndex, session -> {
+ session.execute(null, "alter table test add column added int");
+ });
+ }
+
+ private void putToTableAt(int nodeIndex) {
+ KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
+ .tables()
+ .table("test")
+ .keyValueView();
+ kvView.put(null, Tuple.create().set("key", 1),
Tuple.create().set("val", "one"));
+ }
+
+ private void blockIncomingAppendEntriesAt(int nodeIndex) {
+ BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower =
installBlockingAppendEntriesProcessor(nodeIndex);
+
+ blockingProcessorOnFollower.startBlocking();
+ }
+
+ private WatchListenerInhibitor inhibitMetastorageListenersAt(int
nodeIndex) {
+ IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
+
+ WatchListenerInhibitor listenerInhibitor =
WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
+ listenerInhibitor.startInhibit();
+
+ return listenerInhibitor;
+ }
+
+ private CountDownLatch installationRejectedLatch() {
+ CountDownLatch installationRejected = new CountDownLatch(1);
+
+ copierLogInspector.addHandler(
+ event ->
event.getMessage().getFormattedMessage().startsWith("Metadata not yet
available, rejecting snapshot installation"),
+ installationRejected::countDown
+ );
+
+ return installationRejected;
+ }
+
+ private BlockingAppendEntriesRequestProcessor
installBlockingAppendEntriesProcessor(int nodeIndex) {
+ RaftServer raftServer = cluster.node(nodeIndex).raftManager().server();
+ RpcServer<?> rpcServer = getFieldValue(raftServer,
JraftServerImpl.class, "rpcServer");
+ Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer,
IgniteRpcServer.class, "processors");
+
+ AppendEntriesRequestProcessor originalProcessor =
+ (AppendEntriesRequestProcessor)
processors.get(AppendEntriesRequest.class.getName());
+ Executor appenderExecutor = getFieldValue(originalProcessor,
RpcRequestProcessor.class, "executor");
+ RaftMessagesFactory raftMessagesFactory =
getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory");
+
+ BlockingAppendEntriesRequestProcessor blockingProcessor = new
BlockingAppendEntriesRequestProcessor(
+ appenderExecutor,
+ raftMessagesFactory,
+ cluster.solePartitionId().toString()
+ );
+
+ rpcServer.registerProcessor(blockingProcessor);
+
+ return blockingProcessor;
+ }
+
/**
* This exception is thrown to indicate that an operation can not possibly
succeed after some error condition.
* For example there is no reason to retry an operation that inserts a
certain key after receiving a duplicate key error.
@@ -794,4 +927,35 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
super(cause);
}
}
+
+ /**
+ * {@link AppendEntriesRequestProcessor} that, when blocking is enabled,
blocks all AppendEntriesRequests of
+ * the given group (that is, returns EBUSY error code, which makes JRaft
repeat them).
+ */
+ private static class BlockingAppendEntriesRequestProcessor extends
AppendEntriesRequestProcessor {
+ private final String idOfGroupToBlock;
+ private volatile boolean block;
+
+ public BlockingAppendEntriesRequestProcessor(Executor executor,
+ RaftMessagesFactory msgFactory, String idOfGroupToBlock) {
+ super(executor, msgFactory);
+
+ this.idOfGroupToBlock = idOfGroupToBlock;
+ }
+
+ @Override
+ public Message processRequest0(RaftServerService service,
AppendEntriesRequest request, RpcRequestClosure done) {
+ if (block && idOfGroupToBlock.equals(request.groupId())) {
+ return RaftRpcFactory.DEFAULT //
+ .newResponse(done.getMsgFactory(), RaftError.EBUSY,
+ "Blocking AppendEntries on '%s'.",
request.groupId());
+ }
+
+ return super.processRequest0(service, request, done);
+ }
+
+ public void startBlocking() {
+ block = true;
+ }
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index cb501fbd8a..c801e1b341 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1431,7 +1431,7 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
throwExceptionDependingOnStorageStateOnRebalance(state.get(),
createStorageInfo());
}
- // Changed storage states and expect all storage operations to stop
soon.
+ // Change storage states and expect all storage operations to stop
soon.
busyLock.block();
try {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index aa70f553f6..ce64e38fb6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1098,6 +1098,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
partitionUpdateHandlers.indexUpdateHandler,
partitionUpdateHandlers.gcUpdateHandler
),
+ catalogService,
incomingSnapshotsExecutor
));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
index 16d58e23cb..1d803c97cd 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorage.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.table.distributed.raft.snapshot;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotReader;
@@ -56,6 +57,8 @@ public class PartitionSnapshotStorage implements
SnapshotStorage {
/** Instance of partition. */
private final PartitionAccess partition;
+ private final CatalogService catalogService;
+
/**
* Snapshot meta, constructed from the storage data and raft group
configuration at startup.
* {@code null} if the storage is empty.
@@ -81,6 +84,7 @@ public class PartitionSnapshotStorage implements
SnapshotStorage {
* @param snapshotUri Snapshot URI.
* @param raftOptions RAFT options.
* @param partition Partition.
+ * @param catalogService Catalog service.
* @param startupSnapshotMeta Snapshot meta at startup. {@code null} if
the storage is empty.
* @param incomingSnapshotsExecutor Incoming snapshots executor.
*/
@@ -90,6 +94,7 @@ public class PartitionSnapshotStorage implements
SnapshotStorage {
String snapshotUri,
RaftOptions raftOptions,
PartitionAccess partition,
+ CatalogService catalogService,
@Nullable SnapshotMeta startupSnapshotMeta,
Executor incomingSnapshotsExecutor
) {
@@ -98,6 +103,7 @@ public class PartitionSnapshotStorage implements
SnapshotStorage {
this.snapshotUri = snapshotUri;
this.raftOptions = raftOptions;
this.partition = partition;
+ this.catalogService = catalogService;
this.startupSnapshotMeta = startupSnapshotMeta;
this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
}
@@ -137,6 +143,13 @@ public class PartitionSnapshotStorage implements
SnapshotStorage {
return partition;
}
+ /**
+ * Returns catalog service.
+ */
+ public CatalogService catalogService() {
+ return catalogService;
+ }
+
/**
* Returns a snapshot meta, constructed from the storage data and raft
group configuration at startup.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
index 56641e34a2..99588bb5af 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
@@ -50,6 +51,8 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
/** Partition storage. */
private final PartitionAccess partition;
+ private final CatalogService catalogService;
+
/**
* RAFT log index, min of {@link MvPartitionStorage#lastAppliedIndex()}
and {@link TxStateStorage#lastAppliedIndex()}
* at the moment of this factory instantiation.
@@ -62,6 +65,8 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
/** RAFT configuration corresponding to {@link #lastIncludedRaftIndex}. */
private final RaftGroupConfiguration lastIncludedConfiguration;
+ private final int lastCatalogVersionAtStart;
+
/** Incoming snapshots executor. */
private final Executor incomingSnapshotsExecutor;
@@ -71,6 +76,7 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
* @param topologyService Topology service.
* @param outgoingSnapshotsManager Snapshot manager.
* @param partition MV partition storage.
+ * @param catalogService Access to the Catalog.
* @param incomingSnapshotsExecutor Incoming snapshots executor.
* @see SnapshotMeta
*/
@@ -79,11 +85,13 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
TopologyService topologyService,
OutgoingSnapshotsManager outgoingSnapshotsManager,
PartitionAccess partition,
+ CatalogService catalogService,
Executor incomingSnapshotsExecutor
) {
this.topologyService = topologyService;
this.outgoingSnapshotsManager = outgoingSnapshotsManager;
this.partition = partition;
+ this.catalogService = catalogService;
this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
// We must choose the minimum applied index for local recovery so that
we don't skip the raft commands for the storage with the
@@ -92,12 +100,14 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
lastIncludedRaftTerm = partition.minLastAppliedTerm();
lastIncludedConfiguration = partition.committedGroupConfiguration();
+
+ lastCatalogVersionAtStart = catalogService.latestCatalogVersion();
}
@Override
public PartitionSnapshotStorage createSnapshotStorage(String uri,
RaftOptions raftOptions) {
SnapshotMeta startupSnapshotMeta = lastIncludedRaftIndex == 0 ? null :
SnapshotMetaUtils.snapshotMetaAt(
- lastIncludedRaftIndex, lastIncludedRaftTerm,
lastIncludedConfiguration
+ lastIncludedRaftIndex, lastIncludedRaftTerm,
lastIncludedConfiguration, lastCatalogVersionAtStart
);
return new PartitionSnapshotStorage(
@@ -106,6 +116,7 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
uri,
raftOptions,
partition,
+ catalogService,
startupSnapshotMeta,
incomingSnapshotsExecutor
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index fd5a9a3c37..be713c7918 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -17,16 +17,24 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
+import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -68,6 +76,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private static final int MAX_TX_DATA_BATCH_SIZE = 1000;
+ /** Number of milliseconds that the follower is allowed to try to catch up
the required catalog version. */
+ private static final int WAIT_FOR_METADATA_CATCHUP_MS = 3000;
+
private final PartitionSnapshotStorage partitionSnapshotStorage;
private final SnapshotUri snapshotUri;
@@ -87,7 +98,10 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private volatile SnapshotMeta snapshotMeta;
@Nullable
- private volatile CompletableFuture<?> rebalanceFuture;
+ private volatile CompletableFuture<Boolean> metadataSufficiencyFuture;
+
+ @Nullable
+ private volatile CompletableFuture<Void> rebalanceFuture;
/**
* Future is to wait in {@link #join()} because it is important for us to
wait for the rebalance to finish or abort.
@@ -112,20 +126,57 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
LOG.info("Copier is started for the partition [{}]",
createPartitionInfo());
- rebalanceFuture = partitionSnapshotStorage.partition().startRebalance()
- .thenCompose(unused -> {
- ClusterNode snapshotSender =
getSnapshotSender(snapshotUri.nodeName);
+ ClusterNode snapshotSender = getSnapshotSender(snapshotUri.nodeName);
+
+ metadataSufficiencyFuture = snapshotSender == null
+ ? failedFuture(new StorageRebalanceException("Snapshot sender
not found: " + snapshotUri.nodeName))
+ : loadSnapshotMeta(snapshotSender)
+ // Give metadata some time to catch up as it's very
probable that the leader is ahead metadata-wise.
+ .thenCompose(unused -> waitForMetadataWithTimeout())
+ .thenApply(unused -> metadataIsSufficientlyComplete())
+ ;
+
+ rebalanceFuture =
metadataSufficiencyFuture.thenCompose(metadataSufficient -> {
+ if (metadataSufficient) {
+ return partitionSnapshotStorage.partition().startRebalance()
+ .thenCompose(unused -> {
+ assert snapshotSender != null;
+
+ return loadSnapshotMvData(snapshotSender, executor)
+ .thenCompose(unused1 ->
loadSnapshotTxData(snapshotSender, executor));
+ });
+ } else {
+ logMetadataInsufficiencyAndSetError();
+
+ return completedFuture(null);
+ }
+ });
- if (snapshotSender == null) {
- throw new StorageRebalanceException("Snapshot sender
not found: " + snapshotUri.nodeName);
- }
+ joinFuture = metadataSufficiencyFuture.thenCompose(metadataSufficient
-> {
+ if (metadataSufficient) {
+ return rebalanceFuture.handle((unused, throwable) ->
completeRebalance(throwable)).thenCompose(Function.identity());
+ } else {
+ return completedFuture(null);
+ }
+ });
+ }
- return loadSnapshotMeta(snapshotSender)
- .thenCompose(unused1 ->
loadSnapshotMvData(snapshotSender, executor))
- .thenCompose(unused1 ->
loadSnapshotTxData(snapshotSender, executor));
- });
+ private CompletableFuture<?> waitForMetadataWithTimeout() {
+ CompletableFuture<?> metadataReadyFuture =
partitionSnapshotStorage.catalogService()
+ .catalogReadyFuture(snapshotMeta.requiredCatalogVersion());
+ CompletableFuture<?> readinessTimeoutFuture =
completeOnMetadataReadinessTimeout();
- joinFuture = rebalanceFuture.handle((unused, throwable) ->
completeRebalance(throwable)).thenCompose(Function.identity());
+ return anyOf(metadataReadyFuture, readinessTimeoutFuture);
+ }
+
+ private static CompletableFuture<?> completeOnMetadataReadinessTimeout() {
+ return new CompletableFuture<>()
+ .orTimeout(WAIT_FOR_METADATA_CATCHUP_MS, TimeUnit.MILLISECONDS)
+ .exceptionally(ex -> {
+ assert (ex instanceof TimeoutException);
+
+ return null;
+ });
}
@Override
@@ -140,14 +191,16 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
} catch (ExecutionException e) {
Throwable cause = e.getCause();
- LOG.error("Error when completing the copier", cause);
+ if (!(cause instanceof CancellationException)) {
+ LOG.error("Error when completing the copier", cause);
- if (!isOk()) {
- setError(RaftError.UNKNOWN, "Unknown error on completion
the copier");
- }
+ if (isOk()) {
+ setError(RaftError.UNKNOWN, "Unknown error on
completion the copier");
+ }
- // By analogy with LocalSnapshotCopier#join.
- throw new IllegalStateException(cause);
+ // By analogy with LocalSnapshotCopier#join.
+ throw new IllegalStateException(cause);
+ }
}
}
}
@@ -163,11 +216,14 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
LOG.info("Copier is canceled for partition [{}]",
createPartitionInfo());
- CompletableFuture<?> fut = rebalanceFuture;
+ // Cancel all futures that might be upstream wrt joinFuture.
+ List<CompletableFuture<?>> futuresToCancel =
Stream.of(metadataSufficiencyFuture, rebalanceFuture)
+ .filter(Objects::nonNull)
+ .collect(toList());
- if (fut != null) {
- fut.cancel(false);
+ futuresToCancel.forEach(future -> future.cancel(false));
+ if (!futuresToCancel.isEmpty()) {
try {
// Because after the cancellation, no one waits for #join.
join();
@@ -215,6 +271,28 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
}
}
+ private boolean metadataIsSufficientlyComplete() {
+ return isMetadataAvailableFor(snapshotMeta.requiredCatalogVersion(),
partitionSnapshotStorage.catalogService());
+ }
+
+ private void logMetadataInsufficiencyAndSetError() {
+ LOG.warn(
+ "Metadata not yet available, rejecting snapshot installation
[uri={}, requiredVersion={}].",
+ this.snapshotUri,
+ snapshotMeta.requiredCatalogVersion()
+ );
+
+ String errorMessage = String.format(
+ "Metadata not yet available, URI '%s', required level %s;
rejecting snapshot installation.",
+ this.snapshotUri,
+ snapshotMeta.requiredCatalogVersion()
+ );
+
+ if (isOk()) {
+ setError(RaftError.EBUSY, errorMessage);
+ }
+ }
+
/**
* Requests and stores data into {@link MvPartitionStorage}.
*/
@@ -276,7 +354,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
/**
* Requests and stores data into {@link TxStateStorage}.
*/
- private CompletableFuture<?> loadSnapshotTxData(ClusterNode
snapshotSender, Executor executor) {
+ private CompletableFuture<Void> loadSnapshotTxData(ClusterNode
snapshotSender, Executor executor) {
if (!busyLock.enterBusy()) {
return completedFuture(null);
}
@@ -340,7 +418,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
*/
private CompletableFuture<Void> completeRebalance(@Nullable Throwable
throwable) {
if (!busyLock.enterBusy()) {
- if (!isOk()) {
+ if (isOk()) {
setError(RaftError.ECANCELED, "Copier is cancelled");
}
@@ -351,7 +429,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
if (throwable != null) {
LOG.error("Partition rebalancing error [{}]", throwable,
createPartitionInfo());
- if (!isOk()) {
+ if (isOk()) {
setError(RaftError.UNKNOWN, throwable.getMessage());
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 82544f9f06..6cec945695 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -25,6 +25,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -63,6 +64,8 @@ public class OutgoingSnapshot {
private final PartitionAccess partition;
+ private final CatalogService catalogService;
+
/**
* Lock that is used for mutual exclusion of MV snapshot reading (by this
class) and threads that write MV data to the same
* partition (currently, via {@link SnapshotAwarePartitionDataStorage}).
@@ -113,9 +116,10 @@ public class OutgoingSnapshot {
/**
* Creates a new instance.
*/
- public OutgoingSnapshot(UUID id, PartitionAccess partition) {
+ public OutgoingSnapshot(UUID id, PartitionAccess partition, CatalogService
catalogService) {
this.id = id;
this.partition = partition;
+ this.catalogService = catalogService;
lastRowId = RowId.lowestRowId(partition.partitionKey().partitionId());
}
@@ -159,7 +163,7 @@ public class OutgoingSnapshot {
assert config != null : "Configuration should never be null when
installing a snapshot";
- return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex,
lastAppliedTerm, config);
+ return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex,
lastAppliedTerm, config, catalogService.latestCatalogVersion());
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index 24224f92c5..0c7ad9a585 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -50,7 +50,7 @@ public class OutgoingSnapshotReader extends SnapshotReader {
public OutgoingSnapshotReader(PartitionSnapshotStorage snapshotStorage) {
this.snapshotStorage = snapshotStorage;
- snapshot = new OutgoingSnapshot(id, snapshotStorage.partition());
+ snapshot = new OutgoingSnapshot(id, snapshotStorage.partition(),
snapshotStorage.catalogService());
LOG.info("Starting snapshot reader for snapshot {}", id);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
index 0ed0b403a1..cff8692500 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java
@@ -32,14 +32,16 @@ public class SnapshotMetaUtils {
* @param logIndex RAFT log index.
* @param term Term corresponding to the index.
* @param config RAFT group configuration.
+ * @param requiredCatalogVersion Catalog version that a follower/learner
must have to have ability to accept this snapshot.
* @return SnapshotMeta corresponding to the given log index.
*/
- public static SnapshotMeta snapshotMetaAt(long logIndex, long term,
RaftGroupConfiguration config) {
+ public static SnapshotMeta snapshotMetaAt(long logIndex, long term,
RaftGroupConfiguration config, int requiredCatalogVersion) {
SnapshotMetaBuilder metaBuilder = new
RaftMessagesFactory().snapshotMeta()
.lastIncludedIndex(logIndex)
.lastIncludedTerm(term)
.peersList(config.peers())
- .learnersList(config.learners());
+ .learnersList(config.learners())
+ .requiredCatalogVersion(requiredCatalogVersion);
if (!config.isStable()) {
//noinspection ConstantConditions
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
index 499249eeae..d61f3197ad 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
@@ -34,7 +34,7 @@ public class CatalogVersionSufficiency {
* @param catalogService Catalog service.
* @return {@code true} iff the local Catalog version is sufficient.
*/
- static boolean isMetadataAvailableFor(int requiredCatalogVersion,
CatalogService catalogService) {
+ public static boolean isMetadataAvailableFor(int requiredCatalogVersion,
CatalogService catalogService) {
return requiredCatalogVersion <= catalogService.latestCatalogVersion();
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 57a9a037f7..d8f6d0b504 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -55,6 +56,7 @@ public class PartitionSnapshotStorageFactoryTest extends
BaseIgniteAbstractTest
mock(TopologyService.class),
mock(OutgoingSnapshotsManager.class),
partitionAccess,
+ mock(CatalogService.class),
mock(Executor.class)
);
@@ -70,6 +72,7 @@ public class PartitionSnapshotStorageFactoryTest extends
BaseIgniteAbstractTest
mock(TopologyService.class),
mock(OutgoingSnapshotsManager.class),
partitionAccess,
+ mock(CatalogService.class),
mock(Executor.class)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
index ed83f131e0..a51c75e468 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.startup.StartupPartitionSnapshotReader;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -61,6 +62,7 @@ class PartitionSnapshotStorageTest extends
BaseIgniteAbstractTest {
"",
mock(RaftOptions.class),
mock(PartitionAccess.class),
+ mock(CatalogService.class),
metaForCleanStorage,
mock(Executor.class)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 9bc08ce2cb..9574aa777a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -39,6 +40,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -54,8 +56,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -80,6 +84,7 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.SnapshotUri;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaRequest;
+import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataRequest;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
@@ -98,6 +103,7 @@ import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
@@ -137,13 +143,28 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
- private MvGc mvGc;
+ private final MvGc mvGc = mock(MvGc.class);
+
+ private final CatalogService catalogService = mock(CatalogService.class);
+
+ private final MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
+ private final TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
+
+ private final MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
+ private final TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
+
+ private final long expLastAppliedIndex = 100500L;
+ private final long expLastAppliedTerm = 100L;
+ private final RaftGroupConfiguration expLastGroupConfig =
generateRaftGroupConfig();
+
+ private final List<RowId> rowIds = generateRowIds();
+ private final List<UUID> txIds = generateTxIds();
@BeforeEach
void setUp() {
- mvGc = mock(MvGc.class);
-
when(mvGc.removeStorage(any(TablePartitionId.class))).then(invocation
-> completedFuture(null));
+
+
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(completedFuture(null));
}
@AfterEach
@@ -153,27 +174,12 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
@Test
void test() {
- MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
- TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
-
- long expLastAppliedIndex = 100500L;
- long expLastAppliedTerm = 100L;
- RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+ fillOriginalStorages();
- List<RowId> rowIds = generateRowIds();
- List<UUID> txIds = generateTxIds();
-
- fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
- fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
-
- MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
- TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
-
- assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION),
willCompleteSuccessfully());
- incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+ createTargetStorages();
MessagingService messagingService =
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
- outgoingTxStatePartitionStorage, expLastAppliedIndex,
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+ outgoingTxStatePartitionStorage, rowIds, txIds);
PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
snapshotId,
@@ -215,31 +221,21 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
verify(incomingTxStatePartitionStorage, times(1)).startRebalance();
}
- private MessagingService
messagingServiceForSuccessScenario(MvPartitionStorage
outgoingMvPartitionStorage,
- TxStateStorage outgoingTxStatePartitionStorage, long
expLastAppliedIndex, long expLastAppliedTerm,
- RaftGroupConfiguration expLastGroupConfig, List<RowId> rowIds,
List<UUID> txIds, UUID snapshotId) {
- MessagingService messagingService = mock(MessagingService.class);
+ private void createTargetStorages() {
+ assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION),
willCompleteSuccessfully());
+ incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+ }
- when(messagingService.invoke(eq(clusterNode),
any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
- SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+ private void fillOriginalStorages() {
+ fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+ fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
+ }
- assertEquals(snapshotId, snapshotMetaRequest.id());
+ private MessagingService
messagingServiceForSuccessScenario(MvPartitionStorage
outgoingMvPartitionStorage,
+ TxStateStorage outgoingTxStatePartitionStorage, List<RowId>
rowIds, List<UUID> txIds) {
+ MessagingService messagingService = mock(MessagingService.class);
- return completedFuture(
- TABLE_MSG_FACTORY.snapshotMetaResponse()
- .meta(
- RAFT_MSG_FACTORY.snapshotMeta()
-
.lastIncludedIndex(expLastAppliedIndex)
-
.lastIncludedTerm(expLastAppliedTerm)
-
.peersList(expLastGroupConfig.peers())
-
.learnersList(expLastGroupConfig.learners())
-
.oldPeersList(expLastGroupConfig.oldPeers())
-
.oldLearnersList(expLastGroupConfig.oldLearners())
- .build()
- )
- .build()
- );
- });
+ returnSnapshotMetaWhenAskedForIt(messagingService);
when(messagingService.invoke(eq(clusterNode),
any(SnapshotMvDataRequest.class), anyLong())).then(answer -> {
SnapshotMvDataRequest snapshotMvDataRequest =
answer.getArgument(1);
@@ -266,6 +262,32 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
return messagingService;
}
+ private void returnSnapshotMetaWhenAskedForIt(MessagingService
messagingService) {
+ when(messagingService.invoke(eq(clusterNode),
any(SnapshotMetaRequest.class), anyLong())).then(answer -> {
+ SnapshotMetaRequest snapshotMetaRequest = answer.getArgument(1);
+
+ assertEquals(snapshotId, snapshotMetaRequest.id());
+
+ return completedFuture(snapshotMetaResponse(0));
+ });
+ }
+
+ private SnapshotMetaResponse snapshotMetaResponse(int
requiredCatalogVersion) {
+ return TABLE_MSG_FACTORY.snapshotMetaResponse()
+ .meta(
+ RAFT_MSG_FACTORY.snapshotMeta()
+ .lastIncludedIndex(expLastAppliedIndex)
+ .lastIncludedTerm(expLastAppliedTerm)
+ .peersList(expLastGroupConfig.peers())
+ .learnersList(expLastGroupConfig.learners())
+ .oldPeersList(expLastGroupConfig.oldPeers())
+
.oldLearnersList(expLastGroupConfig.oldLearners())
+ .requiredCatalogVersion(requiredCatalogVersion)
+ .build()
+ )
+ .build();
+ }
+
private PartitionSnapshotStorage createPartitionSnapshotStorage(
UUID snapshotId,
MvTableStorage incomingTableStorage,
@@ -293,6 +315,7 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
mock(IndexUpdateHandler.class),
mock(GcUpdateHandler.class)
)),
+ catalogService,
mock(SnapshotMeta.class),
executorService
);
@@ -435,18 +458,14 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
}
@Test
- void cancellationMakesJoinFinishIfHangingOnNetworkCall() throws Exception {
- MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
- TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
-
- assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION),
willCompleteSuccessfully());
- incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+ void cancellationMakesJoinFinishIfHangingOnNetworkCallToSnapshotMetadata()
throws Exception {
+ createTargetStorages();
CountDownLatch networkInvokeLatch = new CountDownLatch(1);
MessagingService messagingService = mock(MessagingService.class);
- when(messagingService.invoke(any(ClusterNode.class), any(),
anyLong())).then(invocation -> {
+ when(messagingService.invoke(any(ClusterNode.class),
any(SnapshotMetaRequest.class), anyLong())).then(invocation -> {
networkInvokeLatch.countDown();
return new CompletableFuture<>();
@@ -474,32 +493,58 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
- verify(partitionSnapshotStorage.partition()).abortRebalance();
+ verify(partitionSnapshotStorage.partition(), never()).startRebalance();
+ verify(partitionSnapshotStorage.partition(), never()).abortRebalance();
}
@Test
- void testCancelOnMiddleRebalance() {
- MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
- TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
+ void cancellationMakesJoinFinishIfHangingOnNetworkCallWhenGettingData()
throws Exception {
+ createTargetStorages();
- long expLastAppliedIndex = 100500L;
- long expLastAppliedTerm = 100L;
- RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+ CountDownLatch networkInvokeLatch = new CountDownLatch(1);
- List<RowId> rowIds = generateRowIds();
- List<UUID> txIds = generateTxIds();
+ MessagingService messagingService = mock(MessagingService.class);
- fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
- fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
+ returnSnapshotMetaWhenAskedForIt(messagingService);
+ when(messagingService.invoke(any(ClusterNode.class),
any(SnapshotMvDataRequest.class), anyLong())).then(invocation -> {
+ networkInvokeLatch.countDown();
- MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
- TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
+ return new CompletableFuture<>();
+ });
- assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION),
willCompleteSuccessfully());
- incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+ PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
+ snapshotId,
+ incomingMvTableStorage,
+ incomingTxStateTableStorage,
+ messagingService
+ );
+
+ SnapshotCopier snapshotCopier =
partitionSnapshotStorage.startToCopyFrom(
+ SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+ mock(SnapshotCopierOptions.class)
+ );
+
+ networkInvokeLatch.await(1, TimeUnit.SECONDS);
+
+ CompletableFuture<?> cancelAndJoinFuture = runAsync(() -> {
+ snapshotCopier.cancel();
+
+ snapshotCopier.join();
+ });
+
+ assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
+
+ verify(partitionSnapshotStorage.partition()).abortRebalance();
+ }
+
+ @Test
+ void testCancelOnMiddleRebalance() {
+ fillOriginalStorages();
+
+ createTargetStorages();
MessagingService messagingService =
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
- outgoingTxStatePartitionStorage, expLastAppliedIndex,
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+ outgoingTxStatePartitionStorage, rowIds, txIds);
PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
snapshotId,
@@ -547,27 +592,12 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
@Test
void testErrorInProcessOfRebalance() {
- MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
- TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
-
- long expLastAppliedIndex = 100500L;
- long expLastAppliedTerm = 100L;
- RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
-
- List<RowId> rowIds = generateRowIds();
- List<UUID> txIds = generateTxIds();
-
- fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
- fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
+ fillOriginalStorages();
- MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT));
- TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
-
- assertThat(incomingMvTableStorage.createMvPartition(TEST_PARTITION),
willCompleteSuccessfully());
- incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+ createTargetStorages();
MessagingService messagingService =
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
- outgoingTxStatePartitionStorage, expLastAppliedIndex,
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+ outgoingTxStatePartitionStorage, rowIds, txIds);
PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
snapshotId,
@@ -642,4 +672,60 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
List.of("old-learner")
);
}
+
+ @Test
+ void laggingSchemasPreventSnapshotInstallation() {
+ fillOriginalStorages();
+
+ createTargetStorages();
+
+ MessagingService messagingService = mock(MessagingService.class);
+
+ when(messagingService.invoke(eq(clusterNode),
any(SnapshotMetaRequest.class), anyLong()))
+ .thenReturn(completedFuture(snapshotMetaResponse(42)));
+
+ PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
+ snapshotId,
+ incomingMvTableStorage,
+ incomingTxStateTableStorage,
+ messagingService
+ );
+
+ SnapshotCopier snapshotCopier =
partitionSnapshotStorage.startToCopyFrom(
+ SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+ mock(SnapshotCopierOptions.class)
+ );
+
+ assertThat(runAsync(snapshotCopier::join), willSucceedIn(1,
TimeUnit.SECONDS));
+
+ assertEquals(RaftError.EBUSY.getNumber(), snapshotCopier.getCode());
+
+ verify(messagingService, never()).invoke(any(ClusterNode.class),
any(SnapshotMvDataRequest.class), anyLong());
+ verify(messagingService, never()).invoke(any(ClusterNode.class),
any(SnapshotTxDataRequest.class), anyLong());
+
+ verify(partitionSnapshotStorage.partition(), never()).startRebalance();
+ verify(partitionSnapshotStorage.partition(), never()).abortRebalance();
+
+ assertThatTargetStoragesAreEmpty(incomingMvTableStorage,
incomingTxStateTableStorage);
+ }
+
+ private static void assertThatTargetStoragesAreEmpty(
+ MvTableStorage incomingMvTableStorage,
+ TxStateTableStorage incomingTxStateTableStorage
+ ) {
+ MvPartitionStorage incomingMvPartitionStorage =
incomingMvTableStorage.getMvPartition(TEST_PARTITION);
+ TxStateStorage incomingTxStatePartitionStorage =
incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
+
+ assertEquals(0L, incomingMvPartitionStorage.lastAppliedIndex());
+ assertEquals(0L, incomingMvPartitionStorage.lastAppliedTerm());
+ assertArrayEquals(
+ null,
+ incomingMvPartitionStorage.committedGroupConfiguration()
+ );
+ assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedIndex());
+ assertEquals(0L, incomingTxStatePartitionStorage.lastAppliedTerm());
+
+
assertFalse(incomingMvPartitionStorage.scan(HybridTimestamp.MAX_VALUE).hasNext());
+ assertFalse(incomingTxStatePartitionStorage.scan().hasNext());
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index c4bf8a1b5a..74a19e9471 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
@@ -44,17 +45,22 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionAccess partitionAccess;
+ @Mock
+ private CatalogService catalogService;
+
private OutgoingSnapshot snapshot;
private final TableMessagesFactory messagesFactory = new
TableMessagesFactory();
private final PartitionKey partitionKey = new PartitionKey(1, 1);
+ private static final int REQUIRED_CATALOG_VERSION = 42;
+
@BeforeEach
void createTestInstance() {
when(partitionAccess.partitionKey()).thenReturn(partitionKey);
- snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+ snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess,
catalogService);
}
@Test
@@ -75,6 +81,8 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
List.of("learner1:3000")
));
+
when(catalogService.latestCatalogVersion()).thenReturn(REQUIRED_CATALOG_VERSION);
+
snapshot.freezeScopeUnderMvLock();
SnapshotMetaResponse response = getSnapshotMetaResponse();
@@ -85,6 +93,7 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
assertThat(response.meta().learnersList(), is(List.of("learner1:3000",
"learner2:3000")));
assertThat(response.meta().oldPeersList(), is(List.of("peer1:3000")));
assertThat(response.meta().oldLearnersList(),
is(List.of("learner1:3000")));
+ assertThat(response.meta().requiredCatalogVersion(),
is(REQUIRED_CATALOG_VERSION));
}
private SnapshotMetaResponse getSnapshotMetaResponse() {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 1e868a3190..4615eec80a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -61,6 +62,9 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionAccess partitionAccess;
+ @Mock
+ private CatalogService catalogService;
+
private OutgoingSnapshot snapshot;
private final TableMessagesFactory messagesFactory = new
TableMessagesFactory();
@@ -83,7 +87,7 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
void createTestInstance() {
when(partitionAccess.partitionKey()).thenReturn(partitionKey);
- snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+ snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess,
catalogService);
}
@BeforeEach
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index 82a598f4ca..c1ed183e0e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.concurrent.Executor;
+import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
@@ -60,6 +61,7 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
"",
mock(RaftOptions.class),
partitionAccess,
+ mock(CatalogService.class),
mock(SnapshotMeta.class),
mock(Executor.class)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index 3e25c9b1e1..998bfd41c2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -59,6 +60,9 @@ class OutgoingSnapshotTxDataStreamingTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionAccess partitionAccess;
+ @Mock
+ private CatalogService catalogService;
+
private OutgoingSnapshot snapshot;
private final TableMessagesFactory messagesFactory = new
TableMessagesFactory();
@@ -82,7 +86,7 @@ class OutgoingSnapshotTxDataStreamingTest extends
BaseIgniteAbstractTest {
lenient().when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
- snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess);
+ snapshot = new OutgoingSnapshot(UUID.randomUUID(), partitionAccess,
catalogService);
}
@Test
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
index e01f7c9a25..6b57d31250 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccess;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
@@ -44,6 +45,9 @@ class OutgoingSnapshotsManagerTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionAccess partitionAccess;
+ @Mock
+ private CatalogService catalogService;
+
private final PartitionKey partitionKey = new PartitionKey(1, 1);
@SuppressWarnings("EmptyTryBlock")
@@ -68,7 +72,7 @@ class OutgoingSnapshotsManagerTest extends
BaseIgniteAbstractTest {
when(partitionAccess.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
- OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(),
partitionAccess);
+ OutgoingSnapshot snapshot = new OutgoingSnapshot(UUID.randomUUID(),
partitionAccess, catalogService);
assertDoesNotThrow(() ->
manager.startOutgoingSnapshot(UUID.randomUUID(), snapshot));
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
index f29cae8d0a..054ebe7e7d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtilsTest.java
@@ -38,7 +38,7 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
List.of("peer1:3000"), List.of("learner1:3000")
);
- SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config);
+ SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, config,
42);
assertThat(meta.lastIncludedIndex(), is(100L));
assertThat(meta.lastIncludedTerm(), is(3L));
@@ -46,11 +46,12 @@ class SnapshotMetaUtilsTest extends BaseIgniteAbstractTest {
assertThat(meta.learnersList(), is(List.of("learner1:3000",
"learner2:3000")));
assertThat(meta.oldPeersList(), is(List.of("peer1:3000")));
assertThat(meta.oldLearnersList(), is(List.of("learner1:3000")));
+ assertThat(meta.requiredCatalogVersion(), is(42));
}
@Test
void doesNotIncludeOldConfigWhenItIsNotThere() {
- SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new
RaftGroupConfiguration(List.of(), List.of(), null, null));
+ SnapshotMeta meta = SnapshotMetaUtils.snapshotMetaAt(100, 3, new
RaftGroupConfiguration(List.of(), List.of(), null, null), 42);
assertThat(meta.oldPeersList(), is(nullValue()));
assertThat(meta.oldLearnersList(), is(nullValue()));