This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eeecdfd56e3 IGNITE-27460 Add schema compatibility validation for full 
commands (#7598)
eeecdfd56e3 is described below

commit eeecdfd56e3acf753df7e0b11fca1df22b08647b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Feb 20 21:03:49 2026 +0400

    IGNITE-27460 Add schema compatibility validation for full commands (#7598)
---
 .../asserts/CompletableFutureAssert.java           |  68 ++++-
 .../rebalance/ItRebalanceDistributedTest.java      |   4 +
 .../partition/replicator/fixtures/Node.java        |  41 +--
 .../ItZonePartitionRaftListenerRecoveryTest.java   |   2 +-
 .../handlers/TxFinishReplicaRequestHandler.java    |  19 +-
 .../network/command/UpdateAllCommand.java          |  14 +-
 .../replicator/network/command/UpdateCommand.java  |  12 +-
 .../{UpdateCommand.java => UpdateCommandBase.java} |  28 +-
 .../replicator/raft/ZonePartitionRaftListener.java |   2 +-
 .../schemacompat/CompatValidationResult.java       |  23 ++
 .../schemacompat/SchemaCompatibilityValidator.java |   9 +-
 .../PartitionReplicaLifecycleManagerTest.java      |   2 +
 .../internal/raft/server/RaftGroupOptions.java     |  12 +
 .../internal/raft/server/impl/JraftServerImpl.java |   3 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  38 ++-
 .../apache/ignite/raft/jraft/error/RaftError.java  |   8 +-
 .../ignite/raft/jraft/option/NodeOptions.java      |  13 +
 .../jraft/option/PermissiveSafeTimeValidator.java} |  27 +-
 .../jraft/option/SafeTimeValidationResult.java     |  63 ++++
 .../raft/jraft/option/SafeTimeValidator.java       |  36 +++
 .../ItPlacementDriverReplicaSideTest.java          |   2 +
 .../ignite/internal/replicator/ReplicaManager.java |   8 +
 .../internal/replicator/ReplicaManagerTest.java    |   2 +
 .../runner/app/ItIgniteNodeRestartTest.java        |  45 +--
 ...ockedSchemaSyncAndRaftCommandExecutionTest.java | 213 +++++++++++++
 ...eSchemaForwardCompatibilityConsistencyTest.java |  29 ++
 ...tSchemaForwardCompatibilityConsistencyTest.java | 330 +++++++++++++++++++++
 ...eSchemaForwardCompatibilityConsistencyTest.java |  29 ++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  21 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |   2 +
 .../internal/table/distributed/TableManager.java   |   8 +-
 .../raft/PartitionSafeTimeValidator.java           |  94 ++++++
 .../replicator/PartitionReplicaListener.java       |  25 +-
 .../schema/CheckCatalogVersionOnActionRequest.java |  23 +-
 .../schema/CheckCatalogVersionOnAppendEntries.java |  15 +-
 ...onSufficiency.java => MetadataSufficiency.java} |   8 +-
 .../distributed/TableManagerRecoveryTest.java      |   4 +
 .../table/distributed/TableManagerTest.java        |   2 +
 .../replication/PartitionReplicaListenerTest.java  |  66 +++++
 .../CheckCatalogVersionOnActionRequestTest.java    |   6 +-
 ...iencyTest.java => MetadataSufficiencyTest.java} |  10 +-
 .../CompoundValidationSchemasSource.java           |  55 ++++
 .../apache/ignite/distributed/ItTxTestCluster.java |  17 +-
 .../table/impl/DummyInternalTableImpl.java         |  11 +-
 44 files changed, 1267 insertions(+), 182 deletions(-)

diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
index f78ec4ccd66..191297512ea 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java
@@ -25,6 +25,8 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
+import org.apache.ignite.internal.util.ExceptionUtils;
 
 /**
  * Assertions related to {@link CompletableFuture}.
@@ -34,7 +36,7 @@ public class CompletableFutureAssert {
      * Asserts that the given future completes with an exception being an 
instance of the given class and returns
      * that exception for further examination.
      *
-     * <p>Unlike {@link 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrowFast(Class)},
+     * <p>Unlike {@link 
CompletableFutureExceptionMatcher#willThrowFast(Class)},
      * this method allows to examine the actual exception thrown further in 
the test.
      *
      * @param future Future to work on.
@@ -54,7 +56,7 @@ public class CompletableFutureAssert {
      * that exception for further examination.
      *
      * <p>Unlike
-     * {@link 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrow(Class,
 int, TimeUnit)},
+     * {@link CompletableFutureExceptionMatcher#willThrow(Class, int, 
TimeUnit)},
      * this method allows to examine the actual exception thrown further in 
the test.
      *
      * @param future Future to work on.
@@ -71,7 +73,7 @@ public class CompletableFutureAssert {
      * that exception for further examination.
      *
      * <p>Unlike
-     * {@link 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher#willThrow(Class,
 int, TimeUnit)},
+     * {@link CompletableFutureExceptionMatcher#willThrow(Class, int, 
TimeUnit)},
      * this method allows to examine the actual exception thrown further in 
the test.
      *
      * @param future Future to work on.
@@ -117,6 +119,66 @@ public class CompletableFutureAssert {
                 + ", but it completed normally with result " + normalResult);
     }
 
+    /**
+     * Asserts that the given future completes with an exception having a 
cause (transitively) which is an instance of the given class
+     * (in time) and returns that exception for further examination.
+     *
+     * <p>Unlike
+     * {@link CompletableFutureExceptionMatcher#willThrow(Class, int, 
TimeUnit)},
+     * this method allows to examine the actual exception thrown further in 
the test.
+     *
+     * @param future Future to work on.
+     * @param expectedCauseClass Expected class of the exception.
+     * @return Matched exception.
+     */
+    public static Throwable assertWillThrowCausedBy(
+            CompletableFuture<?> future,
+            Class<?> expectedCauseClass
+    ) {
+        return assertWillThrowCausedBy(future, expectedCauseClass, 10, 
SECONDS);
+    }
+
+    /**
+     * Asserts that the given future completes with an exception having a 
cause (transitively) which is an instance of the given class
+     * (in time) and returns that exception for further examination.
+     *
+     * <p>Unlike
+     * {@link CompletableFutureExceptionMatcher#willThrow(Class, int, 
TimeUnit)},
+     * this method allows to examine the actual exception thrown further in 
the test.
+     *
+     * @param future Future to work on.
+     * @param expectedCauseClass Expected class of the exception.
+     * @param timeout Duration to wait for future completion.
+     * @param timeUnit Time unit of the duration.
+     * @return Matched exception.
+     */
+    public static Throwable assertWillThrowCausedBy(
+            CompletableFuture<?> future,
+            Class<?> expectedCauseClass,
+            long timeout,
+            TimeUnit timeUnit
+    ) {
+        Object normalResult;
+
+        try {
+            normalResult = future.get(timeout, timeUnit);
+        } catch (Throwable e) {
+            if (ExceptionUtils.hasCause(e, expectedCauseClass)) {
+                // The user actually expects this exception, let's return it.
+                return e;
+            }
+
+            return fail(
+                    "Expected the future to be completed with an exception 
with a cause of class "
+                            + expectedCauseClass + " , but got something 
different",
+                    e
+            );
+        }
+
+        return fail("Expected the future to be completed with an exception 
with a cause of class " + expectedCauseClass
+                + ", but it completed normally with result " + normalResult);
+    }
+
     private static Throwable unwrapThrowable(Throwable e) {
         while (e instanceof ExecutionException || e instanceof 
CompletionException) {
             e = e.getCause();
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index ff04b0d95ac..37a200bddd0 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -180,6 +180,7 @@ import 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
@@ -260,6 +261,7 @@ import 
org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import 
org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersAndLearnersAsyncRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -1481,6 +1483,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     partitionIdleSafeTimePropagationPeriodMsSupplier,
                     new NoOpFailureManager(),
                     new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+                    new PermissiveSafeTimeValidator(),
                     topologyAwareRaftGroupServiceFactory,
                     raftManager,
                     partitionRaftConfigurer,
@@ -1581,6 +1584,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     sharedTxStateStorage,
                     metaStorageManager,
                     schemaManager,
+                    new CatalogValidationSchemasSource(catalogManager, 
schemaManager),
                     threadPoolsManager.tableIoExecutor(),
                     threadPoolsManager.partitionOperationsExecutor(),
                     threadPoolsManager.commonScheduler(),
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 3a0edc1930a..f0306d79b66 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -124,6 +124,8 @@ import 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -173,6 +175,7 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -639,6 +642,25 @@ public class Node {
 
         volatileLogStorageManagerCreator = new 
VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" 
+ name));
 
+        schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+        
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+        LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
+
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, 
delayDurationMsSupplier);
+
+        catalogManager = new CatalogManagerImpl(
+                new UpdateLogImpl(metaStorageManager, failureManager),
+                clockService,
+                failureManager,
+                delayDurationMsSupplier,
+                PartitionCountProvider.defaultPartitionCountProvider()
+        );
+
+        schemaManager = new SchemaManager(registry, catalogManager);
+
+        ValidationSchemasSource validationSchemasSource = new 
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
         replicaManager = new ReplicaManager(
                 name,
                 clusterService,
@@ -651,6 +673,7 @@ public class Node {
                 partitionIdleSafeTimePropagationPeriodMsSupplier,
                 new NoOpFailureManager(),
                 new 
ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()),
+                new PartitionSafeTimeValidator(validationSchemasSource, 
catalogManager, schemaSyncService),
                 topologyAwareRaftGroupServiceFactory,
                 raftManager,
                 partitionRaftConfigurer,
@@ -661,28 +684,11 @@ public class Node {
                 threadPoolsManager.commonScheduler()
         );
 
-        LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
-
-        catalogManager = new CatalogManagerImpl(
-                new UpdateLogImpl(metaStorageManager, failureManager),
-                clockService,
-                failureManager,
-                delayDurationMsSupplier,
-                PartitionCountProvider.defaultPartitionCountProvider()
-        );
-
         raftManager.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
         raftManager.actionRequestInterceptor(new 
CheckCatalogVersionOnActionRequest(catalogManager));
 
         indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark, 
metaStorageManager);
 
-        schemaManager = new SchemaManager(registry, catalogManager);
-
-        schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
-        
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
-        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, 
delayDurationMsSupplier);
-
         MinimumRequiredTimeCollectorService minTimeCollectorService = new 
MinimumRequiredTimeCollectorServiceImpl();
 
         catalogCompactionRunner = new CatalogCompactionRunner(
@@ -783,6 +789,7 @@ public class Node {
                 sharedTxStateStorage,
                 metaStorageManager,
                 schemaManager,
+                validationSchemasSource,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 threadPoolsManager.commonScheduler(),
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index dc21b2be0a4..57c85c38f28 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -573,7 +573,7 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
                 .txCoordinatorId(id)
                 .requiredCatalogVersion(0)
                 .leaseStartTime(0L)
-                .safeTime(now)
+                .safeTime(clock.now())
                 .build();
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 57ac98eb864..9049f871372 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -232,24 +232,7 @@ public class TxFinishReplicaRequestHandler {
 
     private static void 
throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, 
TransactionResult txResult) {
         if (!validationResult.isSuccessful()) {
-            if (validationResult.isTableDropped()) {
-                throw new IncompatibleSchemaAbortException(
-                        format("Commit failed because a table was already 
dropped [table={}]", validationResult.failedTableName()),
-                        txResult
-                );
-            } else {
-                throw new IncompatibleSchemaAbortException(
-                        format(
-                                "Commit failed because schema is not 
forward-compatible "
-                                        + "[fromSchemaVersion={}, 
toSchemaVersion={}, table={}, details={}]",
-                                validationResult.fromSchemaVersion(),
-                                validationResult.toSchemaVersion(),
-                                validationResult.failedTableName(),
-                                validationResult.details()
-                        ),
-                        txResult
-                );
-            }
+            throw new 
IncompatibleSchemaAbortException(validationResult.validationFailedMessage(), 
txResult);
         }
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
index 7757887e788..c37bf0ab0cb 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java
@@ -20,14 +20,10 @@ package 
org.apache.ignite.internal.partition.replicator.network.command;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.PropertyName;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
-import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.util.CollectionUtils;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * State machine command for updating a batch of entries.
@@ -35,17 +31,9 @@ import org.jetbrains.annotations.Nullable;
  * <p>This command is replaced with {@link UpdateAllCommandV2} and only exists 
in the source code for backward compatibility.</p>
  */
 @Transferable(PartitionReplicationMessageGroup.Commands.UPDATE_ALL_V1)
-public interface UpdateAllCommand extends PartitionCommand {
-    @PropertyName("tablePartitionId")
-    ReplicationGroupIdMessage commitPartitionId();
-
+public interface UpdateAllCommand extends UpdateCommandBase {
     Map<UUID, TimedBinaryRowMessage> messageRowsToUpdate();
 
-    UUID txCoordinatorId();
-
-    /** Lease start time, hybrid timestamp as long, see {@link 
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
-    @Nullable Long leaseStartTime();
-
     /**
      * Returns the timestamps of the last committed entries for each row.
      */
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
index 56c7eb50250..831b9866a0d 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
@@ -19,10 +19,8 @@ package 
org.apache.ignite.internal.partition.replicator.network.command;
 
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.PropertyName;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
-import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.Nullable;
 
@@ -32,19 +30,11 @@ import org.jetbrains.annotations.Nullable;
  * <p>This command is replaced with {@link UpdateCommandV2} and only exists in 
the source code for backward compatibility.</p>
  */
 @Transferable(Commands.UPDATE_V1)
-public interface UpdateCommand extends PartitionCommand {
-    @PropertyName("tablePartitionId")
-    ReplicationGroupIdMessage commitPartitionId();
-
+public interface UpdateCommand extends UpdateCommandBase {
     UUID rowUuid();
 
     @Nullable TimedBinaryRowMessage messageRowToUpdate();
 
-    UUID txCoordinatorId();
-
-    /** Lease start time, hybrid timestamp as long, see {@link 
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
-    @Nullable Long leaseStartTime();
-
     /** Returns the row to update or {@code null} if the row should be 
removed. */
     default @Nullable BinaryRow rowToUpdate() {
         TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
similarity index 56%
copy from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
copy to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
index 56c7eb50250..4b3b47de04e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommand.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java
@@ -20,42 +20,18 @@ package 
org.apache.ignite.internal.partition.replicator.network.command;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.annotations.PropertyName;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.Commands;
 import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * State machine command to update a row specified by a row ID.
- *
- * <p>This command is replaced with {@link UpdateCommandV2} and only exists in 
the source code for backward compatibility.</p>
+ * Base for commands executing updates to the storage.
  */
-@Transferable(Commands.UPDATE_V1)
-public interface UpdateCommand extends PartitionCommand {
+public interface UpdateCommandBase extends PartitionCommand {
     @PropertyName("tablePartitionId")
     ReplicationGroupIdMessage commitPartitionId();
 
-    UUID rowUuid();
-
-    @Nullable TimedBinaryRowMessage messageRowToUpdate();
-
     UUID txCoordinatorId();
 
     /** Lease start time, hybrid timestamp as long, see {@link 
HybridTimestamp#longValue()}. Should be non-null for the full transactions.*/
     @Nullable Long leaseStartTime();
-
-    /** Returns the row to update or {@code null} if the row should be 
removed. */
-    default @Nullable BinaryRow rowToUpdate() {
-        TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
-
-        return tsRoMsg == null ? null : tsRoMsg.binaryRow();
-    }
-
-    /** Returns the timestamp of the last committed entry. */
-    default @Nullable HybridTimestamp lastCommitTimestamp() {
-        TimedBinaryRowMessage tsRoMsg = messageRowToUpdate();
-
-        return tsRoMsg == null ? null : tsRoMsg.timestamp();
-    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index c5dd6d3aa74..498e61a70a1 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -282,7 +282,7 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
             @Nullable HybridTimestamp safeTimestamp
     ) {
         if (tableProcessors.isEmpty()) {
-            return new CommandResult(null, lastAppliedIndex < commandIndex);
+            return new CommandResult(null, commandIndex > lastAppliedIndex);
         }
 
         boolean wasApplied = false;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
index 2a8a3af7d21..2c221eb674e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.partition.replicator.schemacompat;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -154,4 +156,25 @@ public class CompatValidationResult {
 
         return details;
     }
+
+    /**
+     * Returns error message corresponding to validation failure. Should only 
be called for a failed validation result, otherwise an
+     * assertion error may be thrown.
+     */
+    public String validationFailedMessage() {
+        assert !isSuccessful() : "Should not be called on a successful result";
+
+        if (isTableDropped()) {
+            return format("Commit failed because a table was already dropped 
[table={}]", failedTableName());
+        } else {
+            return format(
+                    "Commit failed because schema is not forward-compatible "
+                            + "[fromSchemaVersion={}, toSchemaVersion={}, 
table={}, details={}]",
+                    fromSchemaVersion(),
+                    toSchemaVersion(),
+                    failedTableName(),
+                    details()
+            );
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
index 31cddfe735b..4dedbcb3541 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/SchemaCompatibilityValidator.java
@@ -65,7 +65,7 @@ public class SchemaCompatibilityValidator {
     }
 
     /**
-     * Performs commit validation. That is, checks that each table enlisted in 
the tranasction still exists at the commit timestamp,
+     * Performs commit validation. That is, checks that each table enlisted in 
the transaction still exists at the commit timestamp,
      * and that the initial schema of the table (identified by the begin 
timestamp) is forward-compatible with the commit schema
      * (identified by the commit timestamp).
      *
@@ -84,7 +84,8 @@ public class SchemaCompatibilityValidator {
         // Using compareTo() instead of after()/begin() because the latter 
methods take clock skew into account
         // which only makes sense when comparing 'unrelated' timestamps. 
beginTs and commitTs have a causal relationship,
         // so we don't need to account for clock skew.
-        assert commitTimestamp.compareTo(beginTimestamp) > 0;
+        assert commitTimestamp.compareTo(beginTimestamp) > 0
+                : "Commit timestamp " + commitTimestamp + " is not after begin 
timestamp " + beginTimestamp;
 
         return schemaSyncService.waitForMetadataCompleteness(commitTimestamp)
                 .thenApply(ignored -> validateCommit(enlistedTableIds, 
commitTimestamp, beginTimestamp));
@@ -132,7 +133,7 @@ public class SchemaCompatibilityValidator {
     ) {
         List<FullTableSchema> tableSchemas = 
validationSchemasSource.tableSchemaVersionsBetween(tableId, beginTimestamp, 
commitTimestamp);
 
-        assert !tableSchemas.isEmpty();
+        assert !tableSchemas.isEmpty() : "No table schemas for table " + 
tableId + " between " + beginTimestamp + " and " + commitTimestamp;
 
         for (int i = 0; i < tableSchemas.size() - 1; i++) {
             FullTableSchema oldSchema = tableSchemas.get(i);
@@ -243,7 +244,7 @@ public class SchemaCompatibilityValidator {
         CatalogTableDescriptor tableAtBeginTs = 
catalogService.activeCatalog(beginTs.longValue()).table(tableId);
         CatalogTableDescriptor tableAtOpTs = 
catalogService.activeCatalog(operationTimestamp.longValue()).table(tableId);
 
-        assert tableAtBeginTs != null : "No table " + tableId + " at ts " + 
tableAtBeginTs;
+        assert tableAtBeginTs != null : "No table " + tableId + " at ts " + 
beginTs;
 
         if (tableAtOpTs == null) {
             throw 
IncompatibleSchemaVersionException.tableDropped(tableAtBeginTs.name());
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 12c2e7bbcc8..108b6aed850 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -123,6 +123,7 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.SafeTimeValuesTracker;
 import org.apache.ignite.internal.worker.ThreadAssertions;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -244,6 +245,7 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 () -> Long.MAX_VALUE,
                 failureManager,
                 null,
+                new PermissiveSafeTimeValidator(),
                 topologyAwareRaftGroupServiceFactory,
                 raftManager,
                 RaftGroupOptionsConfigurer.EMPTY,
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index dddcbb3bc45..11dd0145497 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -23,6 +23,7 @@ import 
org.apache.ignite.internal.raft.storage.LogStorageManager;
 import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -64,6 +65,8 @@ public class RaftGroupOptions {
      */
     private boolean isSystemGroup = false;
 
+    private @Nullable SafeTimeValidator safeTimeValidator;
+
     /**
      * Gets a system group flag.
      *
@@ -267,4 +270,13 @@ public class RaftGroupOptions {
     public int maxClockSkew() {
         return maxClockSkewMs;
     }
+
+    public @Nullable SafeTimeValidator safeTimeValidator() {
+        return safeTimeValidator;
+    }
+
+    public RaftGroupOptions safeTimeValidator(@Nullable SafeTimeValidator 
safeTimeValidator) {
+        this.safeTimeValidator = safeTimeValidator;
+        return this;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index b03cf6b25d9..15dcbf16017 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -508,6 +508,9 @@ public class JraftServerImpl implements RaftServer {
             if (groupOptions.commandsMarshaller() != null) {
                 
nodeOptions.setCommandsMarshaller(groupOptions.commandsMarshaller());
             }
+            if (groupOptions.safeTimeValidator() != null) {
+                
nodeOptions.setSafeTimeValidator(groupOptions.safeTimeValidator());
+            }
 
             nodeOptions.setFsm(
                     new DelegatingStateMachine(nodeId, lsnr, nodeOptions, 
failureManager));
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 99ffb0ee2aa..d373a55f6c5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -101,6 +101,8 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.option.ReadOnlyOption;
 import org.apache.ignite.raft.jraft.option.ReadOnlyServiceOptions;
 import org.apache.ignite.raft.jraft.option.ReplicatorGroupOptions;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidationResult;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
 import org.apache.ignite.raft.jraft.option.SnapshotExecutorOptions;
 import org.apache.ignite.raft.jraft.rpc.AppendEntriesResponseBuilder;
 import org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
@@ -1743,6 +1745,14 @@ public class NodeImpl implements Node, RaftServerService 
{
                     }
                     continue;
                 }
+
+                // To prevent safe timestamp values from becoming stale, we 
must assign them under a valid leader lock.
+                safeTs = tryAssignSafeTimestamp(task, safeTs);
+
+                if (rejectCommandIfSafeTimeIsNotAcceptable(safeTs, task)) {
+                    continue;
+                }
+
                 if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                     this.conf.isStable() ? null : this.conf.getOldConf(), 
task.done)) {
                     
Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.done, new 
Status(RaftError.EINTERNAL, "Fail to append task."));
@@ -1750,9 +1760,6 @@ public class NodeImpl implements Node, RaftServerService {
                     continue;
                 }
 
-                // To prevent safe timestamp values from becoming stale, we 
must assign them under a valid leader lock.
-                safeTs = tryAssignSafeTimestamp(task, safeTs);
-
                 // set task entry info before adding to list.
                 task.entry.getId().setTerm(this.currTerm);
                 task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
@@ -1788,6 +1795,31 @@ public class NodeImpl implements Node, RaftServerService 
{
         return safeTs;
     }
 
+    private boolean rejectCommandIfSafeTimeIsNotAcceptable(@Nullable 
HybridTimestamp safeTs, LogEntryAndClosure task) {
+        if (safeTs != null && task.done instanceof 
SafeTimeAwareCommandClosure) {
+            SafeTimeAwareCommandClosure closure = 
(SafeTimeAwareCommandClosure) task.done;
+
+            SafeTimeValidator safeTimeValidator = 
this.getOptions().getSafeTimeValidator();
+            if (safeTimeValidator.shouldValidateFor(closure.command())) {
+                SafeTimeValidationResult validationResult = 
safeTimeValidator.validate(groupId, closure.command(), safeTs);
+
+                if (!validationResult.valid()) {
+                    RaftError raftError = validationResult.shouldRetry() ? 
RaftError.EBUSY : RaftError.EREJECTED_BY_VALIDATOR;
+                    Utils.runClosureInThread(
+                            this.getOptions().getCommonExecutor(),
+                            task.done,
+                            new Status(raftError, 
validationResult.errorMessage())
+                    );
+                    task.reset();
+
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     /**
      * Builds a status for 'Cannot apply because this node is not a leader' 
situation.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
index 60dc70f2185..ddfca461f4a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java
@@ -240,7 +240,13 @@ public enum RaftError {
     /**
      * Permission denied
      */
-    EACCES(1016);
+    EACCES(1016),
+
+    /**
+     * Rejected by user-supplied validator. If ActionRequest gets such a 
response, the corresponding command has not been either saved
+     * to log or applied on any node; also, the Raft client should not retry 
the same ActionRequest attempt as it will never succeed.
+     */
+    EREJECTED_BY_VALIDATOR(20000);
 
     private static final Map<Integer, RaftError> RAFT_ERROR_MAP = new 
HashMap<>();
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 7ae720af618..2a49a10d71b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -314,6 +314,8 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
             Utils.MAX_COLLECTOR_SIZE_PER_SERVER
     );
 
+    private SafeTimeValidator safeTimeValidator = new 
PermissiveSafeTimeValidator();
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
     }
@@ -794,6 +796,7 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         nodeOptions.setNodeManager(this.getNodeManager());
         
nodeOptions.setAppendEntriesByteBufferCollectorPool(appendEntriesByteBufferCollectorPool);
         nodeOptions.setRaftMetrics(raftMetrics);
+        nodeOptions.setSafeTimeValidator(this.getSafeTimeValidator());
 
         return nodeOptions;
     }
@@ -860,4 +863,14 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     public void 
setAppendEntriesByteBufferCollectorPool(ByteBufferCollectorPool 
appendEntriesByteBufferCollectorPool) {
         this.appendEntriesByteBufferCollectorPool = 
appendEntriesByteBufferCollectorPool;
     }
+
+    /** Gets {@link SafeTimeValidator} to be used by the node. */
+    public SafeTimeValidator getSafeTimeValidator() {
+        return safeTimeValidator;
+    }
+
+    /** Sets {@link SafeTimeValidator} to be used by the node. */
+    public void setSafeTimeValidator(SafeTimeValidator safeTimeValidator) {
+        this.safeTimeValidator = safeTimeValidator;
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
similarity index 50%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
copy to 
modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
index 442af48311c..ce11f6d1ffb 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java
@@ -14,27 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ignite.raft.jraft.option;
 
-package org.apache.ignite.internal.table.distributed.schema;
-
-import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.WriteCommand;
 
 /**
- * Logic that allows to determine whether the logcal Catalog version is 
sufficient.
+ * Validator that never rejects anything.
  */
-public class CatalogVersionSufficiency {
-    private CatalogVersionSufficiency() {
-        // Deny instantiation.
+public class PermissiveSafeTimeValidator implements SafeTimeValidator {
+    @Override
+    public boolean shouldValidateFor(WriteCommand command) {
+        return false;
     }
 
-    /**
-     * Determines whether the local Catalog version is sufficient.
-     *
-     * @param requiredCatalogVersion Minimal catalog version that is required 
to present.
-     * @param catalogService Catalog service.
-     * @return {@code true} iff the local Catalog version is sufficient.
-     */
-    public static boolean isMetadataAvailableFor(int requiredCatalogVersion, 
CatalogService catalogService) {
-        return 
catalogService.catalogReadyFuture(requiredCatalogVersion).isDone();
+    @Override
+    public SafeTimeValidationResult validate(String groupId, WriteCommand 
command, HybridTimestamp safeTime) {
+        return SafeTimeValidationResult.forValid();
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
new file mode 100644
index 00000000000..633b14a0139
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.option;
+
+/** Result of safe time validation by {@link SafeTimeValidator}. */
+public class SafeTimeValidationResult {
+    private static final SafeTimeValidationResult VALID = new 
SafeTimeValidationResult(true, false, "");
+
+    private final boolean valid;
+    private final boolean shouldRetry;
+    private final String errorMessage;
+
+    /** Returns valid result. */
+    public static SafeTimeValidationResult forValid() {
+        return VALID;
+    }
+
+    /** Returns result that requires a retry. */
+    public static SafeTimeValidationResult forRetry(String errorMessage) {
+        return new SafeTimeValidationResult(false, true, errorMessage);
+    }
+
+    /** Returns result saying that the ActionRequest is rejected for good. 
Such a request should never be retried by the Raft client. */
+    public static SafeTimeValidationResult forRejected(String errorMessage) {
+        return new SafeTimeValidationResult(false, false, errorMessage);
+    }
+
+    private SafeTimeValidationResult(boolean valid, boolean shouldRetry, 
String errorMessage) {
+        this.valid = valid;
+        this.shouldRetry = shouldRetry;
+        this.errorMessage = errorMessage;
+    }
+
+    /** Returns whether the safe time and the corresponding command are valid. 
*/
+    public boolean valid() {
+        return valid;
+    }
+
+    /** Returns whether the request should be retried or the request is 
rejected for good. */
+    public boolean shouldRetry() {
+        return shouldRetry;
+    }
+
+    /** Returns error message going with the failed result. */
+    public String errorMessage() {
+        return errorMessage;
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
new file mode 100644
index 00000000000..74083d624b9
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.option;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.raft.jraft.error.RaftError;
+
+/**
+ * Allows to implement validation for safe time in Raft. If a command's safe 
time is invalid, it gets rejected just before being added
+ * to the leader's log (and hence does not get replicated). A rejected command 
can either be rejected for retry (and then the Raft client
+ * should try again later) or for good (in which case the Raft client should 
not retry the operation and just communicate the error
+ * to the caller instead).
+ */
+public interface SafeTimeValidator {
+    /** Whether to validate safe time for the given command. */
+    boolean shouldValidateFor(WriteCommand command);
+
+    /** Whether the given safe time is valid for the given command. */
+    SafeTimeValidationResult validate(String groupId, WriteCommand command, 
HybridTimestamp safeTime);
+}
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 4295fc11e2a..d2926be55aa 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -109,6 +109,7 @@ import 
org.apache.ignite.internal.topology.TestLogicalTopologyService;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -235,6 +236,7 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                     mock(FailureProcessor.class),
                     // TODO: IGNITE-22222 can't pass 
ThreadLocalPartitionCommandsMarshaller there due to dependency loop
                     null,
+                    new PermissiveSafeTimeValidator(),
                     topologyAwareRaftGroupServiceFactory,
                     raftManager,
                     partitionsConfigurer,
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 8538d99d6d3..72e9814750f 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -135,6 +135,7 @@ import 
org.apache.ignite.internal.util.TrackerClosedException;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 import org.jetbrains.annotations.VisibleForTesting;
@@ -200,6 +201,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Raft command marshaller for raft server endpoints starting. */
     private final Marshaller raftCommandsMarshaller;
 
+    /** Raft safe time validator for partition groups. */
+    private final SafeTimeValidator partitionSafeTimeValidator;
+
     /** Message handler for placement driver messages. */
     private final NetworkMessageHandler placementDriverMessageHandler;
 
@@ -251,6 +255,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param idleSafeTimePropagationPeriodMsSupplier Used to get idle safe 
time propagation period in ms.
      * @param failureProcessor Failure processor.
      * @param raftCommandsMarshaller Command marshaller for raft groups 
creation.
+     * @param partitionSafeTimeValidator Partition safe time validator.
      * @param raftGroupServiceFactory A factory for raft-clients creation.
      * @param raftManager The manager made up of songs and words to spite all 
my troubles is not so bad at all.
      * @param partitionRaftConfigurer Configurer of raft options on raft group 
creation.
@@ -272,6 +277,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             LongSupplier idleSafeTimePropagationPeriodMsSupplier,
             FailureProcessor failureProcessor,
             @Nullable Marshaller raftCommandsMarshaller,
+            SafeTimeValidator partitionSafeTimeValidator,
             TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
             RaftManager raftManager,
             RaftGroupOptionsConfigurer partitionRaftConfigurer,
@@ -293,6 +299,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         this.idleSafeTimePropagationPeriodMsSupplier = 
idleSafeTimePropagationPeriodMsSupplier;
         this.failureProcessor = failureProcessor;
         this.raftCommandsMarshaller = raftCommandsMarshaller;
+        this.partitionSafeTimeValidator = partitionSafeTimeValidator;
         this.raftGroupServiceFactory = raftGroupServiceFactory;
         this.raftManager = raftManager;
         this.partitionRaftConfigurer = partitionRaftConfigurer;
@@ -873,6 +880,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         raftGroupOptions.snapshotStorageFactory(snapshotFactory);
         raftGroupOptions.maxClockSkew((int) clockService.maxClockSkewMillis());
         raftGroupOptions.commandsMarshaller(raftCommandsMarshaller);
+        raftGroupOptions.safeTimeValidator(partitionSafeTimeValidator);
 
         // TODO: The options will be used by Loza only. Consider rafactoring. 
see https://issues.apache.org/jira/browse/IGNITE-18273
         partitionRaftConfigurer.configure(raftGroupOptions);
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index f48729ecb0a..aac4bcb46ec 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -163,6 +164,7 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new NoOpFailureManager(),
                 marshaller,
+                new PermissiveSafeTimeValidator(),
                 raftGroupServiceFactory,
                 raftManager,
                 partitionsConfigurer,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 03bab008c47..beb61a0502a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -167,6 +167,8 @@ import 
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
@@ -214,6 +216,7 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -599,6 +602,27 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 zoneId -> completedFuture(Set.of())
         );
 
+        var schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+        
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+        LongSupplier delayDurationMsSupplier = () -> 
TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
+
+        var schemaSyncService = new 
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+
+        var catalogManager = new CatalogManagerImpl(
+                new UpdateLogImpl(metaStorageMgr, failureProcessor),
+                clockService,
+                failureProcessor,
+                delayDurationMsSupplier,
+                PartitionCountProvider.defaultPartitionCountProvider()
+        );
+
+        var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
+
+        SchemaManager schemaManager = new SchemaManager(registry, 
catalogManager);
+
+        ValidationSchemasSource validationSchemasSource = new 
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
         ReplicaManager replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
@@ -611,6 +635,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 partitionIdleSafeTimePropagationPeriodMsSupplier,
                 failureProcessor,
                 new 
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()),
+                new PartitionSafeTimeValidator(validationSchemasSource, 
catalogManager, schemaSyncService),
                 topologyAwareRaftGroupServiceFactory,
                 raftMgr,
                 partitionRaftConfigurer,
@@ -670,8 +695,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 metricManager
         );
 
-        var registry = new MetaStorageRevisionListenerRegistry(metaStorageMgr);
-
         DataStorageModules dataStorageModules = new DataStorageModules(
                 ServiceLoader.load(DataStorageModule.class)
         );
@@ -696,20 +719,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         TransactionConfiguration txConfiguration = clusterConfigRegistry
                 
.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
 
-        LongSupplier delayDurationMsSupplier = () -> 
TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
-
-        var catalogManager = new CatalogManagerImpl(
-                new UpdateLogImpl(metaStorageMgr, failureProcessor),
-                clockService,
-                failureProcessor,
-                delayDurationMsSupplier,
-                PartitionCountProvider.defaultPartitionCountProvider()
-        );
-
         var indexMetaStorage = new IndexMetaStorage(catalogManager, 
lowWatermark, metaStorageMgr);
 
-        SchemaManager schemaManager = new SchemaManager(registry, 
catalogManager);
-
         var dataNodesMock = dataNodesMockByNode.get(idx);
 
         SystemDistributedConfiguration systemDistributedConfiguration =
@@ -737,11 +748,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
             }
         };
 
-        var schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
-        
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
-        var schemaSyncService = new 
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
-
         var sqlRef = new AtomicReference<IgniteSqlImpl>();
 
         MinimumRequiredTimeCollectorService minTimeCollectorService = new 
MinimumRequiredTimeCollectorServiceImpl();
@@ -799,6 +805,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 sharedTxStateStorage,
                 metaStorageMgr,
                 schemaManager,
+                validationSchemasSource,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 threadPoolsManager.commonScheduler(),
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
new file mode 100644
index 00000000000..e8206f5d756
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
+import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowCausedBy;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.schema.SchemaSyncInhibitor;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.TableImpl;
+import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.apache.ignite.table.KeyValueView;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItBlockedSchemaSyncAndRaftCommandExecutionTest extends 
ClusterPerTestIntegrationTest {
+    private static final String ZONE_NAME = "TEST_ZONE";
+    private static final String TABLE_NAME = "TEST";
+
+    private LogInspector inspector;
+
+    @Override
+    protected int initialNodes() {
+        return 3;
+    }
+
+    @BeforeEach
+    void prepare() {
+        inspector = 
LogInspector.create(CheckCatalogVersionOnAppendEntries.class, true);
+    }
+
+    @AfterEach
+    void cleanup() {
+        inspector.stop();
+    }
+
+    @Test
+    void operationBlockedOnSchemaSyncDoesNotPreventNodeStop() throws Exception 
{
+        InhibitorAndFuture inhibitorAndFuture = 
producePutHangingDueToSchemaSyncInLeaderStateMachine();
+
+        assertTimeoutPreemptively(Duration.ofSeconds(10), () -> 
cluster.stopNode(0));
+
+        //noinspection ThrowableNotThrown
+        assertWillThrowCausedBy(inhibitorAndFuture.future, 
NodeStoppingException.class);
+    }
+
+    private InhibitorAndFuture 
producePutHangingDueToSchemaSyncInLeaderStateMachine()
+            throws InterruptedException {
+        Ignite node = cluster.node(0);
+
+        createTableWith1PartitionOnAllNodes(node);
+
+        cluster.transferLeadershipTo(0, cluster.solePartitionId(ZONE_NAME));
+
+        KeyValueView<Integer, String> kvView = node.tables()
+                .table(TABLE_NAME)
+                .keyValueView(Integer.class, String.class);
+
+        CompletableFuture<SchemaSyncInhibitor> inhibitorFuture = 
startInhibitingSchemaSyncWhenUpdateCommandArrives();
+
+        CompletableFuture<Void> putFuture = kvView.putAsync(null, 1, "one");
+
+        waitTillCommandStartsExecutionAndBlocksOnSchemaSync();
+
+        assertThat(inhibitorFuture, willCompleteSuccessfully());
+
+        return new InhibitorAndFuture(inhibitorFuture.join(), putFuture);
+    }
+
+    private static void createTableWith1PartitionOnAllNodes(Ignite node) {
+        node.sql().executeScript(
+                "CREATE ZONE " + ZONE_NAME + " (REPLICAS 3, PARTITIONS 1) 
STORAGE PROFILES ['"
+                        + DEFAULT_AIPERSIST_PROFILE_NAME + "'];"
+                + "CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL 
VARCHAR) ZONE " + ZONE_NAME + ";"
+        );
+    }
+
+    private CompletableFuture<SchemaSyncInhibitor> 
startInhibitingSchemaSyncWhenUpdateCommandArrives() {
+        AtomicBoolean startedInhibiting = new AtomicBoolean();
+        CompletableFuture<SchemaSyncInhibitor> future = new 
CompletableFuture<>();
+
+        for (Ignite node : cluster.nodes()) {
+            IgniteImpl igniteImpl = unwrapIgniteImpl(node);
+
+            igniteImpl.dropMessages((recipientName, message) -> {
+                if (message instanceof WriteActionRequest) {
+                    WriteActionRequest actionRequest = (WriteActionRequest) 
message;
+
+                    if (PartitionGroupId.matchesString(actionRequest.groupId())
+                            && actionRequest.deserializedCommand() instanceof 
UpdateCommand
+                            && startedInhibiting.compareAndSet(false, true)) {
+                        SchemaSyncInhibitor inhibitor = new 
SchemaSyncInhibitor(igniteImpl);
+                        inhibitor.startInhibit();
+
+                        // Making sure that commitTs (for which we take 
partition safe time) will be at least DelayDuration ahead
+                        // of Metastorage safe time, so during schema sync 
we'll hang until inhibition is over.
+                        
waitForAllSafeTimesToReach(igniteImpl.clock().current().tick(), igniteImpl);
+
+                        future.complete(inhibitor);
+                    }
+                }
+
+                return false;
+            });
+        }
+
+        return future;
+    }
+
+    private void waitForAllSafeTimesToReach(HybridTimestamp current, Ignite 
nodeToWaitSafeTime) {
+        ZonePartitionResources zonePartitionResources = 
unwrapIgniteImpl(nodeToWaitSafeTime)
+                .partitionReplicaLifecycleManager()
+                .zonePartitionResources(cluster.solePartitionId(ZONE_NAME));
+
+        try {
+            zonePartitionResources.safeTimeTracker().waitFor(current).get(10, 
SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        } catch (ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void waitTillCommandStartsExecutionAndBlocksOnSchemaSync() 
throws InterruptedException {
+        // Current implementation doesn't actually block any threads, but we 
still give it a chance to get stuck if the implementation
+        // gets changed.
+        Thread.sleep(1000);
+    }
+
+    @Test
+    void operationBlockedBySchemaHangOnLeaderSucceedsOnSchemaCaughtUp() throws 
Exception {
+        InhibitorAndFuture inhibitorAndFuture = 
producePutHangingDueToSchemaSyncInLeaderStateMachine();
+
+        assertThat(inhibitorAndFuture.future, willTimeoutIn(1, SECONDS));
+
+        inhibitorAndFuture.inhibitor.stopInhibit();
+
+        assertThat(inhibitorAndFuture.future, willCompleteSuccessfully());
+
+        waitTillStoragesOnAllNodesHaveOneRow();
+    }
+
+    private void waitTillStoragesOnAllNodesHaveOneRow() {
+        for (Ignite node : cluster.nodes()) {
+            waitTillStorageHasOneNode(node);
+        }
+    }
+
+    private static void waitTillStorageHasOneNode(Ignite node) {
+        TableImpl table = unwrapTableImpl(node.tables().table(TABLE_NAME));
+        MvPartitionStorage partitionStorage = 
table.internalTable().storage().getMvPartition(0);
+        assertThat(partitionStorage, is(notNullValue()));
+
+        bypassingThreadAssertions(() -> 
await().until(partitionStorage::estimatedSize, is(1L)));
+    }
+
+    private static class InhibitorAndFuture {
+        private final SchemaSyncInhibitor inhibitor;
+        private final CompletableFuture<Void> future;
+
+        private InhibitorAndFuture(SchemaSyncInhibitor inhibitor, 
CompletableFuture<Void> future) {
+            this.inhibitor = inhibitor;
+            this.future = future;
+        }
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..e26dc426364
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItMultiNodeSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItMultiNodeSchemaForwardCompatibilityConsistencyTest extends 
ItSchemaForwardCompatibilityConsistencyTest {
+    @Override
+    protected int initialNodes() {
+        return 3;
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..3628c93c614
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.QualifiedName;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.IncompatibleSchemaException;
+import org.apache.ignite.tx.Transaction;
+import org.jspecify.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+@ExtendWith(ExecutorServiceExtension.class)
+abstract class ItSchemaForwardCompatibilityConsistencyTest extends 
ClusterPerTestIntegrationTest {
+    private static final String ZONE_NAME = "TEST_ZONE";
+
+    private Ignite node0;
+
+    @InjectExecutorService
+    private ExecutorService executor;
+
+    @Override
+    protected abstract int initialNodes();
+
+    @BeforeEach
+    void prepare() {
+        node0 = cluster.node(0);
+
+        node0.sql()
+                .execute(
+                        "CREATE ZONE " + ZONE_NAME + " (REPLICAS " + 
initialNodes() + ") STORAGE PROFILES ['"
+                                + DEFAULT_AIPERSIST_PROFILE_NAME + "']"
+                )
+                .close();
+    }
+
+    @ParameterizedTest
+    @EnumSource(SinglePartitionOperation.class)
+    void 
forwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(SinglePartitionOperation
 operation) throws Exception {
+        for (int attempt = 0; attempt < 20; attempt++) {
+            
testForwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(operation, 
attempt);
+        }
+    }
+
+    private void 
testForwardIncompatibleSchemaChangesCannotBeCreatedBy1PcTransactions(SinglePartitionOperation
 operation, int attempt)
+            throws InterruptedException {
+        String tableName = "TABLE" + attempt;
+
+        createTable(tableName);
+
+        CountDownLatch startedPutting = new CountDownLatch(1);
+        AtomicBoolean performWrites = new AtomicBoolean(true);
+        AtomicInteger rowsAdded = new AtomicInteger();
+
+        CompletableFuture<Void> putterFuture = CompletableFuture.runAsync(() 
-> {
+            try {
+                makeImplicitPut(0, tableName, operation);
+                rowsAdded.incrementAndGet();
+            } finally {
+                startedPutting.countDown();
+            }
+
+            for (int i = 1; performWrites.get(); i++) {
+                makeImplicitPut(i, tableName, operation);
+                rowsAdded.incrementAndGet();
+            }
+        }, executor);
+
+        assertTrue(startedPutting.await(10, SECONDS));
+        applyForwardIncompatibleSchemaChange(tableName);
+
+        performWrites.set(false);
+        assertThat(putterFuture, willCompleteSuccessfully());
+
+        for (Ignite node : cluster.nodes()) {
+            List<ReadResult> readResults = readRowsFromTable(node, tableName, 
rowsAdded.get());
+
+            assertThatSchemaVersionIsConsistentWithCommitTs(readResults, 
lastSchemaChangeActivationTs(tableName));
+        }
+    }
+
+    private void createTable(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate(
+                    "CREATE TABLE " + tableName + " (id INT PRIMARY KEY, val 
INT, column_to_drop INT) ZONE " + ZONE_NAME,
+                    session
+            );
+        });
+    }
+
+    private void applyForwardIncompatibleSchemaChange(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("ALTER TABLE " + tableName + " DROP COLUMN 
column_to_drop", session);
+        });
+    }
+
+    private HybridTimestamp lastSchemaChangeActivationTs(String tableName) {
+        IgniteImpl igniteImpl = unwrapIgniteImpl(node0);
+
+        Catalog currentCatalog = 
igniteImpl.catalogManager().activeCatalog(igniteImpl.clock().nowLong());
+
+        CatalogTableDescriptor tableDescriptor = 
currentCatalog.table(QualifiedName.DEFAULT_SCHEMA_NAME, tableName);
+        assertThat(tableDescriptor, is(notNullValue()));
+        assertThat(tableDescriptor.latestSchemaVersion(), is(2));
+
+        return HybridTimestamp.hybridTimestamp(currentCatalog.time());
+    }
+
+    private void makeImplicitPut(int value, String tableName, 
SinglePartitionOperation singlePartitionOperation) {
+        makePutInTransaction(value, null, tableName, singlePartitionOperation);
+    }
+
+    private void makeExplicitPut(int value, String tableName, 
SinglePartitionOperation singlePartitionOperation) {
+        node0.transactions().runInTransaction((Transaction tx) -> 
makePutInTransaction(value, tx, tableName, singlePartitionOperation));
+    }
+
+    private void makePutInTransaction(int value, @Nullable Transaction tx, 
String tableName, SinglePartitionOperation operation) {
+        operation.put(tx, value, value, tableName, node0);
+    }
+
+    private static List<ReadResult> readRowsFromTable(Ignite node, String 
tableName, int expectedRowCount) {
+        List<ReadResult> readResults = readRowsFromTable(node, tableName);
+
+        assertThat(readResults, hasSize(equalTo(expectedRowCount)));
+
+        return readResults;
+    }
+
+    private static List<ReadResult> readRowsFromTable(Ignite node, String 
tableName) {
+        try (ResultSet<SqlRow> resultSet = node.sql().execute("SELECT * FROM " 
+ tableName)) {
+            while (resultSet.hasNext()) {
+                resultSet.next();
+            }
+        }
+
+        List<ReadResult> readResults = new ArrayList<>();
+
+        IgniteImpl ignite = unwrapIgniteImpl(node);
+        TableImpl table = 
unwrapTableImpl(requireNonNull(ignite.distributedTableManager().cachedTable(tableName)));
+
+        for (int partitionIndex = 0; partitionIndex < DEFAULT_PARTITION_COUNT; 
partitionIndex++) {
+            collectRowsFromPartition(table, partitionIndex, readResults);
+        }
+
+        return readResults;
+    }
+
+    private static void collectRowsFromPartition(TableImpl table, int 
partitionIndex, List<ReadResult> readResults) {
+        MvPartitionStorage partitionStorage = 
table.internalTable().storage().getMvPartition(partitionIndex);
+
+        if (partitionStorage != null) {
+            bypassingThreadAssertions(() -> {
+                try (PartitionTimestampCursor cursor = 
partitionStorage.scan(HybridTimestamp.MAX_VALUE)) {
+                    for (ReadResult readResult : cursor) {
+                        readResults.add(readResult);
+                    }
+                }
+            });
+        }
+    }
+
+    private static void assertThatSchemaVersionIsConsistentWithCommitTs(
+            List<ReadResult> readResults,
+            HybridTimestamp incompatibleChangeActivationTs
+    ) {
+        for (ReadResult readResult : readResults) {
+            HybridTimestamp commitTimestamp = readResult.commitTimestamp();
+            assertThat(commitTimestamp, is(notNullValue()));
+
+            BinaryRow binaryRow = readResult.binaryRow();
+            assertThat(binaryRow, is(notNullValue()));
+
+            if (commitTimestamp.compareTo(incompatibleChangeActivationTs) < 0) 
{
+                assertThat(binaryRow.schemaVersion(), is(1));
+            } else {
+                assertThat(
+                        "According to commitTs, schema version must be 2, but 
it's still 1; this should never happen for "
+                                + "forward incompatible schema changes",
+                        binaryRow.schemaVersion(), is(2)
+                );
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(SinglePartitionOperation.class)
+    void 
forwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(SinglePartitionOperation
 operation) throws Exception {
+        for (int attempt = 0; attempt < 10; attempt++) {
+            
testForwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(operation, 
attempt);
+        }
+    }
+
+    private void 
testForwardIncompatibleSchemaChangesCannotBeCreatedBy2PcTransactions(SinglePartitionOperation
 operation, int attempt)
+            throws InterruptedException {
+        String tableName = "TABLE" + attempt;
+
+        createTable(tableName);
+
+        CountDownLatch startedPutting = new CountDownLatch(1);
+        AtomicBoolean performWrites = new AtomicBoolean(true);
+        AtomicInteger rowsAdded = new AtomicInteger();
+
+        CompletableFuture<Void> putterFuture = CompletableFuture.runAsync(() 
-> {
+            try {
+                makeExplicitPut(0, tableName, operation);
+                rowsAdded.incrementAndGet();
+            } finally {
+                startedPutting.countDown();
+            }
+
+            for (int i = 1; performWrites.get(); i++) {
+                try {
+                    makeExplicitPut(i, tableName, operation);
+                    rowsAdded.incrementAndGet();
+                } catch (Exception e) {
+                    if (!hasCause(e, IncompatibleSchemaException.class)) {
+                        throw e;
+                    }
+                }
+            }
+        }, executor);
+
+        assertTrue(startedPutting.await(10, SECONDS));
+        applyForwardIncompatibleSchemaChange(tableName);
+
+        performWrites.set(false);
+        assertThat(putterFuture, willCompleteSuccessfully());
+
+        for (Ignite node : cluster.nodes()) {
+            List<ReadResult> readResults = readRowsFromTable(node, tableName, 
rowsAdded.get());
+
+            assertThatSchemaVersionIsConsistentWithCommitTs(readResults, 
lastSchemaChangeActivationTs(tableName));
+        }
+    }
+
+    private static KeyValueView<Tuple, Tuple> kvView(String tableName, Ignite 
ignite) {
+        return ignite.tables().table(tableName).keyValueView();
+    }
+
+    private static Tuple keyTuple(int key) {
+        return Tuple.create().set("id", key);
+    }
+
+    private static Tuple valueTuple(int value) {
+        return Tuple.create().set("val", value);
+    }
+
+    private enum SinglePartitionOperation {
+        KV_SINGLE {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String 
tableName, Ignite ignite) {
+                kvView(tableName, ignite).put(tx, keyTuple(key), 
valueTuple(value));
+            }
+        },
+        KV_BATCH {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String 
tableName, Ignite ignite) {
+                kvView(tableName, ignite).putAll(tx, Map.of(keyTuple(key), 
valueTuple(value)));
+            }
+        },
+        SQL_SINGLE {
+            @Override
+            void put(@Nullable Transaction tx, int key, int value, String 
tableName, Ignite ignite) {
+                ignite.sql()
+                        .execute(tx, "INSERT INTO " + tableName + " (ID, VAL) 
VALUES (" + key + ", " + value + ")")
+                        .close();
+            }
+        };
+
+        abstract void put(@Nullable Transaction tx, int key, int value, String 
tableName, Ignite ignite);
+    }
+}
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
new file mode 100644
index 00000000000..757a93238b1
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSingleNodeSchemaForwardCompatibilityConsistencyTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ExecutorServiceExtension.class)
+class ItSingleNodeSchemaForwardCompatibilityConsistencyTest extends 
ItSchemaForwardCompatibilityConsistencyTest {
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f5c29d42a91..0727c4ab1bd 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -198,6 +198,8 @@ import 
org.apache.ignite.internal.network.wrapper.JumpToExecutorByConsistentIdAf
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.raft.Loza;
@@ -270,6 +272,7 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
 import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
@@ -922,6 +925,15 @@ public class IgniteImpl implements Ignite {
 
         volatileLogStorageManagerCreator = new 
VolatileLogStorageManagerCreator(name, 
workDir.resolve("volatile-log-spillout"));
 
+        schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+        
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
+
+        SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+
+        schemaManager = new SchemaManager(registry, catalogManager);
+
+        ValidationSchemasSource validationSchemasSource = new 
CatalogValidationSchemasSource(catalogManager, schemaManager);
+
         replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
@@ -934,6 +946,7 @@ public class IgniteImpl implements Ignite {
                 partitionIdleSafeTimePropagationPeriodMsSupplier,
                 failureManager,
                 raftMarshaller,
+                new PartitionSafeTimeValidator(validationSchemasSource, 
catalogManager, schemaSyncService),
                 topologyAwareRaftGroupServiceFactory,
                 raftMgr,
                 partitionRaftConfigurer,
@@ -998,13 +1011,6 @@ public class IgniteImpl implements Ignite {
         raftMgr.appendEntriesRequestInterceptor(new 
CheckCatalogVersionOnAppendEntries(catalogManager));
         raftMgr.actionRequestInterceptor(new 
CheckCatalogVersionOnActionRequest(catalogManager));
 
-        schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
-        
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
-
-        SchemaSyncService schemaSyncService = new 
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
-
-        schemaManager = new SchemaManager(registry, catalogManager);
-
         distributionZoneManager = new DistributionZoneManager(
                 name,
                 () -> clusterSvc.topologyService().localMember().id(),
@@ -1149,6 +1155,7 @@ public class IgniteImpl implements Ignite {
                 sharedTxStateStorage,
                 metaStorageMgr,
                 schemaManager,
+                validationSchemasSource,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 threadPoolsManager.commonScheduler(),
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index d100a87c34d..65624b8e156 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -219,6 +220,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 new NoOpFailureManager(),
                 mock(ThreadLocalPartitionCommandsMarshaller.class),
+                new PermissiveSafeTimeValidator(),
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 raftManager,
                 RaftGroupOptionsConfigurer.EMPTY,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e19a1b5abaa..afb9b727471 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -126,8 +126,8 @@ import 
org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.Zone
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
-import 
org.apache.ignite.internal.partition.replicator.schema.CatalogValidationSchemasSource;
 import 
org.apache.ignite.internal.partition.replicator.schema.ExecutorInclinedSchemaSyncService;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.raft.ExecutorInclinedRaftCommandRunner;
@@ -304,6 +304,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final SchemaVersions schemaVersions;
 
+    private final ValidationSchemasSource validationSchemasSource;
+
     private final PartitionReplicatorNodeRecovery 
partitionReplicatorNodeRecovery;
 
     /** Ends at the {@link IgniteComponent#stopAsync(ComponentContext)} with 
an {@link NodeStoppingException}. */
@@ -417,6 +419,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             TxStateRocksDbSharedStorage txStateRocksDbSharedStorage,
             MetaStorageManager metaStorageMgr,
             SchemaManager schemaManager,
+            ValidationSchemasSource validationSchemasSource,
             ExecutorService ioExecutor,
             Executor partitionOperationsExecutor,
             ScheduledExecutorService commonScheduler,
@@ -447,6 +450,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.dataStorageMgr = dataStorageMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.schemaManager = schemaManager;
+        this.validationSchemasSource = validationSchemasSource;
         this.ioExecutor = ioExecutor;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.clockService = clockService;
@@ -1065,7 +1069,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 safeTimeTracker,
                 transactionStateResolver,
                 partitionUpdateHandlers.storageUpdateHandler,
-                new CatalogValidationSchemasSource(catalogService, 
schemaManager),
+                validationSchemasSource,
                 localNode(),
                 executorInclinedSchemaSyncService,
                 catalogService,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
new file mode 100644
index 00000000000..db646d50b77
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.partition.replicator.network.command.TableAwareCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import 
org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import 
org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidationResult;
+import org.apache.ignite.raft.jraft.option.SafeTimeValidator;
+
+/**
+ * Validator for partition commands.
+ *
+ * <ul>
+ * <li>requests a retry for full (1PC) update commands that require metadata 
to be available by safe time if that metadata
+ * is not yet available;</li>
+ * <li>rejects full (1PC) update commands whose commitTs fails schema 
compatibility validation (the caller is expected to retry the
+ * corresponding implicit transaction).</li>
+ * </ul>
+ */
+public class PartitionSafeTimeValidator implements SafeTimeValidator {
+    private static final IgniteLogger LOG = 
Loggers.forClass(PartitionSafeTimeValidator.class);
+
+    private final SchemaCompatibilityValidator schemaCompatibilityValidator;
+
+    public PartitionSafeTimeValidator(
+            ValidationSchemasSource validationSchemasSource,
+            CatalogService catalogService,
+            SchemaSyncService schemaSyncService
+    ) {
+        schemaCompatibilityValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
+    }
+
+    @Override
+    public boolean shouldValidateFor(WriteCommand command) {
+        return command instanceof UpdateCommandBase
+                && ((UpdateCommandBase) command).full()
+                && command instanceof TableAwareCommand;
+    }
+
+    @Override
+    public SafeTimeValidationResult validate(String groupId, WriteCommand 
command, HybridTimestamp safeTime) {
+        UpdateCommandBase updateCommand = (UpdateCommandBase) command;
+
+        CompletableFuture<CompatValidationResult> future = 
schemaCompatibilityValidator.validateCommit(
+                updateCommand.txId(),
+                Set.of(((TableAwareCommand) updateCommand).tableId()),
+                safeTime
+        );
+
+        if (!future.isDone()) {
+            String template = "Metadata not yet available by safe time, 
rejecting ActionRequest with EBUSY [group={}, safeTs={}].";
+
+            // TODO: IGNITE-20298 - throttle logging.
+            LOG.warn(template, groupId, safeTime);
+
+            return 
SafeTimeValidationResult.forRetry(IgniteStringFormatter.format(template, 
groupId, safeTime));
+        }
+
+        CompatValidationResult compatibilityValidationResult = future.join();
+
+        if (compatibilityValidationResult.isSuccessful()) {
+            return SafeTimeValidationResult.forValid();
+        } else {
+            return 
SafeTimeValidationResult.forRejected(compatibilityValidationResult.validationFailedMessage());
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ff0706ad805..e0d2b048f5c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -202,6 +202,7 @@ import 
org.apache.ignite.internal.tx.message.TxStatePrimaryReplicaRequest;
 import 
org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequestBase;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.CursorUtils;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
@@ -209,6 +210,8 @@ import 
org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -2401,6 +2404,22 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         return raftCommandApplicator.applyCommandWithExceptionHandling(cmd);
     }
 
+    private static CommandApplicationResult 
throwIfFullTxCommitSchemaValidationFailedDuringReplication(
+            CommandApplicationResult res,
+            Throwable ex
+    ) {
+        if (ex != null) {
+            Throwable rootCause = ExceptionUtils.unwrapRootCause(ex);
+            if (rootCause instanceof RaftException && ((RaftException) 
rootCause).raftError() == RaftError.EREJECTED_BY_VALIDATOR) {
+                throw new 
IncompatibleSchemaVersionException(rootCause.getMessage());
+            }
+
+            sneakyThrow(ex);
+        }
+
+        return res;
+    }
+
     /**
      * Executes an Update command.
      *
@@ -2520,7 +2539,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
                     return completedFuture(new 
CommandApplicationResult(safeTs, null));
                 }
-            });
+            
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
         }
     }
 
@@ -2658,7 +2677,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
                     return completedFuture(new 
CommandApplicationResult(safeTs, null));
                 }
-            });
+            
}).handle(PartitionReplicaListener::throwIfFullTxCommitSchemaValidationFailedDuringReplication);
         }
     }
 
@@ -3542,7 +3561,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             UUID txId,
             boolean full,
             UUID txCoordinatorId,
-            @Nullable HybridTimestamp initiatorTime,
+            HybridTimestamp initiatorTime,
             int catalogVersion,
             @Nullable Long leaseStartTime
     ) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
index 65f5f9773aa..df9461a0e2d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequest.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.internal.table.distributed.schema;
 
-import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import static 
org.apache.ignite.internal.table.distributed.schema.MetadataSufficiency.isMetadataAvailableForCatalogVersion;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller;
@@ -83,20 +84,18 @@ public class CheckCatalogVersionOnActionRequest implements 
ActionRequestIntercep
                 OptimizedMarshaller.ORDER));
 
         if (requiredCatalogVersion >= 0) {
-            if (!isMetadataAvailableFor(requiredCatalogVersion, 
catalogService)) {
-                // TODO: IGNITE-20298 - throttle logging.
-                LOG.warn(
-                        "Metadata not yet available, rejecting ActionRequest 
with EBUSY [group={}, requiredLevel={}].",
-                        request.groupId(), requiredCatalogVersion
+            if (!isMetadataAvailableForCatalogVersion(requiredCatalogVersion, 
catalogService)) {
+                String message = IgniteStringFormatter.format(
+                        "Metadata not yet available by catalog version, 
rejecting ActionRequest with EBUSY [group={}, requiredLevel={}].",
+                        request.groupId(),
+                        requiredCatalogVersion
                 );
 
+                // TODO: IGNITE-20298 - throttle logging.
+                LOG.warn(message);
+
                 return RaftRpcFactory.DEFAULT //
-                    .newResponse(
-                            node.getRaftOptions().getRaftMessagesFactory(),
-                            RaftError.EBUSY,
-                            "Metadata not yet available, rejecting 
ActionRequest with EBUSY [group=%s, requiredLevel=%d].",
-                            request.groupId(), requiredCatalogVersion
-                    );
+                    
.newResponse(node.getRaftOptions().getRaftMessagesFactory(), RaftError.EBUSY, 
message);
             }
         }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
index 52f52176ee9..d71b22a5f03 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java
@@ -18,10 +18,11 @@
 package org.apache.ignite.internal.table.distributed.schema;
 
 import static 
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller.NO_VERSION_REQUIRED;
-import static 
org.apache.ignite.internal.table.distributed.schema.CatalogVersionSufficiency.isMetadataAvailableFor;
+import static 
org.apache.ignite.internal.table.distributed.schema.MetadataSufficiency.isMetadataAvailableForCatalogVersion;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.marshaller.PartitionCommandsMarshaller;
@@ -67,19 +68,21 @@ public class CheckCatalogVersionOnAppendEntries implements 
AppendEntriesRequestI
         for (RaftOutter.EntryMeta entry : request.entriesList()) {
             int requiredCatalogVersion = 
readRequiredCatalogVersionForMeta(allData, entry, 
node.getOptions().getCommandsMarshaller());
 
-            if (requiredCatalogVersion != NO_VERSION_REQUIRED && 
!isMetadataAvailableFor(requiredCatalogVersion, catalogService)) {
-                // TODO: IGNITE-20298 - throttle logging.
-                LOG.warn(
+            if (requiredCatalogVersion != NO_VERSION_REQUIRED
+                    && 
!isMetadataAvailableForCatalogVersion(requiredCatalogVersion, catalogService)) {
+                String message = IgniteStringFormatter.format(
                         "Metadata not yet available, rejecting 
AppendEntriesRequest with EBUSY [group={}, requiredLevel={}].",
                         request.groupId(), requiredCatalogVersion
                 );
 
+                // TODO: IGNITE-20298 - throttle logging.
+                LOG.warn(message);
+
                 return RaftRpcFactory.DEFAULT //
                     .newResponse(
                             node.getRaftOptions().getRaftMessagesFactory(),
                             RaftError.EBUSY,
-                            "Metadata not yet available, rejecting 
AppendEntriesRequest with EBUSY [group=%s, requiredLevel=%d].",
-                            request.groupId(), requiredCatalogVersion
+                            message
                     );
             }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
similarity index 83%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
rename to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
index 442af48311c..a19ee37fc4f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiency.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java
@@ -20,10 +20,10 @@ package org.apache.ignite.internal.table.distributed.schema;
 import org.apache.ignite.internal.catalog.CatalogService;
 
 /**
- * Logic that allows to determine whether the logcal Catalog version is 
sufficient.
+ * Logic that allows to determine whether the local schema metadata is 
sufficient.
  */
-public class CatalogVersionSufficiency {
-    private CatalogVersionSufficiency() {
+public class MetadataSufficiency {
+    private MetadataSufficiency() {
         // Deny instantiation.
     }
 
@@ -34,7 +34,7 @@ public class CatalogVersionSufficiency {
      * @param catalogService Catalog service.
      * @return {@code true} iff the local Catalog version is sufficient.
      */
-    public static boolean isMetadataAvailableFor(int requiredCatalogVersion, 
CatalogService catalogService) {
+    public static boolean isMetadataAvailableForCatalogVersion(int 
requiredCatalogVersion, CatalogService catalogService) {
         return 
catalogService.catalogReadyFuture(requiredCatalogVersion).isDone();
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index db52dad31cf..64d5286ee2c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -115,6 +115,7 @@ import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
@@ -163,6 +164,7 @@ import 
org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.option.PermissiveSafeTimeValidator;
 import org.apache.ignite.sql.IgniteSql;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -448,6 +450,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
                 failureProcessor,
                 null,
+                new PermissiveSafeTimeValidator(),
                 mock(TopologyAwareRaftGroupServiceFactory.class),
                 rm,
                 RaftGroupOptionsConfigurer.EMPTY,
@@ -573,6 +576,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 sharedTxStateStorage,
                 metaStorageManager,
                 sm,
+                mock(ValidationSchemasSource.class),
                 partitionOperationsExecutor,
                 partitionOperationsExecutor,
                 scheduledExecutor,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index a3068cd7042..03ebd18c989 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaService;
@@ -585,6 +586,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 sharedTxStateStorage,
                 msm,
                 sm,
+                mock(ValidationSchemasSource.class),
                 partitionOperationsExecutor,
                 partitionOperationsExecutor,
                 scheduledExecutor,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index b284ce1a622..d96a5d45554 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -104,6 +104,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -147,6 +148,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.CatalogVe
 import 
org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BuildIndexReplicaRequest;
@@ -251,6 +253,8 @@ import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.TransactionException;
@@ -2106,6 +2110,68 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 .map(Arguments::of);
     }
 
+    @ParameterizedTest
+    @MethodSource("singleRowWriteRequestTypes")
+    public void singleRowWritesRespectFailedSchemaValidationResult(RequestType 
requestType) {
+        RwListenerInvocation invocation;
+
+        if (requestType == RW_DELETE || requestType == RW_GET_AND_DELETE) {
+            invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, true);
+        } else {
+            invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, true);
+        }
+
+        testWritesRespectFailedSchemaValidationResult(requestType, invocation);
+    }
+
+    @Test
+    public void replaceRequestRespectsFailedSchemaValidationResult() {
+        RwListenerInvocation invocation = (targetTxId, key) -> 
doReplaceRequest(
+                targetTxId,
+                marshalKeyOrKeyValue(RW_REPLACE, key),
+                marshalKeyOrKeyValue(RW_REPLACE, key),
+                true
+        );
+
+        testWritesRespectFailedSchemaValidationResult(RW_REPLACE, invocation);
+    }
+
+    @ParameterizedTest
+    @MethodSource("multiRowsWriteRequestTypes")
+    public void multiRowWritesRespectFailedSchemaValidationResult(RequestType 
requestType) {
+        RwListenerInvocation invocation;
+
+        if (requestType == RW_DELETE_ALL) {
+            invocation = (targetTxId, key)
+                    -> doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, true);
+        } else {
+            invocation = (targetTxId, key)
+                    -> doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, true);
+        }
+
+        testWritesRespectFailedSchemaValidationResult(requestType, invocation);
+    }
+
+    private void testWritesRespectFailedSchemaValidationResult(RequestType 
requestType, RwListenerInvocation listenerInvocation) {
+        TestKey key = nextKey();
+
+        if (RequestTypes.looksUpFirst(requestType)) {
+            upsertInNewTxFor(key);
+
+            // While handling the upsert, our mocks were touched, let's reset 
them to prevent false-positives during verification.
+            Mockito.reset(schemaSyncService);
+        }
+
+        UUID targetTxId = newTxId();
+
+        when(mockRaftClient.run(any(UpdateCommandBase.class)))
+                .thenReturn(failedFuture(new CompletionException(new 
RaftException(RaftError.EREJECTED_BY_VALIDATOR))));
+
+        CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, 
key);
+
+        assertThat(future, 
willThrow(IncompatibleSchemaVersionException.class));
+    }
+
     @CartesianTest
     @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
     void singleRowRwOperationsFailIfTableAlteredAfterTxStart(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
index 6d1eb3370a1..b51cceaa935 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnActionRequestTest.java
@@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
@@ -164,7 +164,7 @@ class CheckCatalogVersionOnActionRequestTest extends 
BaseIgniteAbstractTest {
         ErrorResponse errorResponse = (ErrorResponse) result;
         assertThat(errorResponse.errorCode(), is(RaftError.EBUSY.getNumber()));
         assertThat(errorResponse.errorMsg(),
-                is("Metadata not yet available, rejecting ActionRequest with 
EBUSY [group=test, requiredLevel=6]."));
+                is("Metadata not yet available by catalog version, rejecting 
ActionRequest with EBUSY [group=test, requiredLevel=6]."));
     }
 
     @ParameterizedTest
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
similarity index 83%
rename from 
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
rename to 
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
index 0bf845cc107..3aa161640fc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/CatalogVersionSufficiencyTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiencyTest.java
@@ -32,7 +32,7 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
-class CatalogVersionSufficiencyTest extends BaseIgniteAbstractTest {
+class MetadataSufficiencyTest extends BaseIgniteAbstractTest {
     @Mock
     private CatalogService catalogService;
 
@@ -40,20 +40,20 @@ class CatalogVersionSufficiencyTest extends 
BaseIgniteAbstractTest {
     void exceedingLocalVersionIsSufficient() {
         
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(nullCompletedFuture());
 
-        assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(8, 
catalogService));
+        assertTrue(MetadataSufficiency.isMetadataAvailableForCatalogVersion(8, 
catalogService));
     }
 
     @Test
     void equalLocalVersionIsSufficient() {
         
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(nullCompletedFuture());
 
-        assertTrue(CatalogVersionSufficiency.isMetadataAvailableFor(10, 
catalogService));
+        
assertTrue(MetadataSufficiency.isMetadataAvailableForCatalogVersion(10, 
catalogService));
     }
 
     @Test
-    void lowerLocalVersionIsSufficient() {
+    void lowerLocalVersionIsInsufficient() {
         when(catalogService.catalogReadyFuture(anyInt())).thenReturn(new 
CompletableFuture<>());
 
-        assertFalse(CatalogVersionSufficiency.isMetadataAvailableFor(12, 
catalogService));
+        
assertFalse(MetadataSufficiency.isMetadataAvailableForCatalogVersion(12, 
catalogService));
     }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
new file mode 100644
index 00000000000..fdce26e2d3f
--- /dev/null
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/CompoundValidationSchemasSource.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.partition.replicator.schema.FullTableSchema;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+
+class CompoundValidationSchemasSource implements ValidationSchemasSource {
+    private final Map<Integer, ValidationSchemasSource> schemasSources = new 
ConcurrentHashMap<>();
+
+    void registerSource(int table, ValidationSchemasSource source) {
+        schemasSources.put(table, source);
+    }
+
+    @Override
+    public CompletableFuture<Void> waitForSchemaAvailability(int tableId, int 
schemaVersion) {
+        return 
requiredSourceForTable(tableId).waitForSchemaAvailability(tableId, 
schemaVersion);
+    }
+
+    private ValidationSchemasSource requiredSourceForTable(int tableId) {
+        return requireNonNull(schemasSources.get(tableId), "No source for 
table " + tableId);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        return 
requiredSourceForTable(tableId).tableSchemaVersionsBetween(tableId, 
fromIncluding, toIncluding);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(int tableId, 
HybridTimestamp fromIncluding, int toTableVersionIncluding) {
+        return 
requiredSourceForTable(tableId).tableSchemaVersionsBetween(tableId, 
fromIncluding, toTableVersionIncluding);
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index d108b2372bb..31dde335c40 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -156,6 +156,7 @@ import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
 import 
org.apache.ignite.internal.table.distributed.raft.TablePartitionProcessor;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
@@ -284,6 +285,10 @@ public class ItTxTestCluster {
 
     protected String localNodeName;
 
+    private SchemaSyncService schemaSyncService;
+
+    private final CompoundValidationSchemasSource validationSchemasSource = 
new CompoundValidationSchemasSource();
+
     private final Map<String, Map<ZonePartitionId, ZonePartitionRaftListener>> 
zonePartitionRaftGroupListeners = new HashMap<>();
 
     private final Map<String, Map<ZonePartitionId, 
ZonePartitionReplicaListener>> zonePartitionReplicaListeners = new HashMap<>();
@@ -487,6 +492,8 @@ public class ItTxTestCluster {
                     new RaftGroupEventsClientListener()
             );
 
+            schemaSyncService = new AlwaysSyncedSchemaSyncService();
+
             ReplicaManager replicaMgr = new ReplicaManager(
                     nodeName,
                     clusterService,
@@ -499,6 +506,7 @@ public class ItTxTestCluster {
                     this::getSafeTimePropagationTimeout,
                     new NoOpFailureManager(),
                     commandMarshaller,
+                    new PartitionSafeTimeValidator(validationSchemasSource, 
catalogService, schemaSyncService),
                     raftClientFactory,
                     raftSrv,
                     partitionRaftConfigurer,
@@ -772,7 +780,10 @@ public class ItTxTestCluster {
                         TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER
                 );
 
-                DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
+                var schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
+                var tableValidationSchemasSource = new 
DummyValidationSchemasSource(schemaManager);
+
+                validationSchemasSource.registerSource(tableId, 
tableValidationSchemasSource);
 
                 RaftGroupListener raftGroupListener = 
getOrCreateAndPopulateRaftGroupListener(
                         assignment,
@@ -804,9 +815,9 @@ public class ItTxTestCluster {
                                 txStateStorage,
                                 transactionStateResolver,
                                 storageUpdateHandler,
-                                new 
DummyValidationSchemasSource(schemaManager),
+                                validationSchemasSource,
                                 nodeResolver.getByConsistentId(assignment),
-                                new AlwaysSyncedSchemaSyncService(),
+                                schemaSyncService,
                                 catalogService,
                                 placementDriver,
                                 nodeResolver,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3c9cd225edf..59ab5d1541d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -473,6 +473,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
 
         ZonePartitionId zonePartitionId = new ZonePartitionId(ZONE_ID, 
PART_ID);
 
+        var validationSchemasSource = new 
DummyValidationSchemasSource(schemaManager);
+        var schemaSyncService = new AlwaysSyncedSchemaSyncService();
+
         var tableReplicaListener = new PartitionReplicaListener(
                 mvPartStorage,
                 svc,
@@ -488,9 +491,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 safeTime,
                 transactionStateResolver,
                 storageUpdateHandler,
-                new DummyValidationSchemasSource(schemaManager),
+                validationSchemasSource,
                 LOCAL_NODE,
-                new AlwaysSyncedSchemaSyncService(),
+                schemaSyncService,
                 catalogService,
                 placementDriver,
                 mock(ClusterNodeResolver.class),
@@ -512,8 +515,8 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 txStateStorage.getOrCreatePartitionStorage(PART_ID),
                 CLOCK_SERVICE,
                 this.txManager,
-                new DummyValidationSchemasSource(schemaManager),
-                new AlwaysSyncedSchemaSyncService(),
+                validationSchemasSource,
+                schemaSyncService,
                 catalogService,
                 placementDriver,
                 mock(ClusterNodeResolver.class),

Reply via email to