This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eeecdfd56e3 IGNITE-27460 Add schema compatibility validation for full
commands (#7598)
eeecdfd56e3 is described below
commit eeecdfd56e3acf753df7e0b11fca1df22b08647b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Feb 20 21:03:49 2026 +0400
IGNITE-27460 Add schema compatibility validation for full commands (#7598)
---
.../asserts/CompletableFutureAssert.java | 68 ++++-
.../rebalance/ItRebalanceDistributedTest.java | 4 +
.../partition/replicator/fixtures/Node.java | 41 +--
.../ItZonePartitionRaftListenerRecoveryTest.java | 2 +-
.../handlers/TxFinishReplicaRequestHandler.java | 19 +-
.../network/command/UpdateAllCommand.java | 14 +-
.../replicator/network/command/UpdateCommand.java | 12 +-
.../{UpdateCommand.java => UpdateCommandBase.java} | 28 +-
.../replicator/raft/ZonePartitionRaftListener.java | 2 +-
.../schemacompat/CompatValidationResult.java | 23 ++
.../schemacompat/SchemaCompatibilityValidator.java | 9 +-
.../PartitionReplicaLifecycleManagerTest.java | 2 +
.../internal/raft/server/RaftGroupOptions.java | 12 +
.../internal/raft/server/impl/JraftServerImpl.java | 3 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 38 ++-
.../apache/ignite/raft/jraft/error/RaftError.java | 8 +-
.../ignite/raft/jraft/option/NodeOptions.java | 13 +
.../jraft/option/PermissiveSafeTimeValidator.java} | 27 +-
.../jraft/option/SafeTimeValidationResult.java | 63 ++++
.../raft/jraft/option/SafeTimeValidator.java | 36 +++
.../ItPlacementDriverReplicaSideTest.java | 2 +
.../ignite/internal/replicator/ReplicaManager.java | 8 +
.../internal/replicator/ReplicaManagerTest.java | 2 +
.../runner/app/ItIgniteNodeRestartTest.java | 45 +--
...ockedSchemaSyncAndRaftCommandExecutionTest.java | 213 +++++++++++++
...eSchemaForwardCompatibilityConsistencyTest.java | 29 ++
...tSchemaForwardCompatibilityConsistencyTest.java | 330 +++++++++++++++++++++
...eSchemaForwardCompatibilityConsistencyTest.java | 29 ++
.../org/apache/ignite/internal/app/IgniteImpl.java | 21 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 +
.../internal/table/distributed/TableManager.java | 8 +-
.../raft/PartitionSafeTimeValidator.java | 94 ++++++
.../replicator/PartitionReplicaListener.java | 25 +-
.../schema/CheckCatalogVersionOnActionRequest.java | 23 +-
.../schema/CheckCatalogVersionOnAppendEntries.java | 15 +-
...onSufficiency.java => MetadataSufficiency.java} | 8 +-
.../distributed/TableManagerRecoveryTest.java | 4 +
.../table/distributed/TableManagerTest.java | 2 +
.../replication/PartitionReplicaListenerTest.java | 66 +++++
.../CheckCatalogVersionOnActionRequestTest.java | 6 +-
...iencyTest.java => MetadataSufficiencyTest.java} | 10 +-
.../CompoundValidationSchemasSource.java | 55 ++++
.../apache/ignite/distributed/ItTxTestCluster.java | 17 +-
.../table/impl/DummyInternalTableImpl.java | 11 +-
44 files changed, 1267 insertions(+), 182 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
index f78ec4ccd66..191297512ea 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
+import org.apache.ignite.internal.util.ExceptionUtils;
/**
* Assertions related to {@link CompletableFuture}.
@@ -34,7 +36,7 @@ public class CompletableFutureAssert {
* Asserts that the given future completes with an exception being an
instance of the given class and returns
* that exception for further examination.
*
- * <p>Unlike {@link
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrowFast(Class)},
+ * <p>Unlike {@link
CompletableFutureExceptionMatcher#willThrowFast(Class)},
* this method allows to examine the actual exception thrown further in
the test.
*
* @param future Future to work on.
@@ -54,7 +56,7 @@ public class CompletableFutureAssert {
* that exception for further examination.
*
* <p>Unlike
- * {@link
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrow(Class,
int, TimeUnit)},
+ * {@link CompletableFutureExceptionMatcher#willThrow(Class, int,
TimeUnit)},
* this method allows to examine the actual exception thrown further in
the test.
*
* @param future Future to work on.
@@ -71,7 +73,7 @@ public class CompletableFutureAssert {
* that exception for further examination.
*
* <p>Unlike
- * {@link
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrow(Class,
int, TimeUnit)},
+ * {@link CompletableFutureExceptionMatcher#willThrow(Class, int,
TimeUnit)},
* this method allows to examine the actual exception thrown further in
the test.
*
* @param future Future to work on.
@@ -117,6 +119,66 @@ public class CompletableFutureAssert {
+ ", but it completed normally with result " + normalResult);
}
+ /**
+ * Asserts that the given future completes with an exception having a
cause (transitively) which is an instance of the given class
+ * (in time) and returns that exception for further examination.
+ *
+ * <p>Unlike
+ * {@link CompletableFutureExceptionMatcher#willThrow(Class, int,
TimeUnit)},
+ * this method allows to examine the actual exception thrown further in
the test.
+ *
+ * @param future Future to work on.
+ * @param expectedCauseClass Expected class of a cause in the exception's cause chain.
+ * @return Matched exception.
+ */
+ public static Throwable assertWillThrowCausedBy(
+ CompletableFuture<?> future,
+ Class<?> expectedCauseClass
+ ) {
+ return assertWillThrowCausedBy(future, expectedCauseClass, 10,
SECONDS);
+ }
+
+ /**
+ * Asserts that the given future completes with an exception having a
cause (transitively) which is an instance of the given class
+ * (in time) and returns that exception for further examination.
+ *
+ * <p>Unlike
+ * {@link CompletableFutureExceptionMatcher#willThrow(Class, int,
TimeUnit)},
+ * this method allows to examine the actual exception thrown further in
the test.
+ *
+ * @param future Future to work on.
+ * @param expectedCauseClass Expected class of a cause in the exception's cause chain.
+ * @param timeout Duration to wait for future completion.
+ * @param timeUnit Time unit of the duration.
+ * @return Matched exception.
+ */
+ public static Throwable assertWillThrowCausedBy(
+ CompletableFuture<?> future,
+ Class<?> expectedCauseClass,
+ long timeout,
+ TimeUnit timeUnit
+ ) {
+ Object normalResult;
+
+ try {
+ normalResult = future.get(timeout, timeUnit);
+ } catch (Throwable e) {
+ if (ExceptionUtils.hasCause(e, expectedCauseClass)) {
+ // The user actually expects this exception, let's return it.
+ return e;
+ }
+
+ return fail(
+ "Expected the future to be completed with an exception
with a cause of class "
+ + expectedCauseClass + " , but got something
different",
+ e
+ );
+ }
+
+ return fail("Expected the future to be completed with an exception
with a cause of class " + expectedCauseClass
+ + ", but it completed normally with result " + normalResult);
+ }
+
private static Throwable unwrapThrowable(Throwable e) {
while (e instanceof ExecutionException || e instanceof
CompletionException) {
e = e.getCause();
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index ff04b0d95ac..37a200bddd0 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -180,6 +180,7 @@ import
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
@@ -260,6 +261,7 @@ import
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -1481,6 +1483,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
partitionIdleSafeTimePropagationPeriodMsSupplier,
new NoOpFailureManager(),
new
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+ new PermissiveSafeTimeValidator(),
topologyAwareRaftGroupServiceFactory,
raftManager,
partitionRaftConfigurer,
@@ -1581,6 +1584,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
sharedTxStateStorage,
metaStorageManager,
schemaManager,
+ new CatalogValidationSchemasSource(catalogManager,
schemaManager),
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 3a0edc1930a..f0306d79b66 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -124,6 +124,8 @@ import
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -173,6 +175,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -639,6 +642,25 @@ public class Node {
volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-"
+ name));
+ schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+ LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
+
+ schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker,
delayDurationMsSupplier);
+
+ catalogManager = new CatalogManagerImpl(
+ new UpdateLogImpl(metaStorageManager, failureManager),
+ clockService,
+ failureManager,
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
+ );
+
+ schemaManager = new SchemaManager(registry, catalogManager);
+
+ ValidationSchemasSource validationSchemasSource = new
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
replicaManager = new ReplicaManager(
name,
clusterService,
@@ -651,6 +673,7 @@ public class Node {
partitionIdleSafeTimePropagationPeriodMsSupplier,
new NoOpFailureManager(),
new
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+ new PartitionSafeTimeValidator(validationSchemasSource,
catalogManager, schemaSyncService),
topologyAwareRaftGroupServiceFactory,
raftManager,
partitionRaftConfigurer,
@@ -661,28 +684,11 @@ public class Node {
threadPoolsManager.commonScheduler()
);
- LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
-
- catalogManager = new CatalogManagerImpl(
- new UpdateLogImpl(metaStorageManager, failureManager),
- clockService,
- failureManager,
- delayDurationMsSupplier,
- PartitionCountProvider.defaultPartitionCountProvider()
- );
-
raftManager.appendEntriesRequestInterceptor(new
CheckCatalogVersionOnAppendEntries(catalogManager));
raftManager.actionRequestInterceptor(new
CheckCatalogVersionOnActionRequest(catalogManager));
indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
metaStorageManager);
- schemaManager = new SchemaManager(registry, catalogManager);
-
- schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
-
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
- schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker,
delayDurationMsSupplier);
-
MinimumRequiredTimeCollectorService minTimeCollectorService = new
MinimumRequiredTimeCollectorServiceImpl();
catalogCompactionRunner = new CatalogCompactionRunner(
@@ -783,6 +789,7 @@ public class Node {
sharedTxStateStorage,
metaStorageManager,
schemaManager,
+ validationSchemasSource,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index dc21b2be0a4..57c85c38f28 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -573,7 +573,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
.txCoordinatorId(id)
.requiredCatalogVersion(0)
.leaseStartTime(0L)
- .safeTime(now)
+ .safeTime(clock.now())
.build();
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 57ac98eb864..9049f871372 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -232,24 +232,7 @@ public class TxFinishReplicaRequestHandler {
private static void
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult,
TransactionResult txResult) {
if (!validationResult.isSuccessful()) {
- if (validationResult.isTableDropped()) {
- throw new IncompatibleSchemaAbortException(
- format("Commit failed because a table was already
dropped [table={}]", validationResult.failedTableName()),
- txResult
- );
- } else {
- throw new IncompatibleSchemaAbortException(
- format(
- "Commit failed because schema is not
forward-compatible "
- + "[fromSchemaVersion={},
toSchemaVersion={}, table={}, details={}]",
- validationResult.fromSchemaVersion(),
- validationResult.toSchemaVersion(),
- validationResult.failedTableName(),
- validationResult.details()
- ),
- txResult
- );
- }
+ throw new
IncompatibleSchemaAbortException(validationResult.validationFailedMessage(),
txResult);
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
index 7757887e788..c37bf0ab0cb 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
@@ -20,14 +20,10 @@ package
org.apache.ignite.internal.partition.replicator.network.command;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.PropertyName;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
-import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.util.CollectionUtils;
-import org.jetbrains.annotations.Nullable;
/**
* State machine command for updating a batch of entries.
@@ -35,17 +31,9 @@ import org.jetbrains.annotations.Nullable;
* <p>This command is replaced with {@link UpdateAllCommandV2} and only exists
in the source code for backward compatibility.</p>
*/
@Transferable(PartitionReplicationMessageGroup.Commands.UPDATE_ALL_V1)
-public interface UpdateAllCommand extends PartitionCommand {
- @PropertyName("tablePartitionId")
- ReplicationGroupIdMessage commitPartitionId();
-
+public interface UpdateAllCommand extends UpdateCommandBase {
Map<UUID, TimedBinaryRowMessage> messageRowsToUpdate();
- UUID txCoordinatorId();
-
- /** Lease start time, hybrid timestamp as long, see {@link
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
- @Nullable Long leaseStartTime();
-
/**
* Returns the timestamps of the last committed entries for each row.
*/
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
index 56c7eb50250..831b9866a0d 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
@@ -19,10 +19,8 @@ package
org.apache.ignite.internal.partition.replicator.network.command;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.PropertyName;
import org.apache.ignite.internal.network.annotations.Transferable;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
-import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.Nullable;
@@ -32,19 +30,11 @@ import org.jetbrains.annotations.Nullable;
* <p>This command is replaced with {@link UpdateCommandV2} and only exists in
the source code for backward compatibility.</p>
*/
@Transferable(Commands.UPDATE_V1)
-public interface UpdateCommand extends PartitionCommand {
- @PropertyName("tablePartitionId")
- ReplicationGroupIdMessage commitPartitionId();
-
+public interface UpdateCommand extends UpdateCommandBase {
UUID rowUuid();
@Nullable TimedBinaryRowMessage messageRowToUpdate();
- UUID txCoordinatorId();
-
- /** Lease start time, hybrid timestamp as long, see {@link
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
- @Nullable Long leaseStartTime();
-
/** Returns the row to update or {@code null} if the row should be
removed. */
default @Nullable BinaryRow rowToUpdate() {
TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
similarity index 56%
copy from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
copy to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
index 56c7eb50250..4b3b47de04e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
@@ -20,42 +20,18 @@ package
org.apache.ignite.internal.partition.replicator.network.command;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.annotations.PropertyName;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
-import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.Nullable;
/**
- * State machine command to update a row specified by a row ID.
- *
- * <p>This command is replaced with {@link UpdateCommandV2} and only exists in
the source code for backward compatibility.</p>
+ * Base for commands executing updates to the storage.
*/
-@Transferable(Commands.UPDATE_V1)
-public interface UpdateCommand extends PartitionCommand {
+public interface UpdateCommandBase extends PartitionCommand {
@PropertyName("tablePartitionId")
ReplicationGroupIdMessage commitPartitionId();
- UUID rowUuid();
-
- @Nullable TimedBinaryRowMessage messageRowToUpdate();
-
UUID txCoordinatorId();
/** Lease start time, hybrid timestamp as long, see {@link
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
@Nullable Long leaseStartTime();
-
- /** Returns the row to update or {@code null} if the row should be
removed. */
- default @Nullable BinaryRow rowToUpdate() {
- TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
-
- return tsRoMsg == null ? null : tsRoMsg.binaryRow();
- }
-
- /** Returns the timestamp of the last committed entry. */
- default @Nullable HybridTimestamp lastCommitTimestamp() {
- TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
-
- return tsRoMsg == null ? null : tsRoMsg.timestamp();
- }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index c5dd6d3aa74..498e61a70a1 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -282,7 +282,7 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
@Nullable HybridTimestamp safeTimestamp
) {
if (tableProcessors.isEmpty()) {
- return new CommandResult(null, lastAppliedIndex < commandIndex);
+ return new CommandResult(null, commandIndex > lastAppliedIndex);
}
boolean wasApplied = false;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
index 2a8a3af7d21..2c221eb674e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.partition.replicator.schemacompat;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
import org.jetbrains.annotations.Nullable;
/**
@@ -154,4 +156,25 @@ public class CompatValidationResult {
return details;
}
+
+ /**
+ * Returns the error message corresponding to the validation failure. Should only
be called for a failed validation result; otherwise, an
+ * assertion error may be thrown.
+ */
+ public String validationFailedMessage() {
+ assert !isSuccessful() : "Should not be called on a successful result";
+
+ if (isTableDropped()) {
+ return format("Commit failed because a table was already dropped
[table={}]", failedTableName());
+ } else {
+ return format(
+ "Commit failed because schema is not forward-compatible "
+ + "[fromSchemaVersion={}, toSchemaVersion={},
table={}, details={}]",
+ fromSchemaVersion(),
+ toSchemaVersion(),
+ failedTableName(),
+ details()
+ );
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
index 31cddfe735b..4dedbcb3541 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
@@ -65,7 +65,7 @@ public class SchemaCompatibilityValidator {
}
/**
- * Performs commit validation. That is, checks that each table enlisted in
the tranasction still exists at the commit timestamp,
+ * Performs commit validation. That is, checks that each table enlisted in
the transaction still exists at the commit timestamp,
* and that the initial schema of the table (identified by the begin
timestamp) is forward-compatible with the commit schema
* (identified by the commit timestamp).
*
@@ -84,7 +84,8 @@ public class SchemaCompatibilityValidator {
// Using compareTo() instead of after()/begin() because the latter
methods take clock skew into account
// which only makes sense when comparing 'unrelated' timestamps.
beginTs and commitTs have a causal relationship,
// so we don't need to account for clock skew.
- assert commitTimestamp.compareTo(beginTimestamp) > 0;
+ assert commitTimestamp.compareTo(beginTimestamp) > 0
+ : "Commit timestamp " + commitTimestamp + " is not after begin
timestamp " + beginTimestamp;
return schemaSyncService.waitForMetadataCompleteness(commitTimestamp)
.thenApply(ignored -> validateCommit(enlistedTableIds,
commitTimestamp, beginTimestamp));
@@ -132,7 +133,7 @@ public class SchemaCompatibilityValidator {
) {
List<FullTableSchema> tableSchemas =
validationSchemasSource.tableSchemaVersionsBetween(tableId, beginTimestamp,
commitTimestamp);
- assert !tableSchemas.isEmpty();
+ assert !tableSchemas.isEmpty() : "No table schemas for table " +
tableId + " between " + beginTimestamp + " and " + commitTimestamp;
for (int i = 0; i < tableSchemas.size() - 1; i++) {
FullTableSchema oldSchema = tableSchemas.get(i);
@@ -243,7 +244,7 @@ public class SchemaCompatibilityValidator {
CatalogTableDescriptor tableAtBeginTs =
catalogService.activeCatalog(beginTs.longValue()).table(tableId);
CatalogTableDescriptor tableAtOpTs =
catalogService.activeCatalog(operationTimestamp.longValue()).table(tableId);
- assert tableAtBeginTs != null : "No table " + tableId + " at ts " +
tableAtBeginTs;
+ assert tableAtBeginTs != null : "No table " + tableId + " at ts " +
beginTs;
if (tableAtOpTs == null) {
throw
IncompatibleSchemaVersionException.tableDropped(tableAtBeginTs.name());
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 12c2e7bbcc8..108b6aed850 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -123,6 +123,7 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.worker.ThreadAssertions;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -244,6 +245,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
() -> Long.MAX_VALUE,
failureManager,
null,
+ new PermissiveSafeTimeValidator(),
topologyAwareRaftGroupServiceFactory,
raftManager,
RaftGroupOptionsConfigurer.EMPTY,
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index dddcbb3bc45..11dd0145497 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -23,6 +23,7 @@ import
org.apache.ignite.internal.raft.storage.LogStorageManager;
import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,6 +65,8 @@ public class RaftGroupOptions {
*/
private boolean isSystemGroup = false;
+ private @Nullable SafeTimeValidator safeTimeValidator;
+
/**
* Gets a system group flag.
*
@@ -267,4 +270,13 @@ public class RaftGroupOptions {
public int maxClockSkew() {
return maxClockSkewMs;
}
+
+ public @Nullable SafeTimeValidator safeTimeValidator() {
+ return safeTimeValidator;
+ }
+
+ public RaftGroupOptions safeTimeValidator(@Nullable SafeTimeValidator
safeTimeValidator) {
+ this.safeTimeValidator = safeTimeValidator;
+ return this;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index b03cf6b25d9..15dcbf16017 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -508,6 +508,9 @@ public class JraftServerImpl implements RaftServer {
if (groupOptions.commandsMarshaller() != null) {
nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
}
+ if (groupOptions.safeTimeValidator() != null) {
+
nodeOptions.setSafeTimeValidator(groupOptions.safeTimeValidator());
+ }
nodeOptions.setFsm(
new DelegatingStateMachine(nodeId, lsnr, nodeOptions,
failureManager));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 99ffb0ee2aa..d373a55f6c5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -101,6 +101,8 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidationResult;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder;
import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
@@ -1743,6 +1745,14 @@ public class NodeImpl implements Node, RaftServerService
{
}
continue;
}
+
+ // To prevent safe timestamp values from becoming stale, we
must assign them under a valid leader lock.
+ safeTs = tryAssignSafeTimestamp(task, safeTs);
+
+ if (rejectCommandIfSafeTimeIsNotAcceptable(safeTs, task)) {
+ continue;
+ }
+
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(),
task.done)) {
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new
Status(RaftError.EINTERNAL, "Fail to append task."));
@@ -1750,9 +1760,6 @@ public class NodeImpl implements Node, RaftServerService {
continue;
}
- // To prevent safe timestamp values from becoming stale, we
must assign them under a valid leader lock.
- safeTs = tryAssignSafeTimestamp(task, safeTs);
-
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
@@ -1788,6 +1795,31 @@ public class NodeImpl implements Node, RaftServerService
{
return safeTs;
}
+ private boolean rejectCommandIfSafeTimeIsNotAcceptable(@Nullable
HybridTimestamp safeTs, LogEntryAndClosure task) {
+ if (safeTs != null && task.done instanceof
SafeTimeAwareCommandClosure) {
+ SafeTimeAwareCommandClosure closure =
(SafeTimeAwareCommandClosure) task.done;
+
+ SafeTimeValidator safeTimeValidator =
this.getOptions().getSafeTimeValidator();
+ if (safeTimeValidator.shouldValidateFor(closure.command())) {
+ SafeTimeValidationResult validationResult =
safeTimeValidator.validate(groupId, closure.command(), safeTs);
+
+ if (!validationResult.valid()) {
+ RaftError raftError = validationResult.shouldRetry() ?
RaftError.EBUSY : RaftError.EREJECTED_BY_VALIDATOR;
+ Utils.runClosureInThread(
+ this.getOptions().getCommonExecutor(),
+ task.done,
+ new Status(raftError,
validationResult.errorMessage())
+ );
+ task.reset();
+
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
/**
* Builds a status for 'Cannot apply because this node is not a leader'
situation.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
index 60dc70f2185..ddfca461f4a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
@@ -240,7 +240,13 @@ public enum RaftError {
/**
* Permission denied
*/
- EACCES(1016);
+ EACCES(1016),
+
+ /**
+ * Rejected by a user-supplied validator. If an ActionRequest gets such a
response, the corresponding command has been neither saved
+ * to the log nor applied on any node; also, the Raft client should not retry
the same ActionRequest, as it will never succeed.
+ */
+ EREJECTED_BY_VALIDATOR(20000);
private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new
HashMap<>();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 7ae720af618..2a49a10d71b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -314,6 +314,8 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
Utils.MAX_COLLECTOR_SIZE_PER_SERVER
);
+ private SafeTimeValidator safeTimeValidator = new
PermissiveSafeTimeValidator();
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
}
@@ -794,6 +796,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setNodeManager(this.getNodeManager());
nodeOptions.setAppendEntriesByteBufferCollectorPool(appendEntriesByteBufferCollectorPool);
nodeOptions.setRaftMetrics(raftMetrics);
+ nodeOptions.setSafeTimeValidator(this.getSafeTimeValidator());
return nodeOptions;
}
@@ -860,4 +863,14 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
public void
setAppendEntriesByteBufferCollectorPool(ByteBufferCollectorPool
appendEntriesByteBufferCollectorPool) {
this.appendEntriesByteBufferCollectorPool =
appendEntriesByteBufferCollectorPool;
}
+
+ /** Gets {@link SafeTimeValidator} to be used by the node. */
+ public SafeTimeValidator getSafeTimeValidator() {
+ return safeTimeValidator;
+ }
+
+ /** Sets {@link SafeTimeValidator} to be used by the node. */
+ public void setSafeTimeValidator(SafeTimeValidator safeTimeValidator) {
+ this.safeTimeValidator = safeTimeValidator;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
similarity index 50%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
copy to
modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
index 442af48311c..ce11f6d1ffb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
@@ -14,27 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ignite.raft.jraft.option;
-package org.apache.ignite.internal.table.distributed.schema;
-
-import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.WriteCommand;
/**
- * Logic that allows to determine whether the logcal Catalog version is
sufficient.
+ * Validator that never rejects anything.
*/
-public class CatalogVersionSufficiency {
- private CatalogVersionSufficiency() {
- // Deny instantiation.
+public class PermissiveSafeTimeValidator implements SafeTimeValidator {
+ @Override
+ public boolean shouldValidateFor(WriteCommand command) {
+ return false;
}
- /**
- * Determines whether the local Catalog version is sufficient.
- *
- * @param requiredCatalogVersion Minimal catalog version that is required
to present.
- * @param catalogService Catalog service.
- * @return {@code true} iff the local Catalog version is sufficient.
- */
- public static boolean isMetadataAvailableFor(int requiredCatalogVersion,
CatalogService catalogService) {
- return
catalogService.catalogReadyFuture(requiredCatalogVersion).isDone();
+ @Override
+ public SafeTimeValidationResult validate(String groupId, WriteCommand
command, HybridTimestamp safeTime) {
+ return SafeTimeValidationResult.forValid();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
new file mode 100644
index 00000000000..633b14a0139
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.option;
+
+/** Result of safe time validation by {@link SafeTimeValidator}. */
+public class SafeTimeValidationResult {
+ private static final SafeTimeValidationResult VALID = new
SafeTimeValidationResult(true, false, "");
+
+ private final boolean valid;
+ private final boolean shouldRetry;
+ private final String errorMessage;
+
+ /** Returns valid result. */
+ public static SafeTimeValidationResult forValid() {
+ return VALID;
+ }
+
+ /** Returns result that requires a retry. */
+ public static SafeTimeValidationResult forRetry(String errorMessage) {
+ return new SafeTimeValidationResult(false, true, errorMessage);
+ }
+
+ /** Returns result saying that the ActionRequest is rejected for good.
Such a request should never be retried by the Raft client. */
+ public static SafeTimeValidationResult forRejected(String errorMessage) {
+ return new SafeTimeValidationResult(false, false, errorMessage);
+ }
+
+ private SafeTimeValidationResult(boolean valid, boolean shouldRetry,
String errorMessage) {
+ this.valid = valid;
+ this.shouldRetry = shouldRetry;
+ this.errorMessage = errorMessage;
+ }
+
+ /** Returns whether the safe time and the corresponding command are valid.
*/
+ public boolean valid() {
+ return valid;
+ }
+
+ /** Returns whether the request should be retried or the request is
rejected for good. */
+ public boolean shouldRetry() {
+ return shouldRetry;
+ }
+
+ /** Returns error message going with the failed result. */
+ public String errorMessage() {
+ return errorMessage;
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
new file mode 100644
index 00000000000..74083d624b9
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.option;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.raft.jraft.error.RaftError;
+
+/**
+ * Allows implementing validation of safe time in Raft. If a command's safe
time is invalid, it gets rejected just before being added
+ * to the leader's log (and hence does not get replicated). A rejected command
can either be rejected for retry (and then the Raft client
+ * should try again later) or for good (in which case the Raft client should
not retry the operation and just communicate the error
+ * to the caller instead).
+ */
+public interface SafeTimeValidator {
+ /** Whether to validate safe time for the given command. */
+ boolean shouldValidateFor(WriteCommand command);
+
+ /** Validates the given safe time for the given command and returns the validation result. */
+ SafeTimeValidationResult validate(String groupId, WriteCommand command,
HybridTimestamp safeTime);
+}
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 4295fc11e2a..d2926be55aa 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -109,6 +109,7 @@ import
org.apache.ignite.internal.topology.TestLogicalTopologyService;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -235,6 +236,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
mock(FailureProcessor.class),
// TODO: IGNITE-22222 can't pass
ThreadLocalPartitionCommandsMarshaller there due to dependency loop
null,
+ new PermissiveSafeTimeValidator(),
topologyAwareRaftGroupServiceFactory,
raftManager,
partitionsConfigurer,
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 8538d99d6d3..72e9814750f 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -135,6 +135,7 @@ import
org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import org.jetbrains.annotations.VisibleForTesting;
@@ -200,6 +201,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Raft command marshaller for raft server endpoints starting. */
private final Marshaller raftCommandsMarshaller;
+ /** Raft safe time validator for partition groups. */
+ private final SafeTimeValidator partitionSafeTimeValidator;
+
/** Message handler for placement driver messages. */
private final NetworkMessageHandler placementDriverMessageHandler;
@@ -251,6 +255,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe
time propagation period in ms.
* @param failureProcessor Failure processor.
* @param raftCommandsMarshaller Command marshaller for raft groups
creation.
+ * @param partitionSafeTimeValidator Partition safe time validator.
* @param raftGroupServiceFactory A factory for raft-clients creation.
* @param raftManager The manager made up of songs and words to spite all
my troubles is not so bad at all.
* @param partitionRaftConfigurer Configurer of raft options on raft group
creation.
@@ -272,6 +277,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
LongSupplier idleSafeTimePropagationPeriodMsSupplier,
FailureProcessor failureProcessor,
@Nullable Marshaller raftCommandsMarshaller,
+ SafeTimeValidator partitionSafeTimeValidator,
TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
RaftManager raftManager,
RaftGroupOptionsConfigurer partitionRaftConfigurer,
@@ -293,6 +299,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
this.idleSafeTimePropagationPeriodMsSupplier =
idleSafeTimePropagationPeriodMsSupplier;
this.failureProcessor = failureProcessor;
this.raftCommandsMarshaller = raftCommandsMarshaller;
+ this.partitionSafeTimeValidator = partitionSafeTimeValidator;
this.raftGroupServiceFactory = raftGroupServiceFactory;
this.raftManager = raftManager;
this.partitionRaftConfigurer = partitionRaftConfigurer;
@@ -873,6 +880,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
raftGroupOptions.snapshotStorageFactory(snapshotFactory);
raftGroupOptions.maxClockSkew((int) clockService.maxClockSkewMillis());
raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
+ raftGroupOptions.safeTimeValidator(partitionSafeTimeValidator);
// TODO: The options will be used by Loza only. Consider rafactoring.
see https://issues.apache.org/jira/browse/IGNITE-18273
partitionRaftConfigurer.configure(raftGroupOptions);
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index f48729ecb0a..aac4bcb46ec 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -163,6 +164,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new NoOpFailureManager(),
marshaller,
+ new PermissiveSafeTimeValidator(),
raftGroupServiceFactory,
raftManager,
partitionsConfigurer,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 03bab008c47..beb61a0502a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -167,6 +167,8 @@ import
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
@@ -214,6 +216,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -599,6 +602,27 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
zoneId -> completedFuture(Set.of())
);
+ var schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+ LongSupplier delayDurationMsSupplier = () ->
TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
+
+ var schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+
+ var catalogManager = new CatalogManagerImpl(
+ new UpdateLogImpl(metaStorageMgr, failureProcessor),
+ clockService,
+ failureProcessor,
+ delayDurationMsSupplier,
+ PartitionCountProvider.defaultPartitionCountProvider()
+ );
+
+ var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
+
+ SchemaManager schemaManager = new SchemaManager(registry,
catalogManager);
+
+ ValidationSchemasSource validationSchemasSource = new
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
ReplicaManager replicaMgr = new ReplicaManager(
name,
clusterSvc,
@@ -611,6 +635,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
partitionIdleSafeTimePropagationPeriodMsSupplier,
failureProcessor,
new
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()),
+ new PartitionSafeTimeValidator(validationSchemasSource,
catalogManager, schemaSyncService),
topologyAwareRaftGroupServiceFactory,
raftMgr,
partitionRaftConfigurer,
@@ -670,8 +695,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
metricManager
);
- var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
-
DataStorageModules dataStorageModules = new DataStorageModules(
ServiceLoader.load(DataStorageModule.class)
);
@@ -696,20 +719,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
TransactionConfiguration txConfiguration = clusterConfigRegistry
.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
- LongSupplier delayDurationMsSupplier = () ->
TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
-
- var catalogManager = new CatalogManagerImpl(
- new UpdateLogImpl(metaStorageMgr, failureProcessor),
- clockService,
- failureProcessor,
- delayDurationMsSupplier,
- PartitionCountProvider.defaultPartitionCountProvider()
- );
-
var indexMetaStorage = new IndexMetaStorage(catalogManager,
lowWatermark, metaStorageMgr);
- SchemaManager schemaManager = new SchemaManager(registry,
catalogManager);
-
var dataNodesMock = dataNodesMockByNode.get(idx);
SystemDistributedConfiguration systemDistributedConfiguration =
@@ -737,11 +748,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
}
};
- var schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
-
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
- var schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
-
var sqlRef = new AtomicReference<IgniteSqlImpl>();
MinimumRequiredTimeCollectorService minTimeCollectorService = new
MinimumRequiredTimeCollectorServiceImpl();
@@ -799,6 +805,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
sharedTxStateStorage,
metaStorageMgr,
schemaManager,
+ validationSchemasSource,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
new file mode 100644
index 00000000000..e8206f5d756
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowCausedBy;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.schema.SchemaSyncInhibitor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItBlockedSchemaSyncAndRaftCommandExecutionTest extends
ClusterPerTestIntegrationTest {
+ private static final String ZONE_NAME = "TEST_ZONE";
+ private static final String TABLE_NAME = "TEST";
+
+ private LogInspector inspector;
+
+ @Override
+ protected int initialNodes() {
+ return 3;
+ }
+
+ @BeforeEach
+ void prepare() {
+ inspector =
LogInspector.create(CheckCatalogVersionOnAppendEntries.class, true);
+ }
+
+ @AfterEach
+ void cleanup() {
+ inspector.stop();
+ }
+
+ @Test
+ void operationBlockedOnSchemaSyncDoesNotPreventNodeStop() throws Exception
{
+ InhibitorAndFuture inhibitorAndFuture =
producePutHangingDueToSchemaSyncInLeaderStateMachine();
+
+ assertTimeoutPreemptively(Duration.ofSeconds(10), () ->
cluster.stopNode(0));
+
+ //noinspection ThrowableNotThrown
+ assertWillThrowCausedBy(inhibitorAndFuture.future,
NodeStoppingException.class);
+ }
+
+ private InhibitorAndFuture
producePutHangingDueToSchemaSyncInLeaderStateMachine()
+ throws InterruptedException {
+ Ignite node = cluster.node(0);
+
+ createTableWith1PartitionOnAllNodes(node);
+
+ cluster.transferLeadershipTo(0, cluster.solePartitionId(ZONE_NAME));
+
+ KeyValueView<Integer, String> kvView = node.tables()
+ .table(TABLE_NAME)
+ .keyValueView(Integer.class, String.class);
+
+ CompletableFuture<SchemaSyncInhibitor> inhibitorFuture =
startInhibitingSchemaSyncWhenUpdateCommandArrives();
+
+ CompletableFuture<Void> putFuture = kvView.putAsync(null, 1, "one");
+
+ waitTillCommandStartsExecutionAndBlocksOnSchemaSync();
+
+ assertThat(inhibitorFuture, willCompleteSuccessfully());
+
+ return new InhibitorAndFuture(inhibitorFuture.join(), putFuture);
+ }
+
+ private static void createTableWith1PartitionOnAllNodes(Ignite node) {
+ node.sql().executeScript(
+ "CREATE ZONE " + ZONE_NAME + " (REPLICAS 3, PARTITIONS 1)
STORAGE PROFILES ['"
+ + DEFAULT_AIPERSIST_PROFILE_NAME + "'];"
+ + "CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL
VARCHAR) ZONE " + ZONE_NAME + ";"
+ );
+ }
+
+ private CompletableFuture<SchemaSyncInhibitor>
startInhibitingSchemaSyncWhenUpdateCommandArrives() {
+ AtomicBoolean startedInhibiting = new AtomicBoolean();
+ CompletableFuture<SchemaSyncInhibitor> future = new
CompletableFuture<>();
+
+ for (Ignite node : cluster.nodes()) {
+ IgniteImpl igniteImpl = unwrapIgniteImpl(node);
+
+ igniteImpl.dropMessages((recipientName, message) -> {
+ if (message instanceof WriteActionRequest) {
+ WriteActionRequest actionRequest = (WriteActionRequest)
message;
+
+ if (PartitionGroupId.matchesString(actionRequest.groupId())
+ && actionRequest.deserializedCommand() instanceof
UpdateCommand
+ && startedInhibiting.compareAndSet(false, true)) {
+ SchemaSyncInhibitor inhibitor = new
SchemaSyncInhibitor(igniteImpl);
+ inhibitor.startInhibit();
+
+ // Making sure that commitTs (for which we take
partition safe time) will be at least DelayDuration ahead
+ // of Metastorage safe time, so during schema sync
we'll hang until inhibition is over.
+
waitForAllSafeTimesToReach(igniteImpl.clock().current().tick(), igniteImpl);
+
+ future.complete(inhibitor);
+ }
+ }
+
+ return false;
+ });
+ }
+
+ return future;
+ }
+
+ private void waitForAllSafeTimesToReach(HybridTimestamp current, Ignite
nodeToWaitSafeTime) {
+ ZonePartitionResources zonePartitionResources =
unwrapIgniteImpl(nodeToWaitSafeTime)
+ .partitionReplicaLifecycleManager()
+ .zonePartitionResources(cluster.solePartitionId(ZONE_NAME));
+
+ try {
+ zonePartitionResources.safeTimeTracker().waitFor(current).get(10,
SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void waitTillCommandStartsExecutionAndBlocksOnSchemaSync()
throws InterruptedException {
+ // Current implementation doesn't actually block any threads, but we
still give it a chance to get stuck if the implementation
+ // gets changed.
+ Thread.sleep(1000);
+ }
+
+ @Test
+ void operationBlockedBySchemaHangOnLeaderSucceedsOnSchemaCaughtUp() throws
Exception {
+ InhibitorAndFuture inhibitorAndFuture =
producePutHangingDueToSchemaSyncInLeaderStateMachine();
+
+ assertThat(inhibitorAndFuture.future, willTimeoutIn(1, SECONDS));
+
+ inhibitorAndFuture.inhibitor.stopInhibit();
+
+ assertThat(inhibitorAndFuture.future, willCompleteSuccessfully());
+
+ waitTillStoragesOnAllNodesHaveOneRow();
+ }
+
+ private void waitTillStoragesOnAllNodesHaveOneRow() {
+ for (Ignite node : cluster.nodes()) {
+ waitTillStorageHasOneNode(node);
+ }
+ }
+
+ private static void waitTillStorageHasOneNode(Ignite node) {
+ TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
+ MvPartitionStorage partitionStorage =
table.internalTable().storage().getMvPartition(0);
+ assertThat(partitionStorage, is(notNullValue()));
+
+ bypassingThreadAssertions(() ->
await().until(partitionStorage::estimatedSize, is(1L)));
+ }
+
+ private static class InhibitorAndFuture {
+ private final SchemaSyncInhibitor inhibitor;
+ private final CompletableFuture<Void> future;
+
+ private InhibitorAndFuture(SchemaSyncInhibitor inhibitor,
CompletableFuture<Void> future) {
+ this.inhibitor = inhibitor;
+ this.future = future;
+ }
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..e26dc426364
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Runs the scenarios inherited from {@link ItSchemaForwardCompatibilityConsistencyTest} with {@link #initialNodes()} equal to 3. */
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMultiNodeSchemaForwardCompatibilityConsistencyTest extends
ItSchemaForwardCompatibilityConsistencyTest {
+ @Override
+ protected int initialNodes() {
+ // Also used by the parent class as the replica count of the test zone.
+ return 3;
+ }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..3628c93c614
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IncompatibleSchemaException;
+import org.apache.ignite.tx.Transaction;
+import org.jspecify.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Checks that rows written concurrently with a forward-incompatible schema change (a {@code DROP COLUMN}) are stored with a
+ * schema version that agrees with their commit timestamp: version 1 strictly before the change activation timestamp, version 2
+ * starting from it.
+ *
+ * <p>Subclasses supply {@link #initialNodes()}; its value is also used as the replica count of the test zone.
+ */
+@ExtendWith(ExecutorServiceExtension.class)
+abstract class ItSchemaForwardCompatibilityConsistencyTest extends ClusterPerTestIntegrationTest {
+    private static final String ZONE_NAME = "TEST_ZONE";
+
+    /** Node through which all SQL statements and puts of the test are issued. */
+    private Ignite node0;
+
+    /** Executor on which the background writer runs. */
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @Override
+    protected abstract int initialNodes();
+
+    /** Remembers node 0 and creates the test zone with as many replicas as {@link #initialNodes()} returns. */
+    @BeforeEach
+    void prepare() {
+        node0 = cluster.node(0);
+
+        node0.sql()
+                .execute(
+                        "CREATE ZONE " + ZONE_NAME + " (REPLICAS " + initialNodes() + ") STORAGE PROFILES ['"
+                                + DEFAULT_AIPERSIST_PROFILE_NAME + "']"
+                )
+                .close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(SinglePartitionOperation.class)
+    void forwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(SinglePartitionOperation operation) throws Exception {
+        // The scenario is a race between writes and a schema change, so it is repeated (each time on a fresh table).
+        for (int attempt = 0; attempt < 20; attempt++) {
+            testForwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(operation, attempt);
+        }
+    }
+
+    /**
+     * Runs one attempt of the scenario using puts made without an explicit transaction.
+     *
+     * <p>Any exception thrown by such a put fails the background writer (and hence the test).
+     */
+    private void testForwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(SinglePartitionOperation operation, int attempt)
+            throws InterruptedException {
+        String tableName = "TABLE" + attempt;
+
+        RowWriter implicitPut = value -> {
+            makeImplicitPut(value, tableName, operation);
+
+            return true;
+        };
+
+        writeDuringSchemaChangeAndCheckConsistency(tableName, implicitPut, implicitPut);
+    }
+
+    @ParameterizedTest
+    @EnumSource(SinglePartitionOperation.class)
+    void forwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(SinglePartitionOperation operation) throws Exception {
+        // The scenario is a race between writes and a schema change, so it is repeated (each time on a fresh table).
+        for (int attempt = 0; attempt < 10; attempt++) {
+            testForwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(operation, attempt);
+        }
+    }
+
+    /**
+     * Runs one attempt of the scenario using puts made in explicit transactions.
+     *
+     * <p>The first put must succeed; for the following ones, a failure caused by {@link IncompatibleSchemaException} is tolerated
+     * (the row is simply not counted), any other failure fails the background writer.
+     */
+    private void testForwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(SinglePartitionOperation operation, int attempt)
+            throws InterruptedException {
+        String tableName = "TABLE" + attempt;
+
+        RowWriter firstPut = value -> {
+            makeExplicitPut(value, tableName, operation);
+
+            return true;
+        };
+
+        RowWriter subsequentPut = value -> makeExplicitPutToleratingIncompatibleSchema(value, tableName, operation);
+
+        writeDuringSchemaChangeAndCheckConsistency(tableName, firstPut, subsequentPut);
+    }
+
+    /**
+     * Common body of both scenarios: creates the table, starts a background writer, applies the forward-incompatible schema change
+     * while the writer is running, stops the writer and validates what has been stored on every node.
+     *
+     * @param tableName Name of the table to create and write to.
+     * @param firstPut Writes the row with value 0; the schema change is applied only after this write has been attempted.
+     * @param subsequentPut Writes rows with values 1, 2, ... until the writer is told to stop.
+     * @throws InterruptedException If interrupted while waiting for the first write.
+     */
+    private void writeDuringSchemaChangeAndCheckConsistency(String tableName, RowWriter firstPut, RowWriter subsequentPut)
+            throws InterruptedException {
+        createTable(tableName);
+
+        CountDownLatch startedPutting = new CountDownLatch(1);
+        AtomicBoolean performWrites = new AtomicBoolean(true);
+        AtomicInteger rowsAdded = new AtomicInteger();
+
+        CompletableFuture<Void> putterFuture = CompletableFuture.runAsync(() -> {
+            try {
+                if (firstPut.put(0)) {
+                    rowsAdded.incrementAndGet();
+                }
+            } finally {
+                // Released even on failure so that the main thread does not wait for the whole timeout.
+                startedPutting.countDown();
+            }
+
+            for (int i = 1; performWrites.get(); i++) {
+                if (subsequentPut.put(i)) {
+                    rowsAdded.incrementAndGet();
+                }
+            }
+        }, executor);
+
+        try {
+            assertTrue(startedPutting.await(10, SECONDS));
+            applyForwardIncompatibleSchemaChange(tableName);
+        } finally {
+            // Always stop the writer: otherwise a failed assertion or a failed ALTER TABLE would leave it looping forever.
+            performWrites.set(false);
+        }
+
+        assertThat(putterFuture, willCompleteSuccessfully());
+
+        // Both values are final once the writer has finished, so they are computed once for all nodes.
+        int expectedRowCount = rowsAdded.get();
+        HybridTimestamp activationTs = lastSchemaChangeActivationTs(tableName);
+
+        for (Ignite node : cluster.nodes()) {
+            List<ReadResult> readResults = readRowsFromTable(node, tableName, expectedRowCount);
+
+            assertThatSchemaVersionIsConsistentWithCommitTs(readResults, activationTs);
+        }
+    }
+
+    /** Creates a table with an {@code id} key column, a {@code val} column and a column that is dropped later by the test. */
+    private void createTable(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate(
+                    "CREATE TABLE " + tableName + " (id INT PRIMARY KEY, val INT, column_to_drop INT) ZONE " + ZONE_NAME,
+                    session
+            );
+        });
+    }
+
+    /** Drops {@code column_to_drop} from the table; this is the forward-incompatible change the test is about. */
+    private void applyForwardIncompatibleSchemaChange(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("ALTER TABLE " + tableName + " DROP COLUMN column_to_drop", session);
+        });
+    }
+
+    /**
+     * Returns the time of the catalog that is active on node 0 right now, after asserting that in this catalog the table
+     * is already at schema version 2.
+     */
+    private HybridTimestamp lastSchemaChangeActivationTs(String tableName) {
+        IgniteImpl igniteImpl = unwrapIgniteImpl(node0);
+
+        Catalog currentCatalog = igniteImpl.catalogManager().activeCatalog(igniteImpl.clock().nowLong());
+
+        CatalogTableDescriptor tableDescriptor = currentCatalog.table(QualifiedName.DEFAULT_SCHEMA_NAME, tableName);
+        assertThat(tableDescriptor, is(notNullValue()));
+        assertThat(tableDescriptor.latestSchemaVersion(), is(2));
+
+        return HybridTimestamp.hybridTimestamp(currentCatalog.time());
+    }
+
+    /** Puts the pair (value, value) passing {@code null} as the transaction. */
+    private void makeImplicitPut(int value, String tableName, SinglePartitionOperation singlePartitionOperation) {
+        makePutInTransaction(value, null, tableName, singlePartitionOperation);
+    }
+
+    /** Puts the pair (value, value) inside a transaction managed by {@code runInTransaction()} on node 0. */
+    private void makeExplicitPut(int value, String tableName, SinglePartitionOperation singlePartitionOperation) {
+        node0.transactions().runInTransaction((Transaction tx) -> makePutInTransaction(value, tx, tableName, singlePartitionOperation));
+    }
+
+    /**
+     * Same as {@link #makeExplicitPut(int, String, SinglePartitionOperation)}, but a failure having
+     * {@link IncompatibleSchemaException} among its causes is swallowed.
+     *
+     * @return {@code true} if the put succeeded, {@code false} if it failed due to an incompatible schema.
+     */
+    private boolean makeExplicitPutToleratingIncompatibleSchema(int value, String tableName, SinglePartitionOperation operation) {
+        try {
+            makeExplicitPut(value, tableName, operation);
+
+            return true;
+        } catch (Exception e) {
+            if (!hasCause(e, IncompatibleSchemaException.class)) {
+                throw e;
+            }
+
+            return false;
+        }
+    }
+
+    private void makePutInTransaction(int value, @Nullable Transaction tx, String tableName, SinglePartitionOperation operation) {
+        // The same number is used both as the key and as the value.
+        operation.put(tx, value, value, tableName, node0);
+    }
+
+    /** Reads all row versions of the table from the node's storages and asserts that there are exactly as many as expected. */
+    private static List<ReadResult> readRowsFromTable(Ignite node, String tableName, int expectedRowCount) {
+        List<ReadResult> readResults = readRowsFromTable(node, tableName);
+
+        assertThat(readResults, hasSize(equalTo(expectedRowCount)));
+
+        return readResults;
+    }
+
+    /**
+     * Scans the partition storages of the table on the given node (partition indices from 0 up to
+     * {@code DEFAULT_PARTITION_COUNT}) and returns everything found.
+     */
+    private static List<ReadResult> readRowsFromTable(Ignite node, String tableName) {
+        // NOTE(review): the query result is drained and discarded; presumably this makes sure the node has caught up with
+        // the writes before its storages are scanned directly - confirm.
+        try (ResultSet<SqlRow> resultSet = node.sql().execute("SELECT * FROM " + tableName)) {
+            while (resultSet.hasNext()) {
+                resultSet.next();
+            }
+        }
+
+        List<ReadResult> readResults = new ArrayList<>();
+
+        IgniteImpl ignite = unwrapIgniteImpl(node);
+        TableImpl table = unwrapTableImpl(requireNonNull(ignite.distributedTableManager().cachedTable(tableName)));
+
+        for (int partitionIndex = 0; partitionIndex < DEFAULT_PARTITION_COUNT; partitionIndex++) {
+            collectRowsFromPartition(table, partitionIndex, readResults);
+        }
+
+        return readResults;
+    }
+
+    /** Appends everything visible at {@link HybridTimestamp#MAX_VALUE} in the given partition; absent partitions are skipped. */
+    private static void collectRowsFromPartition(TableImpl table, int partitionIndex, List<ReadResult> readResults) {
+        MvPartitionStorage partitionStorage = table.internalTable().storage().getMvPartition(partitionIndex);
+
+        if (partitionStorage != null) {
+            bypassingThreadAssertions(() -> {
+                try (PartitionTimestampCursor cursor = partitionStorage.scan(HybridTimestamp.MAX_VALUE)) {
+                    for (ReadResult readResult : cursor) {
+                        readResults.add(readResult);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Asserts, for every read result, that it has a commit timestamp and a row, and that the row's schema version is 1 if the
+     * commit timestamp is strictly before the given activation timestamp and 2 otherwise.
+     */
+    private static void assertThatSchemaVersionIsConsistentWithCommitTs(
+            List<ReadResult> readResults,
+            HybridTimestamp incompatibleChangeActivationTs
+    ) {
+        for (ReadResult readResult : readResults) {
+            HybridTimestamp commitTimestamp = readResult.commitTimestamp();
+            assertThat(commitTimestamp, is(notNullValue()));
+
+            BinaryRow binaryRow = readResult.binaryRow();
+            assertThat(binaryRow, is(notNullValue()));
+
+            if (commitTimestamp.compareTo(incompatibleChangeActivationTs) < 0) {
+                assertThat(binaryRow.schemaVersion(), is(1));
+            } else {
+                assertThat(
+                        "According to commitTs, schema version must be 2, but it's still 1; this should never happen for "
+                                + "forward incompatible schema changes",
+                        binaryRow.schemaVersion(), is(2)
+                );
+            }
+        }
+    }
+
+    private static KeyValueView<Tuple, Tuple> kvView(String tableName, Ignite ignite) {
+        return ignite.tables().table(tableName).keyValueView();
+    }
+
+    private static Tuple keyTuple(int key) {
+        return Tuple.create().set("id", key);
+    }
+
+    private static Tuple valueTuple(int value) {
+        return Tuple.create().set("val", value);
+    }
+
+    /** A single write attempt made by the background writer. */
+    @FunctionalInterface
+    private interface RowWriter {
+        /**
+         * Tries to write a row.
+         *
+         * @param value Number used both as the key and as the value of the row.
+         * @return {@code true} if the row has been written and must be counted, {@code false} if the write was rejected in a way
+         *     the scenario tolerates.
+         */
+        boolean put(int value);
+    }
+
+    /** Ways of writing a single row that the scenarios are parameterized with. */
+    private enum SinglePartitionOperation {
+        /** Key-value view {@code put()}. */
+        KV_SINGLE {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String tableName, Ignite ignite) {
+                kvView(tableName, ignite).put(tx, keyTuple(key), valueTuple(value));
+            }
+        },
+        /** Key-value view {@code putAll()} with a single-entry map. */
+        KV_BATCH {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String tableName, Ignite ignite) {
+                kvView(tableName, ignite).putAll(tx, Map.of(keyTuple(key), valueTuple(value)));
+            }
+        },
+        /** SQL {@code INSERT} of a single row. */
+        SQL_SINGLE {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String tableName, Ignite ignite) {
+                ignite.sql()
+                        .execute(tx, "INSERT INTO " + tableName + " (ID, VAL) VALUES (" + key + ", " + value + ")")
+                        .close();
+            }
+        };
+
+        abstract void put(@Nullable Transaction tx, int key, int value, String tableName, Ignite ignite);
+    }
+}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..757a93238b1
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Runs the scenarios inherited from {@link ItSchemaForwardCompatibilityConsistencyTest} with {@link #initialNodes()} equal to 1. */
+@ExtendWith(ExecutorServiceExtension.class)
+class ItSingleNodeSchemaForwardCompatibilityConsistencyTest extends
ItSchemaForwardCompatibilityConsistencyTest {
+ @Override
+ protected int initialNodes() {
+ // Also used by the parent class as the replica count of the test zone.
+ return 1;
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f5c29d42a91..0727c4ab1bd 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -198,6 +198,8 @@ import
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.raft.Loza;
@@ -270,6 +272,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -922,6 +925,15 @@ public class IgniteImpl implements Ignite {
volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(name,
workDir.resolve("volatile-log-spillout"));
+ schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+ SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+
+ schemaManager = new SchemaManager(registry, catalogManager);
+
+ ValidationSchemasSource validationSchemasSource = new
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
replicaMgr = new ReplicaManager(
name,
clusterSvc,
@@ -934,6 +946,7 @@ public class IgniteImpl implements Ignite {
partitionIdleSafeTimePropagationPeriodMsSupplier,
failureManager,
raftMarshaller,
+ new PartitionSafeTimeValidator(validationSchemasSource,
catalogManager, schemaSyncService),
topologyAwareRaftGroupServiceFactory,
raftMgr,
partitionRaftConfigurer,
@@ -998,13 +1011,6 @@ public class IgniteImpl implements Ignite {
raftMgr.appendEntriesRequestInterceptor(new
CheckCatalogVersionOnAppendEntries(catalogManager));
raftMgr.actionRequestInterceptor(new
CheckCatalogVersionOnActionRequest(catalogManager));
- schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
-
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
- SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
-
- schemaManager = new SchemaManager(registry, catalogManager);
-
distributionZoneManager = new DistributionZoneManager(
name,
() -> clusterSvc.topologyService().localMember().id(),
@@ -1149,6 +1155,7 @@ public class IgniteImpl implements Ignite {
sharedTxStateStorage,
metaStorageMgr,
schemaManager,
+ validationSchemasSource,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
threadPoolsManager.commonScheduler(),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index d100a87c34d..65624b8e156 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -219,6 +220,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
new NoOpFailureManager(),
mock(ThreadLocalPartitionCommandsMarshaller.class),
+ new PermissiveSafeTimeValidator(),
mock(TopologyAwareRaftGroupServiceFactory.class),
raftManager,
RaftGroupOptionsConfigurer.EMPTY,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e19a1b5abaa..afb9b727471 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -126,8 +126,8 @@ import
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.Zone
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
-import
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
import
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
@@ -304,6 +304,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final SchemaVersions schemaVersions;
+ private final ValidationSchemasSource validationSchemasSource;
+
private final PartitionReplicatorNodeRecovery
partitionReplicatorNodeRecovery;
/** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with
an {@link NodeStoppingException}. */
@@ -417,6 +419,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
TxStateRocksDbSharedStorage txStateRocksDbSharedStorage,
MetaStorageManager metaStorageMgr,
SchemaManager schemaManager,
+ ValidationSchemasSource validationSchemasSource,
ExecutorService ioExecutor,
Executor partitionOperationsExecutor,
ScheduledExecutorService commonScheduler,
@@ -447,6 +450,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.dataStorageMgr = dataStorageMgr;
this.metaStorageMgr = metaStorageMgr;
this.schemaManager = schemaManager;
+ this.validationSchemasSource = validationSchemasSource;
this.ioExecutor = ioExecutor;
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.clockService = clockService;
@@ -1065,7 +1069,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
safeTimeTracker,
transactionStateResolver,
partitionUpdateHandlers.storageUpdateHandler,
- new CatalogValidationSchemasSource(catalogService,
schemaManager),
+ validationSchemasSource,
localNode(),
executorInclinedSchemaSyncService,
catalogService,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
new file mode 100644
index 00000000000..db646d50b77
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.partition.replicator.network.command.TableAwareCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import
org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidationResult;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
+
+/**
+ * Validator for partition commands.
+ *
+ * <ul>
+ *     <li>requests a retry for full (1PC) update commands that require metadata to be available by safe time if that metadata
+ *     is not yet available;</li>
+ *     <li>rejects full (1PC) update commands whose commitTs fails schema compatibility validation (the caller is expected to retry the
+ *     corresponding implicit transaction).</li>
+ * </ul>
+ */
+public class PartitionSafeTimeValidator implements SafeTimeValidator {
+    private static final IgniteLogger LOG = Loggers.forClass(PartitionSafeTimeValidator.class);
+
+    private final SchemaCompatibilityValidator schemaCompatibilityValidator;
+
+    /**
+     * Creates a validator; the arguments are only used to build the underlying {@link SchemaCompatibilityValidator}.
+     *
+     * @param validationSchemasSource Source of schemas used for validation.
+     * @param catalogService Catalog service.
+     * @param schemaSyncService Schema synchronization service.
+     */
+    public PartitionSafeTimeValidator(
+            ValidationSchemasSource validationSchemasSource,
+            CatalogService catalogService,
+            SchemaSyncService schemaSyncService
+    ) {
+        schemaCompatibilityValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+    }
+
+    /**
+     * Tells whether the command is subject to validation: it must be a full {@link UpdateCommandBase} that is also
+     * a {@link TableAwareCommand} (both casts made in {@link #validate} rely on this).
+     */
+    @Override
+    public boolean shouldValidateFor(WriteCommand command) {
+        return command instanceof UpdateCommandBase
+                && ((UpdateCommandBase) command).full()
+                && command instanceof TableAwareCommand;
+    }
+
+    /**
+     * Validates the commit of a full update command against the given safe time.
+     *
+     * @param groupId Group ID, only used in the retry message.
+     * @param command Command for which {@link #shouldValidateFor(WriteCommand)} returns {@code true}.
+     * @param safeTime Safe time passed to the schema compatibility validation as the commit timestamp.
+     * @return A 'retry' result if the compatibility validation has not completed by the time of the check, a 'valid' result if it
+     *     succeeded, a 'rejected' result carrying the validation failure message otherwise.
+     */
+    @Override
+    public SafeTimeValidationResult validate(String groupId, WriteCommand command, HybridTimestamp safeTime) {
+        UpdateCommandBase updateCommand = (UpdateCommandBase) command;
+
+        CompletableFuture<CompatValidationResult> future = schemaCompatibilityValidator.validateCommit(
+                updateCommand.txId(),
+                Set.of(((TableAwareCommand) updateCommand).tableId()),
+                safeTime
+        );
+
+        // The future is never waited for here: an incomplete future is turned into a retry request instead.
+        if (!future.isDone()) {
+            // Format once and reuse for both the log record and the result.
+            String message = IgniteStringFormatter.format(
+                    "Metadata not yet available by safe time, rejecting ActionRequest with EBUSY [group={}, safeTs={}].",
+                    groupId,
+                    safeTime
+            );
+
+            // TODO: IGNITE-20298 - throttle logging.
+            LOG.warn(message);
+
+            return SafeTimeValidationResult.forRetry(message);
+        }
+
+        // NOTE(review): if the future has completed exceptionally, join() rethrows the failure wrapped in a CompletionException;
+        // confirm that propagating it to the caller is the intended handling.
+        CompatValidationResult compatibilityValidationResult = future.join();
+
+        if (compatibilityValidationResult.isSuccessful()) {
+            return SafeTimeValidationResult.forValid();
+        } else {
+            return SafeTimeValidationResult.forRejected(compatibilityValidationResult.validationFailedMessage());
+        }
+    }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ff0706ad805..e0d2b048f5c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -202,6 +202,7 @@ import
org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
import
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
@@ -209,6 +210,8 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -2401,6 +2404,22 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return raftCommandApplicator.applyCommandWithExceptionHandling(cmd);
}
+ /**
+  * Completion handler (used with {@code handle()}) that passes a successful result through and rethrows failures, translating
+  * a failure whose root cause is a {@link RaftException} with {@link RaftError#EREJECTED_BY_VALIDATOR} into
+  * an {@link IncompatibleSchemaVersionException} carrying the same message.
+  *
+  * @param res Result of the command application, returned as is when there is no failure.
+  * @param ex Failure of the command application, or {@code null} if it succeeded.
+  * @return {@code res} if {@code ex} is {@code null}.
+  */
+ private static CommandApplicationResult throwIfFullTxCommitSchemaValidationFailedDuringReplication(
+         CommandApplicationResult res,
+         Throwable ex
+ ) {
+     if (ex == null) {
+         return res;
+     }
+
+     Throwable rootCause = ExceptionUtils.unwrapRootCause(ex);
+
+     boolean rejectedByValidator = rootCause instanceof RaftException
+             && ((RaftException) rootCause).raftError() == RaftError.EREJECTED_BY_VALIDATOR;
+
+     if (rejectedByValidator) {
+         throw new IncompatibleSchemaVersionException(rootCause.getMessage());
+     }
+
+     // Any other failure is rethrown unchanged.
+     sneakyThrow(ex);
+
+     // Presumably never reached (sneakyThrow() is expected to always throw); kept to satisfy the compiler.
+     return res;
+ }
+
/**
* Executes an Update command.
*
@@ -2520,7 +2539,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return completedFuture(new
CommandApplicationResult(safeTs, null));
}
- });
+
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
}
}
@@ -2658,7 +2677,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
return completedFuture(new
CommandApplicationResult(safeTs, null));
}
- });
+
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
}
}
@@ -3542,7 +3561,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
UUID txId,
boolean full,
UUID txCoordinatorId,
- @Nullable HybridTimestamp initiatorTime,
+ HybridTimestamp initiatorTime,
int catalogVersion,
@Nullable Long leaseStartTime
) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
index 65f5f9773aa..df9461a0e2d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.table.distributed.schema;
-import static
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import static
org.apache.ignite.internal.table.distributed.schema.MetadataSufficiency.isMetadataAvailableForCatalogVersion;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller;
@@ -83,20 +84,18 @@ public class CheckCatalogVersionOnActionRequest implements
ActionRequestIntercep
OptimizedMarshaller.ORDER));
if (requiredCatalogVersion >= 0) {
- if (!isMetadataAvailableFor(requiredCatalogVersion,
catalogService)) {
- // TODO: IGNITE-20298 - throttle logging.
- LOG.warn(
- "Metadata not yet available, rejecting ActionRequest
with EBUSY [group={}, requiredLevel={}].",
- request.groupId(), requiredCatalogVersion
+ if (!isMetadataAvailableForCatalogVersion(requiredCatalogVersion,
catalogService)) {
+ String message = IgniteStringFormatter.format(
+ "Metadata not yet available by catalog version,
rejecting ActionRequest with EBUSY [group={}, requiredLevel={}].",
+ request.groupId(),
+ requiredCatalogVersion
);
+ // TODO: IGNITE-20298 - throttle logging.
+ LOG.warn(message);
+
return RaftRpcFactory.DEFAULT //
- .newResponse(
- node.getRaftOptions().getRaftMessagesFactory(),
- RaftError.EBUSY,
- "Metadata not yet available, rejecting
ActionRequest with EBUSY [group=%s, requiredLevel=%d].",
- request.groupId(), requiredCatalogVersion
- );
+
.newResponse(node.getRaftOptions().getRaftMessagesFactory(), RaftError.EBUSY,
message);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
index 52f52176ee9..d71b22a5f03 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
@@ -18,10 +18,11 @@
package org.apache.ignite.internal.table.distributed.schema;
import static
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller.NO_VERSION_REQUIRED;
-import static
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import static
org.apache.ignite.internal.table.distributed.schema.MetadataSufficiency.isMetadataAvailableForCatalogVersion;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller;
@@ -67,19 +68,21 @@ public class CheckCatalogVersionOnAppendEntries implements
AppendEntriesRequestI
for (RaftOutter.EntryMeta entry : request.entriesList()) {
int requiredCatalogVersion =
readRequiredCatalogVersionForMeta(allData, entry,
node.getOptions().getCommandsMarshaller());
- if (requiredCatalogVersion != NO_VERSION_REQUIRED &&
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
- // TODO: IGNITE-20298 - throttle logging.
- LOG.warn(
+ if (requiredCatalogVersion != NO_VERSION_REQUIRED
+ &&
!isMetadataAvailableForCatalogVersion(requiredCatalogVersion, catalogService)) {
+ String message = IgniteStringFormatter.format(
"Metadata not yet available, rejecting
AppendEntriesRequest with EBUSY [group={}, requiredLevel={}].",
request.groupId(), requiredCatalogVersion
);
+ // TODO: IGNITE-20298 - throttle logging.
+ LOG.warn(message);
+
return RaftRpcFactory.DEFAULT //
.newResponse(
node.getRaftOptions().getRaftMessagesFactory(),
RaftError.EBUSY,
- "Metadata not yet available, rejecting
AppendEntriesRequest with EBUSY [group=%s, requiredLevel=%d].",
- request.groupId(), requiredCatalogVersion
+ message
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
similarity index 83%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
index 442af48311c..a19ee37fc4f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
@@ -20,10 +20,10 @@ package org.apache.ignite.internal.table.distributed.schema;
import org.apache.ignite.internal.catalog.CatalogService;
/**
- * Logic that allows to determine whether the logcal Catalog version is
sufficient.
+ * Logic that allows to determine whether the local schema metadata is
sufficient.
*/
-public class CatalogVersionSufficiency {
- private CatalogVersionSufficiency() {
+public class MetadataSufficiency {
+ private MetadataSufficiency() {
// Deny instantiation.
}
@@ -34,7 +34,7 @@ public class CatalogVersionSufficiency {
* @param catalogService Catalog service.
* @return {@code true} iff the local Catalog version is sufficient.
*/
- public static boolean isMetadataAvailableFor(int requiredCatalogVersion,
CatalogService catalogService) {
+ public static boolean isMetadataAvailableForCatalogVersion(int
requiredCatalogVersion, CatalogService catalogService) {
return
catalogService.catalogReadyFuture(requiredCatalogVersion).isDone();
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index db52dad31cf..64d5286ee2c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -115,6 +115,7 @@ import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
@@ -163,6 +164,7 @@ import
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -448,6 +450,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
failureProcessor,
null,
+ new PermissiveSafeTimeValidator(),
mock(TopologyAwareRaftGroupServiceFactory.class),
rm,
RaftGroupOptionsConfigurer.EMPTY,
@@ -573,6 +576,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
sharedTxStateStorage,
metaStorageManager,
sm,
+ mock(ValidationSchemasSource.class),
partitionOperationsExecutor,
partitionOperationsExecutor,
scheduledExecutor,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a3068cd7042..03ebd18c989 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -585,6 +586,7 @@ public class TableManagerTest extends IgniteAbstractTest {
sharedTxStateStorage,
msm,
sm,
+ mock(ValidationSchemasSource.class),
partitionOperationsExecutor,
partitionOperationsExecutor,
scheduledExecutor,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index b284ce1a622..d96a5d45554 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -104,6 +104,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -147,6 +148,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.CatalogVe
import
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
@@ -251,6 +253,8 @@ import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.sql.ColumnType;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.TransactionException;
@@ -2106,6 +2110,68 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
.map(Arguments::of);
}
+ @ParameterizedTest
+ @MethodSource("singleRowWriteRequestTypes")
+ public void singleRowWritesRespectFailedSchemaValidationResult(RequestType
requestType) {
+ RwListenerInvocation invocation;
+
+ if (requestType == RW_DELETE || requestType == RW_GET_AND_DELETE) {
+ invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, true);
+ } else {
+ invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, true);
+ }
+
+ testWritesRespectFailedSchemaValidationResult(requestType, invocation);
+ }
+
+ @Test
+ public void replaceRequestRespectsFailedSchemaValidationResult() {
+ RwListenerInvocation invocation = (targetTxId, key) ->
doReplaceRequest(
+ targetTxId,
+ marshalKeyOrKeyValue(RW_REPLACE, key),
+ marshalKeyOrKeyValue(RW_REPLACE, key),
+ true
+ );
+
+ testWritesRespectFailedSchemaValidationResult(RW_REPLACE, invocation);
+ }
+
+ @ParameterizedTest
+ @MethodSource("multiRowsWriteRequestTypes")
+ public void multiRowWritesRespectFailedSchemaValidationResult(RequestType
requestType) {
+ RwListenerInvocation invocation;
+
+ if (requestType == RW_DELETE_ALL) {
+ invocation = (targetTxId, key)
+ -> doMultiRowPkRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, true);
+ } else {
+ invocation = (targetTxId, key)
+ -> doMultiRowRequest(targetTxId,
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, true);
+ }
+
+ testWritesRespectFailedSchemaValidationResult(requestType, invocation);
+ }
+
+ private void testWritesRespectFailedSchemaValidationResult(RequestType
requestType, RwListenerInvocation listenerInvocation) {
+ TestKey key = nextKey();
+
+ if (RequestTypes.looksUpFirst(requestType)) {
+ upsertInNewTxFor(key);
+
+ // While handling the upsert, our mocks were touched, let's reset
them to prevent false-positives during verification.
+ Mockito.reset(schemaSyncService);
+ }
+
+ UUID targetTxId = newTxId();
+
+ when(mockRaftClient.run(any(UpdateCommandBase.class)))
+ .thenReturn(failedFuture(new CompletionException(new
RaftException(RaftError.EREJECTED_BY_VALIDATOR))));
+
+ CompletableFuture<?> future = listenerInvocation.invoke(targetTxId,
key);
+
+ assertThat(future,
willThrow(IncompatibleSchemaVersionException.class));
+ }
+
@CartesianTest
@CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
void singleRowRwOperationsFailIfTableAlteredAfterTxStart(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
index 6d1eb3370a1..b51cceaa935 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
@@ -164,7 +164,7 @@ class CheckCatalogVersionOnActionRequestTest extends
BaseIgniteAbstractTest {
ErrorResponse errorResponse = (ErrorResponse) result;
assertThat(errorResponse.errorCode(), is(RaftError.EBUSY.getNumber()));
assertThat(errorResponse.errorMsg(),
- is("Metadata not yet available, rejecting ActionRequest with
EBUSY [group=test, requiredLevel=6]."));
+ is("Metadata not yet available by catalog version, rejecting
ActionRequest with EBUSY [group=test, requiredLevel=6]."));
}
@ParameterizedTest
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
similarity index 83%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
rename to
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
index 0bf845cc107..3aa161640fc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
@@ -32,7 +32,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
-class CatalogVersionSufficiencyTest extends BaseIgniteAbstractTest {
+class MetadataSufficiencyTest extends BaseIgniteAbstractTest {
@Mock
private CatalogService catalogService;
@@ -40,20 +40,20 @@ class CatalogVersionSufficiencyTest extends
BaseIgniteAbstractTest {
void exceedingLocalVersionIsSufficient() {
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(nullCompletedFuture());
- assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(8,
catalogService));
+ assertTrue(MetadataSufficiency.isMetadataAvailableForCatalogVersion(8,
catalogService));
}
@Test
void equalLocalVersionIsSufficient() {
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(nullCompletedFuture());
- assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(10,
catalogService));
+
assertTrue(MetadataSufficiency.isMetadataAvailableForCatalogVersion(10,
catalogService));
}
@Test
- void lowerLocalVersionIsSufficient() {
+ void lowerLocalVersionIsInsufficient() {
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(new
CompletableFuture<>());
- assertFalse(CatalogVersionSufficiency.isMetadataAvailableFor(12,
catalogService));
+
assertFalse(MetadataSufficiency.isMetadataAvailableForCatalogVersion(12,
catalogService));
}
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
new file mode 100644
index 00000000000..fdce26e2d3f
--- /dev/null
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partition.replicator.schema.FullTableSchema;
+import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+
+class CompoundValidationSchemasSource implements ValidationSchemasSource {
+ private final Map<Integer, ValidationSchemasSource> schemasSources = new
ConcurrentHashMap<>();
+
+ void registerSource(int table, ValidationSchemasSource source) {
+ schemasSources.put(table, source);
+ }
+
+ @Override
+ public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int
schemaVersion) {
+ return
requiredSourceForTable(tableId).waitForSchemaAvailability(tableId,
schemaVersion);
+ }
+
+ private ValidationSchemasSource requiredSourceForTable(int tableId) {
+ return requireNonNull(schemasSources.get(tableId), "No source for
table " + tableId);
+ }
+
+ @Override
+ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+ return
requiredSourceForTable(tableId).tableSchemaVersionsBetween(tableId,
fromIncluding, toIncluding);
+ }
+
+ @Override
+ public List<FullTableSchema> tableSchemaVersionsBetween(int tableId,
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+ return
requiredSourceForTable(tableId).tableSchemaVersionsBetween(tableId,
fromIncluding, toTableVersionIncluding);
+ }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index d108b2372bb..31dde335c40 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -156,6 +156,7 @@ import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
import
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -284,6 +285,10 @@ public class ItTxTestCluster {
protected String localNodeName;
+ private SchemaSyncService schemaSyncService;
+
+ private final CompoundValidationSchemasSource validationSchemasSource =
new CompoundValidationSchemasSource();
+
private final Map<String, Map<ZonePartitionId, ZonePartitionRaftListener>>
zonePartitionRaftGroupListeners = new HashMap<>();
private final Map<String, Map<ZonePartitionId,
ZonePartitionReplicaListener>> zonePartitionReplicaListeners = new HashMap<>();
@@ -487,6 +492,8 @@ public class ItTxTestCluster {
new RaftGroupEventsClientListener()
);
+ schemaSyncService = new AlwaysSyncedSchemaSyncService();
+
ReplicaManager replicaMgr = new ReplicaManager(
nodeName,
clusterService,
@@ -499,6 +506,7 @@ public class ItTxTestCluster {
this::getSafeTimePropagationTimeout,
new NoOpFailureManager(),
commandMarshaller,
+ new PartitionSafeTimeValidator(validationSchemasSource,
catalogService, schemaSyncService),
raftClientFactory,
raftSrv,
partitionRaftConfigurer,
@@ -772,7 +780,10 @@ public class ItTxTestCluster {
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
);
- DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
+ var schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
+ var tableValidationSchemasSource = new
DummyValidationSchemasSource(schemaManager);
+
+ validationSchemasSource.registerSource(tableId,
tableValidationSchemasSource);
RaftGroupListener raftGroupListener =
getOrCreateAndPopulateRaftGroupListener(
assignment,
@@ -804,9 +815,9 @@ public class ItTxTestCluster {
txStateStorage,
transactionStateResolver,
storageUpdateHandler,
- new
DummyValidationSchemasSource(schemaManager),
+ validationSchemasSource,
nodeResolver.getByConsistentId(assignment),
- new AlwaysSyncedSchemaSyncService(),
+ schemaSyncService,
catalogService,
placementDriver,
nodeResolver,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3c9cd225edf..59ab5d1541d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -473,6 +473,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
ZonePartitionId zonePartitionId = new ZonePartitionId(ZONE_ID,
PART_ID);
+ var validationSchemasSource = new
DummyValidationSchemasSource(schemaManager);
+ var schemaSyncService = new AlwaysSyncedSchemaSyncService();
+
var tableReplicaListener = new PartitionReplicaListener(
mvPartStorage,
svc,
@@ -488,9 +491,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
safeTime,
transactionStateResolver,
storageUpdateHandler,
- new DummyValidationSchemasSource(schemaManager),
+ validationSchemasSource,
LOCAL_NODE,
- new AlwaysSyncedSchemaSyncService(),
+ schemaSyncService,
catalogService,
placementDriver,
mock(ClusterNodeResolver.class),
@@ -512,8 +515,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
txStateStorage.getOrCreatePartitionStorage(PART_ID),
CLOCK_SERVICE,
this.txManager,
- new DummyValidationSchemasSource(schemaManager),
- new AlwaysSyncedSchemaSyncService(),
+ validationSchemasSource,
+ schemaSyncService,
catalogService,
placementDriver,
mock(ClusterNodeResolver.class),