This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 86f27a08d7 IGNITE-19824 Implicit RO should be used in implicit single 
gets (#2641)
86f27a08d7 is described below

commit 86f27a08d7fea5c1a38f2e971962bc65b93c3d56
Author: Vladislav Pyatkov <vldpyat...@gmail.com>
AuthorDate: Thu Oct 5 11:19:30 2023 +0300

    IGNITE-19824 Implicit RO should be used in implicit single gets (#2641)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |   2 +-
 .../Apache.Ignite.Tests/Table/DataStreamerTests.cs |  10 +-
 .../ignite/internal/replicator/ReplicaManager.java |  24 +-
 .../message/ReadOnlyDirectReplicaRequest.java      |  34 +++
 .../ignite/distributed/ItTablePersistenceTest.java |  12 +-
 .../table/distributed/TableMessageGroup.java       |  10 +
 .../ReadOnlyDirectMultiRowReplicaRequest.java      |  29 +++
 .../ReadOnlyDirectSingleRowReplicaRequest.java     |  30 +++
 .../replicator/PartitionReplicaListener.java       |  67 ++++++
 .../distributed/storage/InternalTableImpl.java     | 254 ++++++++++++++++++---
 .../RepeatedFinishReadWriteTransactionTest.java    |   2 +-
 .../ignite/internal/table/TxAbstractTest.java      |  63 +++++
 .../ignite/internal/tx/InternalTransaction.java    |  11 +
 .../org/apache/ignite/internal/tx/TxManager.java   |   4 +-
 .../tx/impl/IgniteAbstractTransactionImpl.java     |  12 +-
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |  29 ++-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  10 +-
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   8 +-
 .../apache/ignite/internal/tx/TxManagerTest.java   |  11 +-
 .../tx/impl/ReadOnlyTransactionImplTest.java       |   3 +-
 20 files changed, 537 insertions(+), 88 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 836b515d5f..3ea373782c 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -149,7 +149,7 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker) {
+    public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly) {
         throw new UnsupportedOperationException("Not expected to be called 
here");
     }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 2ace822027..d53774938b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -100,15 +100,7 @@ public class DataStreamerTests : IgniteTestsBase
 
         if (enabled)
         {
-            // TODO IGNITE-19824: Remove read-only TX workaround.
-            // Currently, there might be an exception due to false-positive tx 
conflict detection, which is fixed by a  read-only tx.
-            await TestUtils.WaitForConditionAsync(
-                async () =>
-                {
-                    await using var roTx = await 
Client.Transactions.BeginAsync(new(ReadOnly: true));
-                    return await TupleView.ContainsKeyAsync(roTx, GetTuple(0));
-                },
-                3000);
+            TestUtils.WaitForCondition(() => TupleView.ContainsKeyAsync(null, 
GetTuple(0)).GetAwaiter().GetResult(), 3000);
         }
         else
         {
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c5599f9639..b159a06bc6 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -54,6 +54,7 @@ import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableExcepti
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
 import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -231,12 +232,12 @@ public class ReplicaManager implements IgniteComponent {
                 return;
             }
 
-            HybridTimestamp sendTimestamp = null;
-
             if (requestTimestamp != null) {
-                sendTimestamp = clock.update(requestTimestamp);
+                clock.update(requestTimestamp);
             }
 
+            boolean sendTimestamp = request instanceof TimestampAware || 
request instanceof ReadOnlyDirectReplicaRequest;
+
             // replicaFut is always completed here.
             Replica replica = replicaFut.join();
 
@@ -245,16 +246,15 @@ public class ReplicaManager implements IgniteComponent {
 
             CompletableFuture<?> result = replica.processRequest(request, 
senderId);
 
-            HybridTimestamp finalSendTimestamp = sendTimestamp;
             result.handle((res, ex) -> {
                 NetworkMessage msg;
 
                 if (ex == null) {
-                    msg = prepareReplicaResponse(finalSendTimestamp, res);
+                    msg = prepareReplicaResponse(sendTimestamp, res);
                 } else {
                     LOG.warn("Failed to process replica request [request={}]", 
ex, request);
 
-                    msg = prepareReplicaErrorResponse(finalSendTimestamp, ex);
+                    msg = prepareReplicaErrorResponse(sendTimestamp, ex);
                 }
 
                 clusterNetSvc.messagingService().respond(senderConsistentId, 
msg, correlationId);
@@ -584,12 +584,12 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Prepares replica response.
      */
-    private NetworkMessage prepareReplicaResponse(@Nullable HybridTimestamp 
sendTimestamp, Object result) {
-        if (sendTimestamp != null) {
+    private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, 
Object result) {
+        if (sendTimestamp) {
             return REPLICA_MESSAGES_FACTORY
                     .timestampAwareReplicaResponse()
                     .result(result)
-                    .timestampLong(sendTimestamp.longValue())
+                    .timestampLong(clock.nowLong())
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
@@ -602,12 +602,12 @@ public class ReplicaManager implements IgniteComponent {
     /**
      * Prepares replica error response.
      */
-    private NetworkMessage prepareReplicaErrorResponse(@Nullable 
HybridTimestamp sendTimestamp, Throwable ex) {
-        if (sendTimestamp != null) {
+    private NetworkMessage prepareReplicaErrorResponse(boolean sendTimestamp, 
Throwable ex) {
+        if (sendTimestamp) {
             return REPLICA_MESSAGES_FACTORY
                     .errorTimestampAwareReplicaResponse()
                     .throwable(ex)
-                    .timestampLong(sendTimestamp.longValue())
+                    .timestampLong(clock.nowLong())
                     .build();
         } else {
             return REPLICA_MESSAGES_FACTORY
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
new file mode 100644
index 0000000000..29ca899ee2
--- /dev/null
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+/**
+ * Read-only request that sand to a specific node directly. This request has 
no read timestamp as other read-only requests because the
+ * timestamp is calculated on the replica side. The linearization is 
guaranteed by sending the request directly to the primary node.
+ *
+ * <p>The requests are used to implement an implicit read-only transaction for 
a single partition.
+ */
+public interface ReadOnlyDirectReplicaRequest extends ReplicaRequest {
+    /**
+     * Gets an enlistment consistency token.
+     * The token is used to check that the lease is still actual while the 
message goes to the replica.
+     *
+     * @return Enlistment consistency token.
+     */
+    Long enlistmentConsistencyToken();
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 2986aa9615..542d750c6c 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -85,8 +85,10 @@ import 
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
 import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -242,10 +244,10 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
         when(partitionReplicaListener.invoke(any(), 
any())).thenAnswer(invocationOnMock -> {
             ReplicaRequest req = invocationOnMock.getArgument(0);
 
-            if (req instanceof ReadWriteSingleRowPkReplicaRequest) {
-                ReadWriteSingleRowPkReplicaRequest req0 = 
(ReadWriteSingleRowPkReplicaRequest) req;
+            if (req instanceof ReadWriteSingleRowPkReplicaRequest || req 
instanceof ReadOnlyDirectSingleRowReplicaRequest) {
+                SingleRowPkReplicaRequest req0 = (SingleRowPkReplicaRequest) 
req;
 
-                if (req0.requestType() == RequestType.RW_GET) {
+                if (req0.requestType() == RequestType.RW_GET || 
req0.requestType() == RequestType.RO_GET) {
                     List<JraftServerImpl> servers = servers();
 
                     JraftServerImpl leader = servers.stream()
@@ -274,8 +276,10 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
                     return completedFuture(row);
                 } else if (req0.requestType() == RequestType.RW_DELETE) {
+                    ReadWriteSingleRowPkReplicaRequest rwReq = 
(ReadWriteSingleRowPkReplicaRequest) req0;
+
                     UpdateCommand cmd = msgFactory.updateCommand()
-                            .txId(req0.transactionId())
+                            .txId(rwReq.transactionId())
                             .tablePartitionId(tablePartitionId(new 
TablePartitionId(1, 0)))
                             .rowUuid(new RowId(0).uuid())
                             .safeTimeLong(hybridClock.nowLong())
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 78604d0c17..38ddd35f0c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -165,6 +165,16 @@ public interface TableMessageGroup {
     /** Message type for {@link BuildIndexReplicaRequest}. */
     short BUILD_INDEX_REPLICA_REQUEST = 21;
 
+    /**
+     * Message type for {@link ReadOnlyDirectSingleRowReplicaRequest}.
+     */
+    short RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST = 22;
+
+    /**
+     * Message type for {@link ReadOnlyDirectMultiRowReplicaRequest}.
+     */
+    short RO_DIRECT_MULTI_ROW_REPLICA_REQUEST = 23;
+
     /**
      * Message types for Table module RAFT commands.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
new file mode 100644
index 0000000000..70d2e22917
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication.request;
+
+import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Read only direct multi row replica request.
+ */
+@Transferable(TableMessageGroup.RO_DIRECT_MULTI_ROW_REPLICA_REQUEST)
+public interface ReadOnlyDirectMultiRowReplicaRequest extends 
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
new file mode 100644
index 0000000000..c4e4d59817
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication.request;
+
+import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Read only direct node single row replica request.
+ * The type of RO request never waits and is executed at the current node 
timestamp.
+ */
+@Transferable(TableMessageGroup.RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST)
+public interface ReadOnlyDirectSingleRowReplicaRequest extends 
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c1f667e0c4..dad2205006 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -91,6 +91,7 @@ import 
org.apache.ignite.internal.replicator.exception.ReplicationException;
 import 
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
 import 
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import 
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
@@ -125,6 +126,8 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryRo
 import 
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
 import 
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
 import 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -448,6 +451,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return 
processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, 
isPrimary);
         } else if (request instanceof BuildIndexReplicaRequest) {
             return 
raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
+        } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
+            return 
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) 
request);
+        } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
+            return 
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) 
request);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -1657,6 +1664,44 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return row.tupleSlice().compareTo(row2.tupleSlice()) == 0;
     }
 
+    /**
+     * Processes multiple entries direct request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<List<BinaryRow>> 
processReadOnlyDirectMultiEntryAction(
+            ReadOnlyDirectMultiRowReplicaRequest request
+    ) {
+        List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
+        HybridTimestamp readTimestamp = hybridClock.now();
+
+        if (request.requestType() != RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    format("Unknown single request [actionType={}]", 
request.requestType()));
+        }
+
+        var resolutionFuts = new 
ArrayList<CompletableFuture<BinaryRow>>(primaryKeys.size());
+
+        for (BinaryTuple primaryKey : primaryKeys) {
+            CompletableFuture<BinaryRow> fut = 
resolveRowByPkForReadOnly(primaryKey, readTimestamp);
+
+            resolutionFuts.add(fut);
+        }
+
+        return allOf(resolutionFuts.toArray(new 
CompletableFuture[0])).thenApply(unused1 -> {
+            var result = new ArrayList<BinaryRow>(resolutionFuts.size());
+
+            for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+                BinaryRow resolvedReadResult = resolutionFut.join();
+
+                result.add(resolvedReadResult);
+            }
+
+            return result;
+        });
+    }
+
     /**
      * Precesses multi request.
      *
@@ -2034,6 +2079,24 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
+    /**
+     * Processes single entry direct request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<BinaryRow> 
processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest 
request) {
+        BinaryTuple primaryKey = resolvePk(request.primaryKey());
+        HybridTimestamp readTimestamp = hybridClock.now();
+
+        if (request.requestType() != RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    format("Unknown single request [actionType={}]", 
request.requestType()));
+        }
+
+        return resolveRowByPkForReadOnly(primaryKey, readTimestamp);
+    }
+
     /**
      * Precesses single request.
      *
@@ -2464,6 +2527,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof TxFinishReplicaRequest) {
             expectedTerm = ((TxFinishReplicaRequest) request).term();
 
+            assert expectedTerm != null;
+        } else if (request instanceof ReadOnlyDirectReplicaRequest) {
+            expectedTerm = ((ReadOnlyDirectReplicaRequest) 
request).enlistmentConsistencyToken();
+
             assert expectedTerm != null;
         } else {
             expectedTerm = null;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 1fca0a43e8..2718d6656b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -40,6 +40,7 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -49,6 +50,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -95,6 +97,7 @@ import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -365,7 +368,7 @@ public class InternalTableImpl implements InternalTable {
     }
 
     private InternalTransaction startImplicitTxIfNeeded(@Nullable 
InternalTransaction tx) {
-        return tx == null ? 
txManager.beginImplicit(observableTimestampTracker) : tx;
+        return tx == null ? 
txManager.beginImplicit(observableTimestampTracker, false) : tx;
     }
 
     /**
@@ -549,28 +552,173 @@ public class InternalTableImpl implements InternalTable {
         }).thenCompose(x -> x);
     }
 
+    /**
+     * Evaluates the single-row request to the cluster for a read-only 
single-partition transaction.
+     *
+     * @param row Binary row.
+     * @param op Operation.
+     * @param <R> Result type.
+     * @return The future.
+     */
+    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
+            BinaryRowEx row,
+            BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
+    ) {
+        InternalTransaction tx = 
txManager.beginImplicit(observableTimestampTracker, true);
+
+        int partId = partitionId(row);
+
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
+                tablePartitionId,
+                tx.startTimestamp(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                TimeUnit.SECONDS
+        );
+
+        CompletableFuture<R>  fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
+            try {
+                ClusterNode node = 
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+
+                if (node == null) {
+                    throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
+                            + primaryReplica.getLeaseholder() + ']');
+                }
+
+                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
+            } catch (Throwable e) {
+                throw new TransactionException(
+                        INTERNAL_ERR,
+                        IgniteStringFormatter.format(
+                                "Failed to invoke the replica request 
[tableName={}, partId={}]",
+                                tableName,
+                                partId
+                        ),
+                        e
+                );
+            }
+        });
+
+        return postEvaluate(fut, tx);
+    }
+
+    /**
+     * Evaluates the multi-row request to the cluster for a read-only 
single-partition transaction.
+     *
+     * @param rows Rows.
+     * @param op Replica requests factory.
+     * @param <R> Result type.
+     * @return The future.
+     */
+    private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
+            Collection<BinaryRowEx> rows,
+            BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
+    ) {
+        InternalTransaction tx = 
txManager.beginImplicit(observableTimestampTracker, true);
+
+        int partId = partitionId(rows.iterator().next());
+
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
+
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
+                tablePartitionId,
+                tx.startTimestamp(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                TimeUnit.SECONDS
+        );
+
+        CompletableFuture<R>  fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
+            try {
+                ClusterNode node = 
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+
+                if (node == null) {
+                    throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
+                            + primaryReplica.getLeaseholder() + ']');
+                }
+
+                return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
+            } catch (Throwable e) {
+                throw new TransactionException(
+                        INTERNAL_ERR,
+                        IgniteStringFormatter.format(
+                                "Failed to invoke the replica request 
[tableName={}, partId={}]",
+                                tableName,
+                                partId
+                        ),
+                        e
+                );
+            }
+        });
+
+        return postEvaluate(fut, tx);
+    }
+
+    /**
+     * Performs post evaluate operation.
+     *
+     * @param fut The future.
+     * @param tx The transaction.
+     * @param <R> Operation return type.
+     * @return The future.
+     */
+    private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, 
InternalTransaction tx) {
+        return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r, 
e) -> {
+            if (e != null) {
+                RuntimeException e0 = wrapReplicationException(e);
+
+                return tx.finish(false, clock.now())
+                        .handle((ignored, err) -> {
+
+                            if (err != null) {
+                                e0.addSuppressed(err);
+                            }
+                            throw e0;
+                        }); // Preserve failed state.
+            }
+
+            return tx.finish(true, clock.now())
+                    .exceptionally(ex -> {
+                        throw wrapReplicationException(ex);
+                    })
+                    .thenApply(ignored -> r);
+        }).thenCompose(x -> x);
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, 
InternalTransaction tx) {
-        if (tx != null && tx.isReadOnly()) {
-            return evaluateReadOnlyRecipientNode(partitionId(keyRow))
-                    .thenCompose(recipientNode -> get(keyRow, 
tx.readTimestamp(), recipientNode));
-        } else {
-            return enlistInTx(
+        if (tx == null) {
+            return evaluateReadOnlyPrimaryNode(
                     keyRow,
-                    tx,
-                    (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+                    (groupId, consistencyToken) -> 
tableMessagesFactory.readOnlyDirectSingleRowReplicaRequest()
                             .groupId(groupId)
+                            .enlistmentConsistencyToken(consistencyToken)
                             .primaryKey(keyRow.tupleSlice())
-                            
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                            .transactionId(txo.id())
-                            .term(term)
-                            .requestType(RequestType.RW_GET)
-                            .timestampLong(clock.nowLong())
-                            .full(tx == null)
+                            .requestType(RequestType.RO_GET)
                             .build()
             );
         }
+
+        if (tx.isReadOnly()) {
+            return evaluateReadOnlyRecipientNode(partitionId(keyRow))
+                    .thenCompose(recipientNode -> get(keyRow, 
tx.readTimestamp(), recipientNode));
+        }
+
+        return enlistInTx(
+                keyRow,
+                tx,
+                (txo, groupId, term) -> 
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+                        .groupId(groupId)
+                        .primaryKey(keyRow.tupleSlice())
+                        
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .transactionId(txo.id())
+                        .term(term)
+                        .requestType(RequestType.RW_GET)
+                        .timestampLong(clock.nowLong())
+                        .full(tx == null)
+                        .build()
+        );
     }
 
     @Override
@@ -591,35 +739,69 @@ public class InternalTableImpl implements InternalTable {
         );
     }
 
+    /**
+     * Checks that the batch of rows belongs to a single partition.
+     *
+     * @param rows Batch of rows.
+     * @return If all rows belong to one partition, the method returns true; 
otherwise, it returns false.
+     */
+    private boolean isSinglePartitionBatch(Collection<BinaryRowEx> rows) {
+        Iterator<BinaryRowEx> rowIterator = rows.iterator();
+
+        int partId = partitionId(rowIterator.next());
+
+        while (rowIterator.hasNext()) {
+            BinaryRowEx row = rowIterator.next();
+
+            if (partId != partitionId(row)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> 
keyRows, InternalTransaction tx) {
-        if (tx != null && tx.isReadOnly()) {
-            BinaryRowEx firstRow = keyRows.iterator().next();
+        if (CollectionUtils.nullOrEmpty(keyRows)) {
+            return completedFuture(Collections.emptyList());
+        }
 
-            if (firstRow == null) {
-                return completedFuture(Collections.emptyList());
-            } else {
-                return evaluateReadOnlyRecipientNode(partitionId(firstRow))
-                        .thenCompose(recipientNode -> getAll(keyRows, 
tx.readTimestamp(), recipientNode));
-            }
-        } else {
-            return enlistInTx(
+        if (tx == null && isSinglePartitionBatch(keyRows)) {
+            return evaluateReadOnlyPrimaryNode(
                     keyRows,
-                    tx,
-                    (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+                    (groupId, consistencyToken) -> 
tableMessagesFactory.readOnlyDirectMultiRowReplicaRequest()
                             .groupId(groupId)
-                            .primaryKeys(serializePrimaryKeys(keyRows0))
-                            
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
-                            .transactionId(txo.id())
-                            .term(term)
-                            .requestType(RequestType.RW_GET_ALL)
-                            .timestampLong(clock.nowLong())
-                            .full(full)
-                            .build(),
-                    
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+                            .enlistmentConsistencyToken(consistencyToken)
+                            .primaryKeys(serializePrimaryKeys(keyRows))
+                            .requestType(RequestType.RO_GET_ALL)
+                            .build()
             );
         }
+
+        if (tx != null && tx.isReadOnly()) {
+            BinaryRowEx firstRow = keyRows.iterator().next();
+
+            return evaluateReadOnlyRecipientNode(partitionId(firstRow))
+                        .thenCompose(recipientNode -> getAll(keyRows, 
tx.readTimestamp(), recipientNode));
+        }
+
+        return enlistInTx(
+                keyRows,
+                tx,
+                (keyRows0, txo, groupId, term, full) -> 
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+                        .groupId(groupId)
+                        .primaryKeys(serializePrimaryKeys(keyRows0))
+                        
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+                        .transactionId(txo.id())
+                        .term(term)
+                        .requestType(RequestType.RW_GET_ALL)
+                        .timestampLong(clock.nowLong())
+                        .full(full)
+                        .build(),
+                InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+        );
     }
 
     /** {@inheritDoc} */
@@ -713,7 +895,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition) {
-        InternalTransaction tx = 
txManager.beginImplicit(observableTimestampTracker);
+        InternalTransaction tx = 
txManager.beginImplicit(observableTimestampTracker, false);
         TablePartitionId partGroupId = new TablePartitionId(tableId, 
partition);
 
         CompletableFuture<Void> fut = enlistWithRetry(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index 863d01f5e2..3a0b2daf0f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -246,7 +246,7 @@ public class RepeatedFinishReadWriteTransactionTest extends 
BaseIgniteAbstractTe
         }
 
         @Override
-        public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker) {
+        public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly) {
             throw new UnsupportedOperationException("Not implemented");
         }
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index d5f9fa98d1..3ac7e0ee00 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -1958,6 +1959,68 @@ public abstract class TxAbstractTest extends 
IgniteAbstractTest {
         tx.commit();
     }
 
+    @Test
+    public void testSingleGet() {
+        var accountRecordsView = accounts.recordView();
+
+        accountRecordsView.upsert(null, makeValue(1, 100.));
+
+        Transaction tx1 = igniteTransactions.begin();
+        Transaction tx2 = igniteTransactions.begin();
+
+        accountRecordsView.upsert(tx1, makeValue(1, 200.));
+
+        assertThrows(TransactionException.class, () -> 
accountRecordsView.get(tx2, makeKey(1)));
+
+        assertEquals(100., accountRecordsView.get(null, 
makeKey(1)).doubleValue("balance"));
+
+        tx1.commit();
+
+        assertEquals(200., accountRecordsView.get(null, 
makeKey(1)).doubleValue("balance"));
+    }
+
+    @Test
+    public void testBatchSinglePartitionGet() throws Exception {
+        var accountRecordsView = accounts.recordView();
+
+        var marshaller = new TupleMarshallerImpl(accounts.schemaView());
+
+        int partId = 
accounts.internalTable().partition(marshaller.marshalKey(makeKey(0)));
+
+        ArrayList<Integer> keys = new ArrayList<>(10);
+        keys.add(0);
+
+        for (int i = 1; i < 10_000 && keys.size() < 10; i++) {
+            var p = 
accounts.internalTable().partition(marshaller.marshalKey(makeKey(i)));
+
+            if (p == partId) {
+                keys.add(i);
+            }
+        }
+
+        log.info("A batch of keys for a single partition is found [partId={}, 
keys{}]", partId, keys);
+
+        accountRecordsView.upsertAll(null, keys.stream().map(k -> makeValue(k, 
100.)).collect(toList()));
+
+        Transaction tx1 = igniteTransactions.begin();
+        Transaction tx2 = igniteTransactions.begin();
+
+        accountRecordsView.upsertAll(tx1, keys.stream().map(k -> makeValue(k, 
200.)).collect(toList()));
+
+        assertThrows(TransactionException.class,
+                () -> accountRecordsView.getAll(tx2, keys.stream().map(k -> 
makeKey(k)).collect(toList())));
+
+        for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k 
-> makeKey(k)).collect(toList()))) {
+            assertEquals(100., tuple.doubleValue("balance"));
+        }
+
+        tx1.commit();
+
+        for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k 
-> makeKey(k)).collect(toList()))) {
+            assertEquals(200., tuple.doubleValue("balance"));
+        }
+    }
+
     /**
      * Checks operations that act after a transaction is committed, are 
finished with exception.
      *
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 784a1a1b35..7a60fd949f 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -105,4 +105,15 @@ public interface InternalTransaction extends Transaction {
      * provided no transaction for an operation) or not.
      */
     boolean implicit();
+
+    /**
+     * Finishes a read-only transaction with a specific execution timestamp.
+     *
+     * @param commit Commit flag. The flag is ignored for read-only 
transactions.
+     * @param executionTimestamp The timestamp is the time when a read-only 
transaction is applied to the remote node.
+     * @return The future.
+     */
+    default CompletableFuture<Void> finish(boolean commit, HybridTimestamp 
executionTimestamp) {
+        return commit ? commitAsync() : rollbackAsync();
+    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index fc06c3ea15..2052a4e6bd 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -64,9 +64,11 @@ public interface TxManager extends IgniteComponent {
      *
      * @param timestampTracker Observable timestamp tracker is used to track a 
timestamp for either read-write or read-only
      *         transaction execution. The tracker is also used to determine 
the read timestamp for read-only transactions.
+     * @param readOnly {@code true} in order to start a read-only transaction, 
{@code false} in order to start read-write one.
+     *         Calling begin with readOnly {@code false} is an equivalent of 
TxManager#begin().
      * @return The transaction.
      */
-    InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker);
+    InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, 
boolean readOnly);
 
     /**
      * Returns a transaction state meta.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
index a721e16708..8ce44dfb98 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
@@ -40,15 +40,19 @@ public abstract class IgniteAbstractTransactionImpl 
implements InternalTransacti
     /** The transaction manager. */
     protected final TxManager txManager;
 
+    private final boolean implicit;
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
      * @param id The id.
+     * @param implicit Whether the transaction will be implicit or not.
      */
-    public IgniteAbstractTransactionImpl(TxManager txManager, UUID id) {
+    public IgniteAbstractTransactionImpl(TxManager txManager, UUID id, boolean 
implicit) {
         this.txManager = txManager;
         this.id = id;
+        this.implicit = implicit;
     }
 
     /** {@inheritDoc} */
@@ -57,6 +61,12 @@ public abstract class IgniteAbstractTransactionImpl 
implements InternalTransacti
         return id;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public boolean implicit() {
+        return implicit;
+    }
+
     /** {@inheritDoc} */
     @Nullable
     @Override
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 8cfd7b6da8..18200874d2 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.network.ClusterNode;
 
@@ -39,17 +40,29 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** Prevents double finish of the transaction. */
     private final AtomicBoolean finishGuard = new AtomicBoolean();
 
+    /** The tracker is used to track an observable timestamp. */
+    private final HybridTimestampTracker observableTsTracker;
+
     /**
      * The constructor.
      *
      * @param txManager The tx manager.
+     * @param observableTsTracker Observable timestamp tracker.
      * @param id The id.
      * @param readTimestamp The read timestamp.
+     * @param implicit Whether the transaction will be implicit or not.
      */
-    ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp 
readTimestamp) {
-        super(txManager, id);
+    ReadOnlyTransactionImpl(
+            TxManagerImpl txManager,
+            HybridTimestampTracker observableTsTracker,
+            UUID id,
+            HybridTimestamp readTimestamp,
+            boolean implicit
+    ) {
+        super(txManager, id, implicit);
 
         this.readTimestamp = readTimestamp;
+        this.observableTsTracker = observableTsTracker;
     }
 
     @Override
@@ -67,11 +80,6 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
         return readTimestamp;
     }
 
-    @Override
-    public boolean implicit() {
-        return false;
-    }
-
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId 
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first 
finish invocation only.
@@ -101,10 +109,17 @@ class ReadOnlyTransactionImpl extends 
IgniteAbstractTransactionImpl {
     @Override
     // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish 
invocation only.
     protected CompletableFuture<Void> finish(boolean commit) {
+        return finish(commit, readTimestamp);
+    }
+
+    @Override
+    public CompletableFuture<Void> finish(boolean commit, HybridTimestamp 
executionTimestamp) {
         if (!finishGuard.compareAndSet(false, true)) {
             return completedFuture(null);
         }
 
+        observableTsTracker.update(executionTimestamp);
+
         return ((TxManagerImpl) 
txManager).completeReadOnlyTransactionFuture(new 
TxIdAndTimestamp(readTimestamp, id()))
                 .thenRun(() -> txManager.updateTxMeta(
                         id(),
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e90ed3e2a4..631d5d6c14 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -62,8 +62,6 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** The tracker is used to track an observable timestamp. */
     private final HybridTimestampTracker observableTsTracker;
 
-    private final boolean implicit;
-
     /** A partition which stores the transaction state. */
     private volatile TablePartitionId commitPart;
 
@@ -90,10 +88,9 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      * @param implicit Whether the transaction will be implicit or not.
      */
     public ReadWriteTransactionImpl(TxManager txManager, 
HybridTimestampTracker observableTsTracker, UUID id, boolean implicit) {
-        super(txManager, id);
+        super(txManager, id, implicit);
 
         this.observableTsTracker = observableTsTracker;
-        this.implicit = implicit;
     }
 
     /** {@inheritDoc} */
@@ -200,9 +197,4 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     public HybridTimestamp startTimestamp() {
         return TransactionIds.beginTimestamp(id());
     }
-
-    @Override
-    public boolean implicit() {
-        return implicit;
-    }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index a14b5964a5..aa84bef5bc 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -156,8 +156,8 @@ public class TxManagerImpl implements TxManager {
     }
 
     @Override
-    public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker) {
-        return beginTx(timestampTracker, false, true);
+    public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker, boolean readOnly) {
+        return beginTx(timestampTracker, readOnly, true);
     }
 
     private InternalTransaction beginTx(HybridTimestampTracker 
timestampTracker, boolean readOnly, boolean implicit) {
@@ -175,8 +175,6 @@ public class TxManagerImpl implements TxManager {
                 ? HybridTimestamp.max(observableTimestamp, 
currentReadTimestamp())
                 : currentReadTimestamp();
 
-        timestampTracker.update(readTimestamp);
-
         lowWatermarkReadWriteLock.readLock().lock();
 
         try {
@@ -196,7 +194,7 @@ public class TxManagerImpl implements TxManager {
                 return new CompletableFuture<>();
             });
 
-            return new ReadOnlyTransactionImpl(this, txId, readTimestamp);
+            return new ReadOnlyTransactionImpl(this, timestampTracker, txId, 
readTimestamp, implicit);
         } finally {
             lowWatermarkReadWriteLock.readLock().unlock();
         }
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 5b6d73d1c1..5569cf8c9b 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,17 +107,26 @@ public class TxManagerTest extends IgniteAbstractTest {
         InternalTransaction tx0 = txManager.begin(hybridTimestampTracker);
         InternalTransaction tx1 = txManager.begin(hybridTimestampTracker);
         InternalTransaction tx2 = txManager.begin(hybridTimestampTracker, 
true);
-        InternalTransaction tx3 = 
txManager.beginImplicit(hybridTimestampTracker);
+        InternalTransaction tx3 = 
txManager.beginImplicit(hybridTimestampTracker, false);
+        InternalTransaction tx4 = 
txManager.beginImplicit(hybridTimestampTracker, true);
 
         assertNotNull(tx0.id());
         assertNotNull(tx1.id());
         assertNotNull(tx2.id());
         assertNotNull(tx3.id());
+        assertNotNull(tx4.id());
 
         assertFalse(tx0.implicit());
         assertFalse(tx1.implicit());
         assertFalse(tx2.implicit());
         assertTrue(tx3.implicit());
+        assertTrue(tx4.implicit());
+
+        assertFalse(tx0.isReadOnly());
+        assertFalse(tx1.isReadOnly());
+        assertTrue(tx2.isReadOnly());
+        assertFalse(tx3.isReadOnly());
+        assertTrue(tx4.isReadOnly());
     }
 
     @Test
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index cf314addee..65c9cb732a 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -40,7 +41,7 @@ class ReadOnlyTransactionImplTest extends 
BaseIgniteAbstractTest {
         HybridTimestamp readTimestamp = new HybridClockImpl().now();
         UUID txId = 
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(readTimestamp);
 
-        var tx = new ReadOnlyTransactionImpl(txManager, txId, readTimestamp);
+        var tx = new ReadOnlyTransactionImpl(txManager, new 
HybridTimestampTracker(), txId, readTimestamp, false);
 
         assertThat(tx.startTimestamp(), is(readTimestamp));
     }

Reply via email to