This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 86f27a08d7 IGNITE-19824 Implicit RO should be used in implicit single gets (#2641) 86f27a08d7 is described below commit 86f27a08d7fea5c1a38f2e971962bc65b93c3d56 Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Thu Oct 5 11:19:30 2023 +0300 IGNITE-19824 Implicit RO should be used in implicit single gets (#2641) --- .../apache/ignite/client/fakes/FakeTxManager.java | 2 +- .../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 10 +- .../ignite/internal/replicator/ReplicaManager.java | 24 +- .../message/ReadOnlyDirectReplicaRequest.java | 34 +++ .../ignite/distributed/ItTablePersistenceTest.java | 12 +- .../table/distributed/TableMessageGroup.java | 10 + .../ReadOnlyDirectMultiRowReplicaRequest.java | 29 +++ .../ReadOnlyDirectSingleRowReplicaRequest.java | 30 +++ .../replicator/PartitionReplicaListener.java | 67 ++++++ .../distributed/storage/InternalTableImpl.java | 254 ++++++++++++++++++--- .../RepeatedFinishReadWriteTransactionTest.java | 2 +- .../ignite/internal/table/TxAbstractTest.java | 63 +++++ .../ignite/internal/tx/InternalTransaction.java | 11 + .../org/apache/ignite/internal/tx/TxManager.java | 4 +- .../tx/impl/IgniteAbstractTransactionImpl.java | 12 +- .../internal/tx/impl/ReadOnlyTransactionImpl.java | 29 ++- .../internal/tx/impl/ReadWriteTransactionImpl.java | 10 +- .../ignite/internal/tx/impl/TxManagerImpl.java | 8 +- .../apache/ignite/internal/tx/TxManagerTest.java | 11 +- .../tx/impl/ReadOnlyTransactionImplTest.java | 3 +- 20 files changed, 537 insertions(+), 88 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java index 836b515d5f..3ea373782c 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -149,7 +149,7 @@ public class FakeTxManager implements TxManager { } @Override - public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker) { + public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) { throw new UnsupportedOperationException("Not expected to be called here"); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs index 2ace822027..d53774938b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs @@ -100,15 +100,7 @@ public class DataStreamerTests : IgniteTestsBase if (enabled) { - // TODO IGNITE-19824: Remove read-only TX workaround. - // Currently, there might be an exception due to false-positive tx conflict detection, which is fixed by a read-only tx. - await TestUtils.WaitForConditionAsync( - async () => - { - await using var roTx = await Client.Transactions.BeginAsync(new(ReadOnly: true)); - return await TupleView.ContainsKeyAsync(roTx, GetTuple(0)); - }, - 3000); + TestUtils.WaitForCondition(() => TupleView.ContainsKeyAsync(null, GetTuple(0)).GetAwaiter().GetResult(), 3000); } else { diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index c5599f9639..b159a06bc6 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.replicator.exception.ReplicaUnavailableExcepti import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest; import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest; +import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; @@ -231,12 +232,12 @@ public class ReplicaManager implements IgniteComponent { return; } - HybridTimestamp sendTimestamp = null; - if (requestTimestamp != null) { - sendTimestamp = clock.update(requestTimestamp); + clock.update(requestTimestamp); } + boolean sendTimestamp = request instanceof TimestampAware || request instanceof ReadOnlyDirectReplicaRequest; + // replicaFut is always completed here. Replica replica = replicaFut.join(); @@ -245,16 +246,15 @@ public class ReplicaManager implements IgniteComponent { CompletableFuture<?> result = replica.processRequest(request, senderId); - HybridTimestamp finalSendTimestamp = sendTimestamp; result.handle((res, ex) -> { NetworkMessage msg; if (ex == null) { - msg = prepareReplicaResponse(finalSendTimestamp, res); + msg = prepareReplicaResponse(sendTimestamp, res); } else { LOG.warn("Failed to process replica request [request={}]", ex, request); - msg = prepareReplicaErrorResponse(finalSendTimestamp, ex); + msg = prepareReplicaErrorResponse(sendTimestamp, ex); } clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId); @@ -584,12 +584,12 @@ public class ReplicaManager implements IgniteComponent { /** * Prepares replica response. */ - private NetworkMessage prepareReplicaResponse(@Nullable HybridTimestamp sendTimestamp, Object result) { - if (sendTimestamp != null) { + private NetworkMessage prepareReplicaResponse(boolean sendTimestamp, Object result) { + if (sendTimestamp) { return REPLICA_MESSAGES_FACTORY .timestampAwareReplicaResponse() .result(result) - .timestampLong(sendTimestamp.longValue()) + .timestampLong(clock.nowLong()) .build(); } else { return REPLICA_MESSAGES_FACTORY @@ -602,12 +602,12 @@ public class ReplicaManager implements IgniteComponent { /** * Prepares replica error response. */ - private NetworkMessage prepareReplicaErrorResponse(@Nullable HybridTimestamp sendTimestamp, Throwable ex) { - if (sendTimestamp != null) { + private NetworkMessage prepareReplicaErrorResponse(boolean sendTimestamp, Throwable ex) { + if (sendTimestamp) { return REPLICA_MESSAGES_FACTORY .errorTimestampAwareReplicaResponse() .throwable(ex) - .timestampLong(sendTimestamp.longValue()) + .timestampLong(clock.nowLong()) .build(); } else { return REPLICA_MESSAGES_FACTORY diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java new file mode 100644 index 0000000000..29ca899ee2 --- /dev/null +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.replicator.message; + +/** + * Read-only request that sand to a specific node directly. This request has no read timestamp as other read-only requests because the + * timestamp is calculated on the replica side. The linearization is guaranteed by sending the request directly to the primary node. + * + * <p>The requests are used to implement an implicit read-only transaction for a single partition. + */ +public interface ReadOnlyDirectReplicaRequest extends ReplicaRequest { + /** + * Gets an enlistment consistency token. + * The token is used to check that the lease is still actual while the message goes to the replica. + * + * @return Enlistment consistency token. + */ + Long enlistmentConsistencyToken(); +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java index 2986aa9615..542d750c6c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java @@ -85,8 +85,10 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler; import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.PartitionListener; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest; +import org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; @@ -242,10 +244,10 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti when(partitionReplicaListener.invoke(any(), any())).thenAnswer(invocationOnMock -> { ReplicaRequest req = invocationOnMock.getArgument(0); - if (req instanceof ReadWriteSingleRowPkReplicaRequest) { - ReadWriteSingleRowPkReplicaRequest req0 = (ReadWriteSingleRowPkReplicaRequest) req; + if (req instanceof ReadWriteSingleRowPkReplicaRequest || req instanceof ReadOnlyDirectSingleRowReplicaRequest) { + SingleRowPkReplicaRequest req0 = (SingleRowPkReplicaRequest) req; - if (req0.requestType() == RequestType.RW_GET) { + if (req0.requestType() == RequestType.RW_GET || req0.requestType() == RequestType.RO_GET) { List<JraftServerImpl> servers = servers(); JraftServerImpl leader = servers.stream() @@ -274,8 +276,10 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti return completedFuture(row); } else if (req0.requestType() == RequestType.RW_DELETE) { + ReadWriteSingleRowPkReplicaRequest rwReq = (ReadWriteSingleRowPkReplicaRequest) req0; + UpdateCommand cmd = msgFactory.updateCommand() - .txId(req0.transactionId()) + .txId(rwReq.transactionId()) .tablePartitionId(tablePartitionId(new TablePartitionId(1, 0))) .rowUuid(new RowId(0).uuid()) .safeTimeLong(hybridClock.nowLong()) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java index 78604d0c17..38ddd35f0c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java @@ -165,6 +165,16 @@ public interface TableMessageGroup { /** Message type for {@link BuildIndexReplicaRequest}. */ short BUILD_INDEX_REPLICA_REQUEST = 21; + /** + * Message type for {@link ReadOnlyDirectSingleRowReplicaRequest}. + */ + short RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST = 22; + + /** + * Message type for {@link ReadOnlyDirectMultiRowReplicaRequest}. + */ + short RO_DIRECT_MULTI_ROW_REPLICA_REQUEST = 23; + /** * Message types for Table module RAFT commands. * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java new file mode 100644 index 0000000000..70d2e22917 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.replication.request; + +import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Read only direct multi row replica request. + */ +@Transferable(TableMessageGroup.RO_DIRECT_MULTI_ROW_REPLICA_REQUEST) +public interface ReadOnlyDirectMultiRowReplicaRequest extends MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest { +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java new file mode 100644 index 0000000000..c4e4d59817 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.replication.request; + +import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Read only direct node single row replica request. + * The type of RO request never waits and is executed at the current node timestamp. + */ +@Transferable(TableMessageGroup.RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST) +public interface ReadOnlyDirectSingleRowReplicaRequest extends SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest { +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index c1f667e0c4..dad2205006 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException; import org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException; import org.apache.ignite.internal.replicator.listener.ReplicaListener; +import org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest; @@ -125,6 +126,8 @@ import org.apache.ignite.internal.table.distributed.replication.request.BinaryRo import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage; import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest; +import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest; import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest; @@ -448,6 +451,10 @@ public class PartitionReplicaListener implements ReplicaListener { return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request, isPrimary); } else if (request instanceof BuildIndexReplicaRequest) { return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request)); + } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) { + return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request); + } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) { + return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request); } else { throw new UnsupportedReplicaRequestException(request.getClass()); } @@ -1657,6 +1664,44 @@ public class PartitionReplicaListener implements ReplicaListener { return row.tupleSlice().compareTo(row2.tupleSlice()) == 0; } + /** + * Processes multiple entries direct request for read only transaction. + * + * @param request Read only multiple entries request. + * @return Result future. + */ + private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction( + ReadOnlyDirectMultiRowReplicaRequest request + ) { + List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys()); + HybridTimestamp readTimestamp = hybridClock.now(); + + if (request.requestType() != RequestType.RO_GET_ALL) { + throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + format("Unknown single request [actionType={}]", request.requestType())); + } + + var resolutionFuts = new ArrayList<CompletableFuture<BinaryRow>>(primaryKeys.size()); + + for (BinaryTuple primaryKey : primaryKeys) { + CompletableFuture<BinaryRow> fut = resolveRowByPkForReadOnly(primaryKey, readTimestamp); + + resolutionFuts.add(fut); + } + + return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenApply(unused1 -> { + var result = new ArrayList<BinaryRow>(resolutionFuts.size()); + + for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) { + BinaryRow resolvedReadResult = resolutionFut.join(); + + result.add(resolvedReadResult); + } + + return result; + }); + } + /** * Precesses multi request. * @@ -2034,6 +2079,24 @@ public class PartitionReplicaListener implements ReplicaListener { }); } + /** + * Processes single entry direct request for read only transaction. + * + * @param request Read only single entry request. + * @return Result future. + */ + private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest request) { + BinaryTuple primaryKey = resolvePk(request.primaryKey()); + HybridTimestamp readTimestamp = hybridClock.now(); + + if (request.requestType() != RequestType.RO_GET) { + throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + format("Unknown single request [actionType={}]", request.requestType())); + } + + return resolveRowByPkForReadOnly(primaryKey, readTimestamp); + } + /** * Precesses single request. * @@ -2464,6 +2527,10 @@ public class PartitionReplicaListener implements ReplicaListener { } else if (request instanceof TxFinishReplicaRequest) { expectedTerm = ((TxFinishReplicaRequest) request).term(); + assert expectedTerm != null; + } else if (request instanceof ReadOnlyDirectReplicaRequest) { + expectedTerm = ((ReadOnlyDirectReplicaRequest) request).enlistmentConsistencyToken(); + assert expectedTerm != null; } else { expectedTerm = null; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 1fca0a43e8..2718d6656b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -40,6 +40,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -49,6 +50,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -95,6 +97,7 @@ import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; +import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; @@ -365,7 +368,7 @@ public class InternalTableImpl implements InternalTable { } private InternalTransaction startImplicitTxIfNeeded(@Nullable InternalTransaction tx) { - return tx == null ? txManager.beginImplicit(observableTimestampTracker) : tx; + return tx == null ? txManager.beginImplicit(observableTimestampTracker, false) : tx; } /** @@ -549,28 +552,173 @@ public class InternalTableImpl implements InternalTable { }).thenCompose(x -> x); } + /** + * Evaluates the single-row request to the cluster for a read-only single-partition transaction. + * + * @param row Binary row. + * @param op Operation. + * @param <R> Result type. + * @return The future. + */ + private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode( + BinaryRowEx row, + BiFunction<ReplicationGroupId, Long, ReplicaRequest> op + ) { + InternalTransaction tx = txManager.beginImplicit(observableTimestampTracker, true); + + int partId = partitionId(row); + + TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId); + + CompletableFuture<ReplicaMeta> primaryReplicaFuture = placementDriver.awaitPrimaryReplica( + tablePartitionId, + tx.startTimestamp(), + AWAIT_PRIMARY_REPLICA_TIMEOUT, + TimeUnit.SECONDS + ); + + CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> { + try { + ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder()); + + if (node == null) { + throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId=" + + primaryReplica.getLeaseholder() + ']'); + } + + return replicaSvc.invoke(node, op.apply(tablePartitionId, primaryReplica.getStartTime().longValue())); + } catch (Throwable e) { + throw new TransactionException( + INTERNAL_ERR, + IgniteStringFormatter.format( + "Failed to invoke the replica request [tableName={}, partId={}]", + tableName, + partId + ), + e + ); + } + }); + + return postEvaluate(fut, tx); + } + + /** + * Evaluates the multi-row request to the cluster for a read-only single-partition transaction. + * + * @param rows Rows. + * @param op Replica requests factory. + * @param <R> Result type. + * @return The future. + */ + private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode( + Collection<BinaryRowEx> rows, + BiFunction<ReplicationGroupId, Long, ReplicaRequest> op + ) { + InternalTransaction tx = txManager.beginImplicit(observableTimestampTracker, true); + + int partId = partitionId(rows.iterator().next()); + + TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId); + + CompletableFuture<ReplicaMeta> primaryReplicaFuture = placementDriver.awaitPrimaryReplica( + tablePartitionId, + tx.startTimestamp(), + AWAIT_PRIMARY_REPLICA_TIMEOUT, + TimeUnit.SECONDS + ); + + CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> { + try { + ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder()); + + if (node == null) { + throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId=" + + primaryReplica.getLeaseholder() + ']'); + } + + return replicaSvc.invoke(node, op.apply(tablePartitionId, primaryReplica.getStartTime().longValue())); + } catch (Throwable e) { + throw new TransactionException( + INTERNAL_ERR, + IgniteStringFormatter.format( + "Failed to invoke the replica request [tableName={}, partId={}]", + tableName, + partId + ), + e + ); + } + }); + + return postEvaluate(fut, tx); + } + + /** + * Performs post evaluate operation. + * + * @param fut The future. + * @param tx The transaction. + * @param <R> Operation return type. + * @return The future. + */ + private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, InternalTransaction tx) { + return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r, e) -> { + if (e != null) { + RuntimeException e0 = wrapReplicationException(e); + + return tx.finish(false, clock.now()) + .handle((ignored, err) -> { + + if (err != null) { + e0.addSuppressed(err); + } + throw e0; + }); // Preserve failed state. + } + + return tx.finish(true, clock.now()) + .exceptionally(ex -> { + throw wrapReplicationException(ex); + }) + .thenApply(ignored -> r); + }).thenCompose(x -> x); + } + /** {@inheritDoc} */ @Override public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, InternalTransaction tx) { - if (tx != null && tx.isReadOnly()) { - return evaluateReadOnlyRecipientNode(partitionId(keyRow)) - .thenCompose(recipientNode -> get(keyRow, tx.readTimestamp(), recipientNode)); - } else { - return enlistInTx( + if (tx == null) { + return evaluateReadOnlyPrimaryNode( keyRow, - tx, - (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowPkReplicaRequest() + (groupId, consistencyToken) -> tableMessagesFactory.readOnlyDirectSingleRowReplicaRequest() .groupId(groupId) + .enlistmentConsistencyToken(consistencyToken) .primaryKey(keyRow.tupleSlice()) - .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) - .transactionId(txo.id()) - .term(term) - .requestType(RequestType.RW_GET) - .timestampLong(clock.nowLong()) - .full(tx == null) + .requestType(RequestType.RO_GET) .build() ); } + + if (tx.isReadOnly()) { + return evaluateReadOnlyRecipientNode(partitionId(keyRow)) + .thenCompose(recipientNode -> get(keyRow, tx.readTimestamp(), recipientNode)); + } + + return enlistInTx( + keyRow, + tx, + (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowPkReplicaRequest() + .groupId(groupId) + .primaryKey(keyRow.tupleSlice()) + .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) + .transactionId(txo.id()) + .term(term) + .requestType(RequestType.RW_GET) + .timestampLong(clock.nowLong()) + .full(tx == null) + .build() + ); } @Override @@ -591,35 +739,69 @@ public class InternalTableImpl implements InternalTable { ); } + /** + * Checks that the batch of rows belongs to a single partition. + * + * @param rows Batch of rows. + * @return If all rows belong to one partition, the method returns true; otherwise, it returns false. + */ + private boolean isSinglePartitionBatch(Collection<BinaryRowEx> rows) { + Iterator<BinaryRowEx> rowIterator = rows.iterator(); + + int partId = partitionId(rowIterator.next()); + + while (rowIterator.hasNext()) { + BinaryRowEx row = rowIterator.next(); + + if (partId != partitionId(row)) { + return false; + } + } + + return true; + } + /** {@inheritDoc} */ @Override public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows, InternalTransaction tx) { - if (tx != null && tx.isReadOnly()) { - BinaryRowEx firstRow = keyRows.iterator().next(); + if (CollectionUtils.nullOrEmpty(keyRows)) { + return completedFuture(Collections.emptyList()); + } - if (firstRow == null) { - return completedFuture(Collections.emptyList()); - } else { - return evaluateReadOnlyRecipientNode(partitionId(firstRow)) - .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), recipientNode)); - } - } else { - return enlistInTx( + if (tx == null && isSinglePartitionBatch(keyRows)) { + return evaluateReadOnlyPrimaryNode( keyRows, - tx, - (keyRows0, txo, groupId, term, full) -> tableMessagesFactory.readWriteMultiRowPkReplicaRequest() + (groupId, consistencyToken) -> tableMessagesFactory.readOnlyDirectMultiRowReplicaRequest() .groupId(groupId) - .primaryKeys(serializePrimaryKeys(keyRows0)) - .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) - .transactionId(txo.id()) - .term(term) - .requestType(RequestType.RW_GET_ALL) - .timestampLong(clock.nowLong()) - .full(full) - .build(), - InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder + .enlistmentConsistencyToken(consistencyToken) + .primaryKeys(serializePrimaryKeys(keyRows)) + .requestType(RequestType.RO_GET_ALL) + .build() ); } + + if (tx != null && tx.isReadOnly()) { + BinaryRowEx firstRow = keyRows.iterator().next(); + + return evaluateReadOnlyRecipientNode(partitionId(firstRow)) + .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), recipientNode)); + } + + return enlistInTx( + keyRows, + tx, + (keyRows0, txo, groupId, term, full) -> tableMessagesFactory.readWriteMultiRowPkReplicaRequest() + .groupId(groupId) + .primaryKeys(serializePrimaryKeys(keyRows0)) + .commitPartitionId(serializeTablePartitionId(txo.commitPartition())) + .transactionId(txo.id()) + .term(term) + .requestType(RequestType.RW_GET_ALL) + .timestampLong(clock.nowLong()) + .full(full) + .build(), + InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder + ); } /** {@inheritDoc} */ @@ -713,7 +895,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int partition) { - InternalTransaction tx = txManager.beginImplicit(observableTimestampTracker); + InternalTransaction tx = txManager.beginImplicit(observableTimestampTracker, false); TablePartitionId partGroupId = new TablePartitionId(tableId, partition); CompletableFuture<Void> fut = enlistWithRetry( diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java index 863d01f5e2..3a0b2daf0f 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java @@ -246,7 +246,7 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe } @Override - public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker) { + public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index d5f9fa98d1..3ac7e0ee00 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.NativeTypes; import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.testframework.IgniteTestUtils; @@ -1958,6 +1959,68 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { tx.commit(); } + @Test + public void testSingleGet() { + var accountRecordsView = accounts.recordView(); + + accountRecordsView.upsert(null, makeValue(1, 100.)); + + Transaction tx1 = igniteTransactions.begin(); + Transaction tx2 = igniteTransactions.begin(); + + accountRecordsView.upsert(tx1, makeValue(1, 200.)); + + assertThrows(TransactionException.class, () -> accountRecordsView.get(tx2, makeKey(1))); + + assertEquals(100., accountRecordsView.get(null, makeKey(1)).doubleValue("balance")); + + tx1.commit(); + + assertEquals(200., accountRecordsView.get(null, makeKey(1)).doubleValue("balance")); + } + + @Test + public void testBatchSinglePartitionGet() throws Exception { + var accountRecordsView = accounts.recordView(); + + var marshaller = new TupleMarshallerImpl(accounts.schemaView()); + + int partId = accounts.internalTable().partition(marshaller.marshalKey(makeKey(0))); + + ArrayList<Integer> keys = new ArrayList<>(10); + keys.add(0); + + for (int i = 1; i < 10_000 && keys.size() < 10; i++) { + var p = accounts.internalTable().partition(marshaller.marshalKey(makeKey(i))); + + if (p == partId) { + keys.add(i); + } + } + + log.info("A batch of keys for a single partition is found [partId={}, keys{}]", partId, keys); + + accountRecordsView.upsertAll(null, keys.stream().map(k -> makeValue(k, 100.)).collect(toList())); + + Transaction tx1 = igniteTransactions.begin(); + Transaction tx2 = igniteTransactions.begin(); + + accountRecordsView.upsertAll(tx1, keys.stream().map(k -> makeValue(k, 200.)).collect(toList())); + + assertThrows(TransactionException.class, + () -> accountRecordsView.getAll(tx2, keys.stream().map(k -> makeKey(k)).collect(toList()))); + + for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k -> makeKey(k)).collect(toList()))) { + assertEquals(100., tuple.doubleValue("balance")); + } + + tx1.commit(); + + for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k -> makeKey(k)).collect(toList()))) { + assertEquals(200., tuple.doubleValue("balance")); + } + } + /** * Checks operations that act after a transaction is committed, are finished with exception. * diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java index 784a1a1b35..7a60fd949f 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java @@ -105,4 +105,15 @@ public interface InternalTransaction extends Transaction { * provided no transaction for an operation) or not. */ boolean implicit(); + + /** + * Finishes a read-only transaction with a specific execution timestamp. + * + * @param commit Commit flag. The flag is ignored for read-only transactions. + * @param executionTimestamp The timestamp is the time when a read-only transaction is applied to the remote node. + * @return The future. + */ + default CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp) { + return commit ? commitAsync() : rollbackAsync(); + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index fc06c3ea15..2052a4e6bd 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -64,9 +64,11 @@ public interface TxManager extends IgniteComponent { * * @param timestampTracker Observable timestamp tracker is used to track a timestamp for either read-write or read-only * transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. + * @param readOnly {@code true} in order to start a read-only transaction, {@code false} in order to start read-write one. + * Calling begin with readOnly {@code false} is an equivalent of TxManager#begin(). * @return The transaction. */ - InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker); + InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly); /** * Returns a transaction state meta. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java index a721e16708..8ce44dfb98 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java @@ -40,15 +40,19 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti /** The transaction manager. */ protected final TxManager txManager; + private final boolean implicit; + /** * The constructor. * * @param txManager The tx manager. * @param id The id. + * @param implicit Whether the transaction will be implicit or not. */ - public IgniteAbstractTransactionImpl(TxManager txManager, UUID id) { + public IgniteAbstractTransactionImpl(TxManager txManager, UUID id, boolean implicit) { this.txManager = txManager; this.id = id; + this.implicit = implicit; } /** {@inheritDoc} */ @@ -57,6 +61,12 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti return id; } + /** {@inheritDoc} */ + @Override + public boolean implicit() { + return implicit; + } + /** {@inheritDoc} */ @Nullable @Override diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java index 8cfd7b6da8..18200874d2 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.TxStateMeta; import org.apache.ignite.network.ClusterNode; @@ -39,17 +40,29 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { /** Prevents double finish of the transaction. */ private final AtomicBoolean finishGuard = new AtomicBoolean(); + /** The tracker is used to track an observable timestamp. */ + private final HybridTimestampTracker observableTsTracker; + /** * The constructor. * * @param txManager The tx manager. + * @param observableTsTracker Observable timestamp tracker. * @param id The id. * @param readTimestamp The read timestamp. + * @param implicit Whether the transaction will be implicit or not. */ - ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp readTimestamp) { - super(txManager, id); + ReadOnlyTransactionImpl( + TxManagerImpl txManager, + HybridTimestampTracker observableTsTracker, + UUID id, + HybridTimestamp readTimestamp, + boolean implicit + ) { + super(txManager, id, implicit); this.readTimestamp = readTimestamp; + this.observableTsTracker = observableTsTracker; } @Override @@ -67,11 +80,6 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { return readTimestamp; } - @Override - public boolean implicit() { - return false; - } - @Override public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) { // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only. @@ -101,10 +109,17 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl { @Override // TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only. protected CompletableFuture<Void> finish(boolean commit) { + return finish(commit, readTimestamp); + } + + @Override + public CompletableFuture<Void> finish(boolean commit, HybridTimestamp executionTimestamp) { if (!finishGuard.compareAndSet(false, true)) { return completedFuture(null); } + observableTsTracker.update(executionTimestamp); + return ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(new TxIdAndTimestamp(readTimestamp, id())) .thenRun(() -> txManager.updateTxMeta( id(), diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java index e90ed3e2a4..631d5d6c14 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java @@ -62,8 +62,6 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { /** The tracker is used to track an observable timestamp. */ private final HybridTimestampTracker observableTsTracker; - private final boolean implicit; - /** A partition which stores the transaction state. */ private volatile TablePartitionId commitPart; @@ -90,10 +88,9 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { * @param implicit Whether the transaction will be implicit or not. */ public ReadWriteTransactionImpl(TxManager txManager, HybridTimestampTracker observableTsTracker, UUID id, boolean implicit) { - super(txManager, id); + super(txManager, id, implicit); this.observableTsTracker = observableTsTracker; - this.implicit = implicit; } /** {@inheritDoc} */ @@ -200,9 +197,4 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl { public HybridTimestamp startTimestamp() { return TransactionIds.beginTimestamp(id()); } - - @Override - public boolean implicit() { - return implicit; - } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index a14b5964a5..aa84bef5bc 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -156,8 +156,8 @@ public class TxManagerImpl implements TxManager { } @Override - public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker) { - return beginTx(timestampTracker, false, true); + public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker, boolean readOnly) { + return beginTx(timestampTracker, readOnly, true); } private InternalTransaction beginTx(HybridTimestampTracker timestampTracker, boolean readOnly, boolean implicit) { @@ -175,8 +175,6 @@ public class TxManagerImpl implements TxManager { ? HybridTimestamp.max(observableTimestamp, currentReadTimestamp()) : currentReadTimestamp(); - timestampTracker.update(readTimestamp); - lowWatermarkReadWriteLock.readLock().lock(); try { @@ -196,7 +194,7 @@ public class TxManagerImpl implements TxManager { return new CompletableFuture<>(); }); - return new ReadOnlyTransactionImpl(this, txId, readTimestamp); + return new ReadOnlyTransactionImpl(this, timestampTracker, txId, readTimestamp, implicit); } finally { lowWatermarkReadWriteLock.readLock().unlock(); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 5b6d73d1c1..5569cf8c9b 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -107,17 +107,26 @@ public class TxManagerTest extends IgniteAbstractTest { InternalTransaction tx0 = txManager.begin(hybridTimestampTracker); InternalTransaction tx1 = txManager.begin(hybridTimestampTracker); InternalTransaction tx2 = txManager.begin(hybridTimestampTracker, true); - InternalTransaction tx3 = txManager.beginImplicit(hybridTimestampTracker); + InternalTransaction tx3 = txManager.beginImplicit(hybridTimestampTracker, false); + InternalTransaction tx4 = txManager.beginImplicit(hybridTimestampTracker, true); assertNotNull(tx0.id()); assertNotNull(tx1.id()); assertNotNull(tx2.id()); assertNotNull(tx3.id()); + assertNotNull(tx4.id()); assertFalse(tx0.implicit()); assertFalse(tx1.implicit()); assertFalse(tx2.implicit()); assertTrue(tx3.implicit()); + assertTrue(tx4.implicit()); + + assertFalse(tx0.isReadOnly()); + assertFalse(tx1.isReadOnly()); + assertTrue(tx2.isReadOnly()); + assertFalse(tx3.isReadOnly()); + assertTrue(tx4.isReadOnly()); } @Test diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java index cf314addee..65c9cb732a 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -40,7 +41,7 @@ class ReadOnlyTransactionImplTest extends BaseIgniteAbstractTest { HybridTimestamp readTimestamp = new HybridClockImpl().now(); UUID txId = TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(readTimestamp); - var tx = new ReadOnlyTransactionImpl(txManager, txId, readTimestamp); + var tx = new ReadOnlyTransactionImpl(txManager, new HybridTimestampTracker(), txId, readTimestamp, false); assertThat(tx.startTimestamp(), is(readTimestamp)); }