This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 09bd408a11 IGNITE-20715 Check that versions of tuples sent to 
PartitionReplicaListener match tx-bound schema version (#2761)
09bd408a11 is described below

commit 09bd408a11252d821b155d00917ea10d22847110
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 31 14:41:58 2023 +0400

    IGNITE-20715 Check that versions of tuples sent to PartitionReplicaListener 
match tx-bound schema version (#2761)
---
 .../ignite/internal/replicator/ReplicaManager.java |  11 +-
 .../exception/ExpectedReplicationException.java    |  25 ++++
 .../ignite/internal/table/AbstractTableView.java   |  29 +++-
 .../internal/table/RecordBinaryViewImpl.java       |   2 -
 .../InternalSchemaVersionMismatchException.java    |  36 +++++
 .../replicator/PartitionReplicaListener.java       | 134 ++++++++++++------
 .../replicator/SchemaCompatValidator.java          |  19 +++
 .../table/KeyValueBinaryViewOperationsTest.java    |  34 +++++
 .../internal/table/KeyValueViewOperationsTest.java | 129 +++++++++++------
 .../table/RecordBinaryViewOperationsTest.java      | 117 ++++++++--------
 .../internal/table/RecordViewOperationsTest.java   | 123 ++++++++++------
 .../PartitionReplicaListenerIndexLockingTest.java  |   6 +-
 .../replication/PartitionReplicaListenerTest.java  | 155 +++++++++++++++++----
 .../apache/ignite/distributed/ItTxTestCluster.java |  10 +-
 14 files changed, 613 insertions(+), 217 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c9ff648060..01e6d13e5c 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaIsAlreadyStartedException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaStoppingException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
@@ -291,7 +292,11 @@ public class ReplicaManager implements IgniteComponent {
                 if (ex == null) {
                     msg = prepareReplicaResponse(sendTimestamp, res.result());
                 } else {
-                    LOG.warn("Failed to process replica request [request={}]", 
ex, request);
+                    if (indicatesUnexpectedProblem(ex)) {
+                        LOG.warn("Failed to process replica request 
[request={}]", ex, request);
+                    } else {
+                        LOG.debug("Failed to process replica request 
[request={}]", ex, request);
+                    }
 
                     msg = prepareReplicaErrorResponse(sendTimestamp, ex);
                 }
@@ -338,6 +343,10 @@ public class ReplicaManager implements IgniteComponent {
         }
     }
 
+    private static boolean indicatesUnexpectedProblem(Throwable ex) {
+        return !(ex instanceof ExpectedReplicationException);
+    }
+
     /**
      * Checks whether this exception is caused by a timeout or connectivity issue.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
new file mode 100644
index 0000000000..814ba8cb64
--- /dev/null
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ExpectedReplicationException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.exception;
+
+/**
+ * Marker interface for exceptions that are used in the replication protocol 
normally (i.e. they don't indicate errors that need
+ * to be logged, for example).
+ */
+public interface ExpectedReplicationException {
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 6b6187bdf4..0a3304053e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -17,13 +17,17 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -95,17 +99,38 @@ abstract class AbstractTableView {
      * @return Whatever the action returns.
      */
     protected final <T> CompletableFuture<T> withSchemaSync(@Nullable 
Transaction tx, KvAction<T> action) {
-        // TODO: IGNITE-20106 - retry if our request is rejected by the server 
due to a changed schema version.
+        return withSchemaSync(tx, null, action);
+    }
 
+    private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, 
@Nullable Integer previousSchemaVersion, KvAction<T> action) {
         CompletableFuture<Integer> schemaVersionFuture = tx == null
                 ? schemaVersions.schemaVersionAtNow(tbl.tableId())
                 : schemaVersions.schemaVersionAt(((InternalTransaction) 
tx).startTimestamp(), tbl.tableId());
 
-        CompletableFuture<T> future = 
schemaVersionFuture.thenCompose(action::act);
+        CompletableFuture<T> future = schemaVersionFuture
+                .thenCompose(schemaVersion -> action.act(schemaVersion)
+                        .handle((res, ex) -> {
+                            if 
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
+                                assert tx == null : "Only for implicit 
transactions a retry might be requested";
+                                assert previousSchemaVersion == null || 
!Objects.equals(schemaVersion, previousSchemaVersion)
+                                        : "Same schema version (" + 
schemaVersion
+                                                + ") on a retry: something is 
wrong, is this caused by the test setup?";
+
+                                // Repeat.
+                                return withSchemaSync(tx, schemaVersion, 
action);
+                            } else {
+                                return ex == null ? completedFuture(res) : 
CompletableFuture.<T>failedFuture(ex);
+                            }
+                        }))
+                .thenCompose(identity());
 
         return convertToPublicFuture(future);
     }
 
+    private static boolean isOrCausedBy(Class<? extends Exception> 
exceptionClass, @Nullable Throwable ex) {
+        return ex != null && (exceptionClass.isInstance(ex) || 
isOrCausedBy(exceptionClass, ex.getCause()));
+    }
+
     /**
      * Action representing some KV operation. When executed, the action is 
supplied with schema version corresponding
      * to the operation timestamp (see {@link #withSchemaSync(Transaction, 
KvAction)} for details).
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 7284c12a2b..7ee855b2a3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -40,7 +40,6 @@ import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Table view implementation for binary objects.
@@ -84,7 +83,6 @@ public class RecordBinaryViewImpl extends AbstractTableView 
implements RecordVie
      *
      * @param schemaVersion Schema version for which to obtain a marshaller.
      */
-    @TestOnly
     public TupleMarshaller marshaller(int schemaVersion) {
         return marshallerCache.marshaller(schemaVersion);
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
new file mode 100644
index 0000000000..e7b249e0b9
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/InternalSchemaVersionMismatchException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import 
org.apache.ignite.internal.replicator.exception.ExpectedReplicationException;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.schema.BinaryRow;
+
+/**
+ * An exception that is thrown to indicate that the schema version of {@link 
BinaryRow}(s) passed in a {@link ReplicaRequest} does not match
+ * the schema version corresponding to the transaction. This can only happen 
if the caller first obtained the schema
+ * and then started an (implicit) transaction; in such a case, the caller 
should retry the sequence (that is,
+ * re-obtain the new schema version, start another implicit transaction and 
send new ReplicaRequest).
+ *
+ * <p>This exception should never reach the public API users (even as a cause 
or a suppressed exception),
+ * that's why it has no error code attached.
+ */
+public class InternalSchemaVersionMismatchException extends 
IgniteInternalException implements ExpectedReplicationException {
+    private static final long serialVersionUID = 7695731405107811859L;
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 91e86650a2..94a16bd373 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -67,7 +67,6 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -98,6 +97,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.NullBinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -472,34 +472,30 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
         }
 
-        return waitForSchemasBeforeReading(request)
-                .thenCompose(unused -> validateTableExistence(request))
-                .thenCompose(opStartTimestamp -> 
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+        HybridTimestamp opTsIfDirectRo = (request instanceof 
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+        return validateTableExistence(request, opTsIfDirectRo)
+                .thenCompose(unused -> validateSchemaMatch(request, 
opTsIfDirectRo))
+                .thenCompose(unused -> waitForSchemasBeforeReading(request, 
opTsIfDirectRo))
+                .thenCompose(opStartTimestamp -> 
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
     }
 
     /**
-     * Makes sure that we have schemas corresponding to the moment of tx 
start; this makes PK extraction safe WRT
-     * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+     * Validates that the table exists at a timestamp corresponding to the 
request operation.
      *
-     * @param request Request that's being processed.
+     * <ul>
+     *     <li>For a read/write in an RW transaction, it's 'now'</li>
+     *     <li>For an RO read (with readTimestamp), it's readTimestamp 
(matches readTimestamp in the transaction)</li>
+     *     <li>For a direct read in an RO implicit transaction, it's the 
timestamp chosen (as 'now') to process the request</li>
+     * </ul>
+     *
+     * <p>For other requests, the validation is skipped.
+     *
+     * @param request Replica request corresponding to the operation.
+     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
+     * @return Future completed when the validation is finished.
      */
-    private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request) {
-        // TODO: IGNITE-20715 - validate that input rows schema version 
matches the tx-bound schema version.
-
-        HybridTimestamp tsToWaitForSchemas;
-
-        if (request instanceof ReadWriteReplicaRequest) {
-            tsToWaitForSchemas = 
TransactionIds.beginTimestamp(((ReadWriteReplicaRequest) 
request).transactionId());
-        } else if (request instanceof ReadOnlyReplicaRequest) {
-            tsToWaitForSchemas = ((ReadOnlyReplicaRequest) 
request).readTimestamp();
-        } else {
-            tsToWaitForSchemas = null;
-        }
-
-        return tsToWaitForSchemas == null ? completedFuture(null) : 
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchemas);
-    }
-
-    private CompletableFuture<HybridTimestamp> 
validateTableExistence(ReplicaRequest request) {
+    private CompletableFuture<Void> validateTableExistence(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
         HybridTimestamp opStartTs;
 
         if (request instanceof ReadWriteScanCloseReplicaRequest) {
@@ -510,7 +506,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof ReadOnlyReplicaRequest) {
             opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
         } else if (request instanceof ReadOnlyDirectReplicaRequest) {
-            opStartTs = hybridClock.now();
+            assert opTsIfDirectRo != null;
+
+            opStartTs = opTsIfDirectRo;
         } else {
             opStartTs = null;
         }
@@ -520,18 +518,84 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
 
         return schemaSyncService.waitForMetadataCompleteness(opStartTs)
-                .thenApply(unused -> {
-                    schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, 
tableId());
+                .thenRun(() -> 
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+    }
 
-                    return opStartTs;
+    /**
+     * Makes sure that {@link 
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches 
table schema version
+     * corresponding to the operation.
+     *
+     * @param request Replica request corresponding to the operation.
+     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
+     * @return Future completed when the validation is finished.
+     */
+    private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+        if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+            return completedFuture(null);
+        }
+
+        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
+        if (tsToWaitForSchema == null) {
+            tsToWaitForSchema = opTsIfDirectRo;
+        }
+
+        if (tsToWaitForSchema == null) {
+            return completedFuture(null);
+        }
+
+        HybridTimestamp finalTsToWaitForSchema = tsToWaitForSchema;
+        return 
schemaSyncService.waitForMetadataCompleteness(finalTsToWaitForSchema)
+                .thenRun(() -> {
+                    SchemaVersionAwareReplicaRequest versionAwareRequest = 
(SchemaVersionAwareReplicaRequest) request;
+
+                    schemaCompatValidator.failIfRequestSchemaDiffersFromTxTs(
+                            finalTsToWaitForSchema,
+                            versionAwareRequest.schemaVersion(),
+                            tableId()
+                    );
                 });
     }
 
+    /**
+     * Makes sure that we have schemas corresponding to the moment of tx 
start; this makes PK extraction safe WRT
+     * {@link SchemaRegistry#schema(int)}.
+     *
+     * @param request Replica request corresponding to the operation.
+     * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null} 
otherwise.
+     * @return Future completed when the wait for schemas is finished.
+     */
+    private CompletableFuture<Void> waitForSchemasBeforeReading(ReplicaRequest 
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+        HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
+        if (tsToWaitForSchema == null) {
+            tsToWaitForSchema = opTsIfDirectRo;
+        }
+
+        return tsToWaitForSchema == null ? completedFuture(null) : 
schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchema);
+    }
+
+    /**
+     * Returns timestamp of transaction start (for RW/timestamped RO requests) 
or {@code null} for other requests.
+     *
+     * @param request Replica request corresponding to the operation.
+     */
+    private static @Nullable HybridTimestamp 
getTxStartTimestamp(ReplicaRequest request) {
+        HybridTimestamp txStartTimestamp;
+
+        if (request instanceof ReadWriteReplicaRequest) {
+            txStartTimestamp = 
TransactionIds.beginTimestamp(((ReadWriteReplicaRequest) 
request).transactionId());
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            txStartTimestamp = ((ReadOnlyReplicaRequest) 
request).readTimestamp();
+        } else {
+            txStartTimestamp = null;
+        }
+        return txStartTimestamp;
+    }
+
     private CompletableFuture<?> processOperationRequest(
             ReplicaRequest request,
             @Nullable Boolean isPrimary,
             String senderId,
-            HybridTimestamp opStartTimestamp
+            @Nullable HybridTimestamp opStartTsIfDirectRo
     ) {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
@@ -603,9 +667,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof BuildIndexReplicaRequest) {
             return 
raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
         } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
-            return 
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) 
request, opStartTimestamp);
+            return 
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) 
request, opStartTsIfDirectRo);
         } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
-            return 
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) 
request, opStartTimestamp);
+            return 
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) 
request, opStartTsIfDirectRo);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -3495,14 +3559,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         }
     }
 
-    private CatalogTableDescriptor getTableDescriptor(int tableId, int 
catalogVersion) {
-        CatalogTableDescriptor tableDescriptor = catalogService.table(tableId, 
catalogVersion);
-
-        assert tableDescriptor != null : "tableId=" + tableId + ", 
catalogVersion=" + catalogVersion;
-
-        return tableDescriptor;
-    }
-
     private static BuildIndexCommand 
toBuildIndexCommand(BuildIndexReplicaRequest request) {
         return MSG_FACTORY.buildIndexCommand()
                 .indexId(request.indexId())
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 90ca7d5d55..8aef07a5e6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -221,4 +221,23 @@ class SchemaCompatValidator {
             throw tableWasDroppedException(tableId);
         }
     }
+
+    /**
+     * Throws an {@link InternalSchemaVersionMismatchException} if the schema 
version passed in the request differs from the schema version
+     * corresponding to the transaction timestamp.
+     *
+     * @param txTs Transaction timestamp.
+     * @param requestSchemaVersion Schema version passed in the operation 
request.
+     * @param tableId ID of the table.
+     * @throws InternalSchemaVersionMismatchException Thrown if the schema 
versions are different.
+     */
+    void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp txTs, int 
requestSchemaVersion, int tableId) {
+        CatalogTableDescriptor table = catalogService.table(tableId, 
txTs.longValue());
+
+        assert table != null : "No table " + tableId + " at " + txTs;
+
+        if (table.tableVersion() != requestSchemaVersion) {
+            throw new InternalSchemaVersionMismatchException();
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 8d438e3cc2..8de46d2ee5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -17,17 +17,31 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
@@ -431,6 +445,26 @@ public class KeyValueBinaryViewOperationsTest extends 
TableKvOperationsTestBase
         assertThrows(NullPointerException.class, () -> tbl.putAll(null, 
Collections.singletonMap(key, null)));
     }
 
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception {
+        SchemaDescriptor schema = schemaDescriptor();
+        InternalTable internalTable = spy(createInternalTable(schema));
+
+        KeyValueView<Tuple, Tuple> view = new 
KeyValueBinaryViewImpl(internalTable, new DummySchemaManagerImpl(schema), 
schemaVersions);
+
+        BinaryRow resultRow = new 
TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL", 
2L));
+
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException()))
+                .doReturn(completedFuture(resultRow))
+                .when(internalTable).get(any(), any());
+
+        Tuple result = view.get(null, Tuple.create().set("ID", 1L));
+
+        assertThat(result.longValue("VAL"), is(2L));
+
+        verify(internalTable, times(2)).get(any(), isNull());
+    }
+
     private SchemaDescriptor schemaDescriptor() {
         return new SchemaDescriptor(
                 SCHEMA_VERSION,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
index ebca9a33a7..dcb30d93ec 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN;
 import static org.apache.ignite.internal.type.NativeTypes.BYTES;
 import static org.apache.ignite.internal.type.NativeTypes.DATE;
@@ -30,13 +32,22 @@ import static 
org.apache.ignite.internal.type.NativeTypes.STRING;
 import static org.apache.ignite.internal.type.NativeTypes.datetime;
 import static org.apache.ignite.internal.type.NativeTypes.time;
 import static org.apache.ignite.internal.type.NativeTypes.timestamp;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
@@ -48,8 +59,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.type.NativeTypeSpec;
@@ -59,6 +73,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -67,6 +82,54 @@ import org.junit.jupiter.api.Test;
 public class KeyValueViewOperationsTest extends TableKvOperationsTestBase {
     private final Random rnd = new Random();
 
+    private final Column[] valCols = { // Value columns of the test schema; also streamed further down to check which native type specs are covered.
+            new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, false),
+            new Column("primitiveByteCol".toUpperCase(), INT8, false),
+            new Column("primitiveShortCol".toUpperCase(), INT16, false),
+            new Column("primitiveIntCol".toUpperCase(), INT32, false),
+            new Column("primitiveLongCol".toUpperCase(), INT64, false),
+            new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
+            new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
+
+            new Column("booleanCol".toUpperCase(), BOOLEAN, true),
+            new Column("byteCol".toUpperCase(), INT8, true),
+            new Column("shortCol".toUpperCase(), INT16, true),
+            new Column("intCol".toUpperCase(), INT32, true),
+            new Column("longCol".toUpperCase(), INT64, true),
+            new Column("nullLongCol".toUpperCase(), INT64, true),
+            new Column("floatCol".toUpperCase(), FLOAT, true),
+            new Column("doubleCol".toUpperCase(), DOUBLE, true),
+
+            new Column("dateCol".toUpperCase(), DATE, true),
+            new Column("timeCol".toUpperCase(), time(0), true),
+            new Column("dateTimeCol".toUpperCase(), datetime(6), true),
+            new Column("timestampCol".toUpperCase(), timestamp(6), true),
+
+            new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
+            new Column("bitmaskCol".toUpperCase(), NativeTypes.bitmaskOf(42), 
true),
+            new Column("stringCol".toUpperCase(), STRING, true),
+            new Column("nullBytesCol".toUpperCase(), BYTES, true),
+            new Column("bytesCol".toUpperCase(), BYTES, true),
+            new Column("numberCol".toUpperCase(), NativeTypes.numberOf(12), 
true),
+            new Column("decimalCol".toUpperCase(), NativeTypes.decimalOf(19, 
3), true),
+    };
+
+    private final SchemaDescriptor schema = new SchemaDescriptor( // Shared by the internal table, the schema manager and the marshaller used in the tests.
+            SCHEMA_VERSION,
+            new Column[]{new Column("id".toUpperCase(), INT64, false)}, // single INT64 key column named "ID"
+            valCols
+    );
+
+    private final Mapper<TestKeyObject> keyMapper = 
Mapper.of(TestKeyObject.class); // key-side mapper, passed to both the view and KvMarshallerImpl
+    private final Mapper<TestObjectWithAllTypes> valMapper = 
Mapper.of(TestObjectWithAllTypes.class); // value-side mapper, passed to KvMarshallerImpl in the retry test
+
+    private DummyInternalTableImpl internalTable; // Assigned afresh before every test by createInternalTable().
+
+    @BeforeEach
+    void createInternalTable() { // NOTE(review): the one-arg createInternalTable(schema) overload is not visible here - presumably inherited from TableKvOperationsTestBase.
+        internalTable = spy(createInternalTable(schema)); // wrapped in a Mockito spy so tests can stub and verify calls on the table
+    }
+
     @Test
     public void put() {
         final TestKeyObject key = TestKeyObject.randomObject(rnd);
@@ -615,6 +678,27 @@ public class KeyValueViewOperationsTest extends 
TableKvOperationsTestBase {
         assertThrows(MarshallerException.class, () -> tbl.putAll(null, 
Collections.singletonMap(key, null)));
     }
 
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception { // Expects view.get() to call the table again after an InternalSchemaVersionMismatchException.
+        KeyValueViewImpl<TestKeyObject, TestObjectWithAllTypes> view = 
kvView();
+
+        TestKeyObject key = new TestKeyObject(1);
+        TestObjectWithAllTypes expectedValue = 
TestObjectWithAllTypes.randomObject(rnd);
+
+        BinaryRow resultRow = new KvMarshallerImpl<>(schema, keyMapper, 
valMapper)
+                .marshal(key, expectedValue); // the row the stubbed table hands back on the second call
+
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException())) // 1st get(): future failed with the mismatch exception
+                .doReturn(completedFuture(resultRow)) // 2nd get(): future completed with the marshalled row
+                .when(internalTable).get(any(), any());
+
+        TestObjectWithAllTypes result = view.get(null, new TestKeyObject(1L)); // null transaction argument
+
+        assertThat(result, is(equalTo(expectedValue))); // the value from the second (successful) call is returned
+
+        verify(internalTable, times(2)).get(any(), isNull()); // exactly two table calls, both with a null second argument
+    }
+
     /**
      * Creates key-value view.
      */
@@ -625,49 +709,6 @@ public class KeyValueViewOperationsTest extends 
TableKvOperationsTestBase {
 
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class, 
RETURNS_DEEP_STUBS));
 
-        Mapper<TestKeyObject> keyMapper = Mapper.of(TestKeyObject.class);
-        Mapper<TestObjectWithAllTypes> valMapper = 
Mapper.of(TestObjectWithAllTypes.class);
-
-        Column[] valCols = {
-                new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, 
false),
-                new Column("primitiveByteCol".toUpperCase(), INT8, false),
-                new Column("primitiveShortCol".toUpperCase(), INT16, false),
-                new Column("primitiveIntCol".toUpperCase(), INT32, false),
-                new Column("primitiveLongCol".toUpperCase(), INT64, false),
-                new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
-                new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
-
-                new Column("booleanCol".toUpperCase(), BOOLEAN, true),
-                new Column("byteCol".toUpperCase(), INT8, true),
-                new Column("shortCol".toUpperCase(), INT16, true),
-                new Column("intCol".toUpperCase(), INT32, true),
-                new Column("longCol".toUpperCase(), INT64, true),
-                new Column("nullLongCol".toUpperCase(), INT64, true),
-                new Column("floatCol".toUpperCase(), FLOAT, true),
-                new Column("doubleCol".toUpperCase(), DOUBLE, true),
-
-                new Column("dateCol".toUpperCase(), DATE, true),
-                new Column("timeCol".toUpperCase(), time(0), true),
-                new Column("dateTimeCol".toUpperCase(), datetime(6), true),
-                new Column("timestampCol".toUpperCase(), timestamp(6), true),
-
-                new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
-                new Column("bitmaskCol".toUpperCase(), 
NativeTypes.bitmaskOf(42), true),
-                new Column("stringCol".toUpperCase(), STRING, true),
-                new Column("nullBytesCol".toUpperCase(), BYTES, true),
-                new Column("bytesCol".toUpperCase(), BYTES, true),
-                new Column("numberCol".toUpperCase(), 
NativeTypes.numberOf(12), true),
-                new Column("decimalCol".toUpperCase(), 
NativeTypes.decimalOf(19, 3), true),
-        };
-
-        SchemaDescriptor schema = new SchemaDescriptor(
-                SCHEMA_VERSION,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                valCols
-        );
-
-        DummyInternalTableImpl table = createInternalTable(schema);
-
         // Validate all types are tested.
         Set<NativeTypeSpec> testedTypes = Arrays.stream(valCols).map(c -> 
c.type().spec())
                 .collect(Collectors.toSet());
@@ -677,7 +718,7 @@ public class KeyValueViewOperationsTest extends 
TableKvOperationsTestBase {
         assertEquals(Collections.emptySet(), missedTypes);
 
         return new KeyValueViewImpl<>(
-                table,
+                internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
                 keyMapper,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index 0637167966..12d23ae078 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.schema.DefaultValueProvider.constantProvider;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -27,13 +30,23 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaMismatchException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
+import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.table.impl.TestTupleBuilder;
 import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.lang.IgniteException;
@@ -49,11 +62,7 @@ import org.junit.jupiter.api.function.Executable;
 public class RecordBinaryViewOperationsTest extends TableKvOperationsTestBase {
     @Test
     public void insert() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                SCHEMA_VERSION,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -76,13 +85,17 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
         assertNull(tbl.get(null, nonExistedTuple));
     }
 
-    @Test
-    public void upsert() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+    private static SchemaDescriptor schemaDescriptor() { // Builds the two-column schema (INT64 key "ID", INT64 value "VAL") that many tests in this class share.
+        return new SchemaDescriptor(
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
         );
+    }
+
+    @Test
+    public void upsert() {
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -107,11 +120,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void getAndUpsert() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -133,11 +142,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void remove() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -158,11 +163,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void removeExact() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -206,11 +207,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void replace() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -236,11 +233,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void replaceExact() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -268,7 +261,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void validateSchema() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{
                         new Column("val".toUpperCase(), NativeTypes.INT64, 
true),
@@ -300,7 +293,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void defaultValues() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{
                         new Column("val".toUpperCase(), NativeTypes.INT64, 
true, constantProvider(28L)),
@@ -327,11 +320,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void getAll() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -353,11 +342,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void upsertAllAfterInsertAll() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -395,11 +380,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void deleteVsDeleteExact() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -423,7 +404,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void getAndReplace() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT32, false)}
         );
@@ -448,7 +429,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void getAndDelete() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT32, false)}
         );
@@ -469,7 +450,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void deleteAll() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT32, false)}
         );
@@ -522,7 +503,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
     @Test
     public void deleteExact() {
         SchemaDescriptor schema = new SchemaDescriptor(
-                1,
+                SCHEMA_VERSION,
                 new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
                 new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT32, false)}
         );
@@ -575,11 +556,7 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
 
     @Test
     public void getAndReplaceVsGetAndUpsert() {
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("id".toUpperCase(), NativeTypes.INT64, 
false)},
-                new Column[]{new Column("val".toUpperCase(), 
NativeTypes.INT64, false)}
-        );
+        SchemaDescriptor schema = schemaDescriptor();
 
         RecordView<Tuple> tbl = createTable(schema).recordView();
 
@@ -600,6 +577,26 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
         assertNull(tbl.get(null, Tuple.create().set("id", 1L)));
     }
 
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception { // Expects view.get() to call the table again after an InternalSchemaVersionMismatchException.
+        SchemaDescriptor schema = schemaDescriptor();
+        InternalTable internalTable = spy(createInternalTable(schema)); // spy so get() can be stubbed and its calls counted
+
+        RecordView<Tuple> view = new RecordBinaryViewImpl(internalTable, new 
DummySchemaManagerImpl(schema), schemaVersions);
+
+        BinaryRow resultRow = new 
TupleMarshallerImpl(schema).marshal(Tuple.create().set("id", 1L).set("val", 
2L));
+
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException())) // 1st get(): future failed with the mismatch exception
+                .doReturn(completedFuture(resultRow)) // 2nd get(): future completed with the marshalled row (id=1, val=2)
+                .when(internalTable).get(any(), any());
+
+        Tuple result = view.get(null, Tuple.create().set("id", 1L)); // null transaction argument
+
+        assertThat(result.longValue("val"), is(2L)); // the value from the second (successful) call is returned
+
+        verify(internalTable, times(2)).get(any(), isNull()); // exactly two table calls, both with a null second argument
+    }
+
     /**
      * Check tuples equality.
      *
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
index 589eb862c3..dd0f303ccb 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.type.NativeTypes.BOOLEAN;
 import static org.apache.ignite.internal.type.NativeTypes.BYTES;
 import static org.apache.ignite.internal.type.NativeTypes.DATE;
@@ -32,12 +34,20 @@ import static 
org.apache.ignite.internal.type.NativeTypes.time;
 import static org.apache.ignite.internal.type.NativeTypes.timestamp;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
@@ -48,8 +58,11 @@ import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerImpl;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.type.NativeTypeSpec;
@@ -58,6 +71,7 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.mapper.Mapper;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -67,6 +81,52 @@ public class RecordViewOperationsTest extends 
TableKvOperationsTestBase {
 
     private final Random rnd = new Random();
 
+    private final Column[] valCols = { // Value columns of the test schema; PRIMITIVELONGCOL is absent because the schema below uses it as the key column.
+            new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, false),
+            new Column("primitiveByteCol".toUpperCase(), INT8, false),
+            new Column("primitiveShortCol".toUpperCase(), INT16, false),
+            new Column("primitiveIntCol".toUpperCase(), INT32, false),
+            new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
+            new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
+
+            new Column("booleanCol".toUpperCase(), BOOLEAN, true),
+            new Column("byteCol".toUpperCase(), INT8, true),
+            new Column("shortCol".toUpperCase(), INT16, true),
+            new Column("intCol".toUpperCase(), INT32, true),
+            new Column("longCol".toUpperCase(), INT64, true),
+            new Column("nullLongCol".toUpperCase(), INT64, true),
+            new Column("floatCol".toUpperCase(), FLOAT, true),
+            new Column("doubleCol".toUpperCase(), DOUBLE, true),
+
+            new Column("dateCol".toUpperCase(), DATE, true),
+            new Column("timeCol".toUpperCase(), time(0), true),
+            new Column("dateTimeCol".toUpperCase(), datetime(6), true),
+            new Column("timestampCol".toUpperCase(), timestamp(6), true),
+
+            new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
+            new Column("bitmaskCol".toUpperCase(), NativeTypes.bitmaskOf(42), 
true),
+            new Column("stringCol".toUpperCase(), STRING, true),
+            new Column("nullBytesCol".toUpperCase(), BYTES, true),
+            new Column("bytesCol".toUpperCase(), BYTES, true),
+            new Column("numberCol".toUpperCase(), NativeTypes.numberOf(12), 
true),
+            new Column("decimalCol".toUpperCase(), NativeTypes.decimalOf(19, 
3), true),
+    };
+
+    private final SchemaDescriptor schema = new SchemaDescriptor( // Shared by the internal table, the schema manager and the marshaller used in the tests.
+            SCHEMA_VERSION,
+            new Column[]{new Column("primitiveLongCol".toUpperCase(), 
NativeTypes.INT64, false)},
+            valCols
+    );
+
+    private final Mapper<TestObjectWithAllTypes> recMapper = 
Mapper.of(TestObjectWithAllTypes.class); // record mapper, passed to both the view and RecordMarshallerImpl
+
+    private DummyInternalTableImpl internalTable; // Assigned afresh before every test by createInternalTable().
+
+    @BeforeEach
+    void createInternalTable() { // NOTE(review): the one-arg createInternalTable(schema) overload is not visible here - presumably inherited from TableKvOperationsTestBase.
+        internalTable = spy(createInternalTable(schema)); // wrapped in a Mockito spy so tests can stub and verify calls on the table
+    }
+
     @Test
     public void upsert() {
         final TestObjectWithAllTypes key = key(rnd);
@@ -287,6 +347,26 @@ public class RecordViewOperationsTest extends 
TableKvOperationsTestBase {
         assertThat(res, contains(val1, null, val3));
     }
 
+    @Test
+    void retriesOnInternalSchemaVersionMismatchException() throws Exception { // Expects view.get() to call the table again after an InternalSchemaVersionMismatchException.
+        RecordView<TestObjectWithAllTypes> view = recordView();
+
+        TestObjectWithAllTypes expectedRecord = 
TestObjectWithAllTypes.randomObject(rnd);
+
+        BinaryRow resultRow = new RecordMarshallerImpl<>(schema, recMapper)
+                .marshal(expectedRecord); // the row the stubbed table hands back on the second call
+
+        doReturn(failedFuture(new InternalSchemaVersionMismatchException())) // 1st get(): future failed with the mismatch exception
+                .doReturn(completedFuture(resultRow)) // 2nd get(): future completed with the marshalled row
+                .when(internalTable).get(any(), any());
+
+        TestObjectWithAllTypes result = view.get(null, new 
TestObjectWithAllTypes());
+
+        assertThat(result, is(equalTo(expectedRecord))); // the record from the second (successful) call is returned
+
+        verify(internalTable, times(2)).get(any(), isNull()); // exactly two table calls, both with a null second argument
+    }
+
     /**
      * Creates RecordView.
      */
@@ -297,47 +377,6 @@ public class RecordViewOperationsTest extends 
TableKvOperationsTestBase {
 
         
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class, 
RETURNS_DEEP_STUBS));
 
-        Mapper<TestObjectWithAllTypes> recMapper = 
Mapper.of(TestObjectWithAllTypes.class);
-
-        Column[] valCols = {
-                new Column("primitiveBooleanCol".toUpperCase(), BOOLEAN, 
false),
-                new Column("primitiveByteCol".toUpperCase(), INT8, false),
-                new Column("primitiveShortCol".toUpperCase(), INT16, false),
-                new Column("primitiveIntCol".toUpperCase(), INT32, false),
-                new Column("primitiveFloatCol".toUpperCase(), FLOAT, false),
-                new Column("primitiveDoubleCol".toUpperCase(), DOUBLE, false),
-
-                new Column("booleanCol".toUpperCase(), BOOLEAN, true),
-                new Column("byteCol".toUpperCase(), INT8, true),
-                new Column("shortCol".toUpperCase(), INT16, true),
-                new Column("intCol".toUpperCase(), INT32, true),
-                new Column("longCol".toUpperCase(), INT64, true),
-                new Column("nullLongCol".toUpperCase(), INT64, true),
-                new Column("floatCol".toUpperCase(), FLOAT, true),
-                new Column("doubleCol".toUpperCase(), DOUBLE, true),
-
-                new Column("dateCol".toUpperCase(), DATE, true),
-                new Column("timeCol".toUpperCase(), time(0), true),
-                new Column("dateTimeCol".toUpperCase(), datetime(6), true),
-                new Column("timestampCol".toUpperCase(), timestamp(6), true),
-
-                new Column("uuidCol".toUpperCase(), NativeTypes.UUID, true),
-                new Column("bitmaskCol".toUpperCase(), 
NativeTypes.bitmaskOf(42), true),
-                new Column("stringCol".toUpperCase(), STRING, true),
-                new Column("nullBytesCol".toUpperCase(), BYTES, true),
-                new Column("bytesCol".toUpperCase(), BYTES, true),
-                new Column("numberCol".toUpperCase(), 
NativeTypes.numberOf(12), true),
-                new Column("decimalCol".toUpperCase(), 
NativeTypes.decimalOf(19, 3), true),
-        };
-
-        SchemaDescriptor schema = new SchemaDescriptor(
-                1,
-                new Column[]{new Column("primitiveLongCol".toUpperCase(), 
NativeTypes.INT64, false)},
-                valCols
-        );
-
-        DummyInternalTableImpl table = createInternalTable(schema);
-
         // Validate all types are tested.
         Set<NativeTypeSpec> testedTypes = Arrays.stream(valCols).map(c -> 
c.type().spec())
                 .collect(Collectors.toSet());
@@ -347,7 +386,7 @@ public class RecordViewOperationsTest extends 
TableKvOperationsTestBase {
         assertEquals(Collections.emptySet(), missedTypes);
 
         return new RecordViewImpl<>(
-                table,
+                internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
                 recMapper
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 41d46aa1b2..f9f2d1f74a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -206,7 +206,11 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
         TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(TABLE_ID, PART_ID, TEST_MV_PARTITION_STORAGE);
 
         CatalogService catalogService = mock(CatalogService.class);
-        when(catalogService.table(anyInt(), 
anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
+        CatalogTableDescriptor tableDescriptor = 
mock(CatalogTableDescriptor.class);
+        
when(tableDescriptor.tableVersion()).thenReturn(schemaDescriptor.version());
+
+        when(catalogService.table(anyInt(), 
anyLong())).thenReturn(tableDescriptor);
 
         ClusterNode localNode = mock(ClusterNode.class);
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index bb1c861bfd..5b56fc8018 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUS
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
@@ -143,6 +144,7 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnly
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
 import 
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
+import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
@@ -1907,13 +1909,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         RwListenerInvocation invocation = null;
 
         if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
-            };
+            invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
         } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
-            };
+            invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
         } else {
             fail("Uncovered type: " + requestType);
         }
@@ -1988,13 +1986,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         RwListenerInvocation invocation = null;
 
         if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
-            };
+            invocation = (targetTxId, key)
+                    -> doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
         } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
-            };
+            invocation = (targetTxId, key)
+                    -> doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
         } else {
             fail("Uncovered type: " + requestType);
         }
@@ -2054,13 +2050,9 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         RwListenerInvocation invocation = null;
 
         if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
-            };
+            invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
         } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
-            };
+            invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
         } else {
             fail("Uncovered type: " + requestType);
         }
@@ -2105,13 +2097,11 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         RwListenerInvocation invocation = null;
 
         if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
-            };
+            invocation = (targetTxId, key)
+                    -> doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
         } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
-            invocation = (targetTxId, key) -> {
-                return doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
-            };
+            invocation = (targetTxId, key)
+                    -> doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
         } else {
             fail("Uncovered type: " + requestType);
         }
@@ -2256,6 +2246,123 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
         assertThat("The transaction must have been aborted", committed.get(), 
is(false));
     }
 
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+    void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType 
requestType, boolean onExistingRow, boolean full) { // Parameterized via singleRowRwOperationTypesFactory: builds the single-row RW listener call for requestType and runs the shared schema-version-mismatch check on it.
+        RwListenerInvocation invocation = null; // assigned in one of the first two branches; the else branch calls fail()
+
+        if (RequestTypes.isSingleRowRwPkOnly(requestType)) { // request types accepted by isSingleRowRwPkOnly go through doSingleRowPkRequest
+            invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
+        } else if (RequestTypes.isSingleRowRwFullRow(requestType)) { // request types accepted by isSingleRowRwFullRow go through doSingleRowRequest
+            invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, 
marshalKeyOrKeyValue(requestType, key), requestType, full);
+        } else {
+            fail("Uncovered type: " + requestType); // a request type matched by neither predicate is reported as a test failure
+        }
+
+        testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, 
invocation);
+    }
+
+    private void testRwOperationFailsIfSchemaVersionMismatchesTx(boolean 
onExistingRow, RwListenerInvocation listenerInvocation) { // Shared check for RW operations: invokes the listener after the catalog stub is switched to NEXT_SCHEMA_VERSION and expects InternalSchemaVersionMismatchException.
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key); // NOTE(review): presumably writes a row for this key in a separate tx so the operation targets an existing row - confirm against upsertInNewTxFor
+        }
+
+        UUID txId = newTxId(); // created before the catalog stub is switched below
+
+        makeSchemaBeNextVersion(); // from here on catalogService.table(TABLE_ID, ...) returns a descriptor reporting NEXT_SCHEMA_VERSION
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+        assertThat(future, 
willThrow(InternalSchemaVersionMismatchException.class));
+    }
+
+    private void makeSchemaBeNextVersion() { // Stubs catalogService so that lookups of TABLE_ID return a mocked descriptor whose tableVersion() is NEXT_SCHEMA_VERSION.
+        CatalogTableDescriptor tableVersion2 = 
mock(CatalogTableDescriptor.class);
+        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION); // only tableVersion() is stubbed on this mock
+
+        when(catalogService.table(eq(TABLE_ID), 
anyLong())).thenReturn(tableVersion2); // anyLong(): the stub applies whatever long is passed as the second argument
+    }
+
+    @CartesianTest
+    @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+    void multiRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType 
requestType, boolean onExistingRow, boolean full) { // Parameterized via multiRowRwOperationTypesFactory: builds the multi-row RW listener call for requestType and runs the shared schema-version-mismatch check on it.
+        RwListenerInvocation invocation = null; // assigned in one of the first two branches; the else branch calls fail()
+
+        if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) { // request types accepted by isMultipleRowsRwPkOnly go through doMultiRowPkRequest
+            invocation = (targetTxId, key)
+                    -> doMultiRowPkRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); // the batch holds exactly one marshalled row
+        } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) { // request types accepted by isMultipleRowsRwFullRows go through doMultiRowRequest
+            invocation = (targetTxId, key)
+                    -> doMultiRowRequest(targetTxId, 
List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); // the batch holds exactly one marshalled row
+        } else {
+            fail("Uncovered type: " + requestType); // a request type matched by neither predicate is reported as a test failure
+        }
+
+        testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, 
invocation);
+    }
+
+    @CartesianTest
+    void replaceRequestFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean onExistingRow,
+            @Values(booleans = {false, true}) boolean full
+    ) { // Runs the shared RW schema-version-mismatch check for a replace request, over all four onExistingRow/full combinations.
+        testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, 
(targetTxId, key) -> {
+            return doReplaceRequest(
+                    targetTxId,
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key), // NOTE(review): same marshalled row is passed twice - presumably as the old and the new row; confirm against doReplaceRequest
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    full
+            );
+        });
+    }
+
+    @CartesianTest
+    void singleRowRoGetFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) { // Runs the shared RO schema-version-mismatch check for a single-row get, over all four direct/onExistingRow combinations.
+        testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, 
(targetTxId, readTimestamp, key) -> {
+            if (direct) { // the direct variant is called without readTimestamp; targetTxId is unused in both branches
+                return doReadOnlyDirectSingleGet(marshalQuietly(key, 
kvMarshaller));
+            } else {
+                return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller), 
readTimestamp);
+            }
+        });
+    }
+
+    private void testRoOperationFailsIfSchemaVersionMismatchesTx(boolean 
onExistingRow, RoListenerInvocation listenerInvocation) { // Shared check for RO operations: invokes the listener after the catalog stub is switched to NEXT_SCHEMA_VERSION and expects InternalSchemaVersionMismatchException.
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key); // NOTE(review): presumably writes a row for this key in a separate tx so the read targets an existing row - confirm against upsertInNewTxFor
+        }
+
+        UUID txId = newTxId(); // passed to the invocation; the RO get lambdas in this hunk ignore it
+        HybridTimestamp readTs = clock.now(); // read timestamp is taken before the catalog stub is switched below
+
+        makeSchemaBeNextVersion(); // from here on catalogService.table(TABLE_ID, ...) returns a descriptor reporting NEXT_SCHEMA_VERSION
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, 
key);
+
+        assertThat(future, 
willThrow(InternalSchemaVersionMismatchException.class));
+    }
+
+    @CartesianTest
+    void multiRowRoGetFailsIfSchemaVersionMismatchesTx(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) { // Runs the shared RO schema-version-mismatch check for a multi-row get (with a one-element batch), over all four direct/onExistingRow combinations.
+        testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, 
(targetTxId, readTimestamp, key) -> {
+            if (direct) { // the direct variant is called without readTimestamp; targetTxId is unused in both branches
+                return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, 
kvMarshaller)));
+            } else {
+                return doReadOnlyMultiGet(List.of(marshalQuietly(key, 
kvMarshaller)), readTimestamp);
+            }
+        });
+    }
+
     private UUID newTxId() {
         return transactionIdFor(clock.now());
     }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a4defff961..6377f9a7ee 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -142,6 +142,8 @@ import org.junit.jupiter.api.TestInfo;
  * Class that allows to mock a cluster for transaction tests' purposes.
  */
 public class ItTxTestCluster {
+    private static final int SCHEMA_VERSION = 1;
+
     private final List<NetworkAddress> localAddresses;
 
     private final NodeFinder nodeFinder;
@@ -407,7 +409,11 @@ public class ItTxTestCluster {
      */
     public TableImpl startTable(String tableName, int tableId, 
SchemaDescriptor schemaDescriptor) throws Exception {
         CatalogService catalogService = mock(CatalogService.class);
-        lenient().when(catalogService.table(anyInt(), 
anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
+        CatalogTableDescriptor tableDescriptor = 
mock(CatalogTableDescriptor.class);
+        when(tableDescriptor.tableVersion()).thenReturn(SCHEMA_VERSION);
+
+        lenient().when(catalogService.table(anyInt(), 
anyLong())).thenReturn(tableDescriptor);
 
         List<Set<Assignment>> calculatedAssignments = 
AffinityUtils.calculateAssignments(
                 cluster.stream().map(node -> 
node.topologyService().localMember().name()).collect(toList()),
@@ -613,7 +619,7 @@ public class ItTxTestCluster {
                 ),
                 new DummySchemaManagerImpl(schemaDescriptor),
                 clientTxManager.lockManager(),
-                new ConstantSchemaVersions(1)
+                new ConstantSchemaVersions(SCHEMA_VERSION)
         );
     }
 

Reply via email to