This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 86f27a08d7 IGNITE-19824 Implicit RO should be used in implicit single
gets (#2641)
86f27a08d7 is described below
commit 86f27a08d7fea5c1a38f2e971962bc65b93c3d56
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Oct 5 11:19:30 2023 +0300
IGNITE-19824 Implicit RO should be used in implicit single gets (#2641)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 2 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 10 +-
.../ignite/internal/replicator/ReplicaManager.java | 24 +-
.../message/ReadOnlyDirectReplicaRequest.java | 34 +++
.../ignite/distributed/ItTablePersistenceTest.java | 12 +-
.../table/distributed/TableMessageGroup.java | 10 +
.../ReadOnlyDirectMultiRowReplicaRequest.java | 29 +++
.../ReadOnlyDirectSingleRowReplicaRequest.java | 30 +++
.../replicator/PartitionReplicaListener.java | 67 ++++++
.../distributed/storage/InternalTableImpl.java | 254 ++++++++++++++++++---
.../RepeatedFinishReadWriteTransactionTest.java | 2 +-
.../ignite/internal/table/TxAbstractTest.java | 63 +++++
.../ignite/internal/tx/InternalTransaction.java | 11 +
.../org/apache/ignite/internal/tx/TxManager.java | 4 +-
.../tx/impl/IgniteAbstractTransactionImpl.java | 12 +-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 29 ++-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 10 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 8 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 11 +-
.../tx/impl/ReadOnlyTransactionImplTest.java | 3 +-
20 files changed, 537 insertions(+), 88 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 836b515d5f..3ea373782c 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -149,7 +149,7 @@ public class FakeTxManager implements TxManager {
}
@Override
- public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker, boolean readOnly) {
throw new UnsupportedOperationException("Not expected to be called
here");
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 2ace822027..d53774938b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -100,15 +100,7 @@ public class DataStreamerTests : IgniteTestsBase
if (enabled)
{
- // TODO IGNITE-19824: Remove read-only TX workaround.
- // Currently, there might be an exception due to false-positive tx
conflict detection, which is fixed by a read-only tx.
- await TestUtils.WaitForConditionAsync(
- async () =>
- {
- await using var roTx = await
Client.Transactions.BeginAsync(new(ReadOnly: true));
- return await TupleView.ContainsKeyAsync(roTx, GetTuple(0));
- },
- 3000);
+ TestUtils.WaitForCondition(() => TupleView.ContainsKeyAsync(null,
GetTuple(0)).GetAwaiter().GetResult(), 3000);
}
else
{
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index c5599f9639..b159a06bc6 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableExcepti
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.AwaitReplicaRequest;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
@@ -231,12 +232,12 @@ public class ReplicaManager implements IgniteComponent {
return;
}
- HybridTimestamp sendTimestamp = null;
-
if (requestTimestamp != null) {
- sendTimestamp = clock.update(requestTimestamp);
+ clock.update(requestTimestamp);
}
+ boolean sendTimestamp = request instanceof TimestampAware ||
request instanceof ReadOnlyDirectReplicaRequest;
+
// replicaFut is always completed here.
Replica replica = replicaFut.join();
@@ -245,16 +246,15 @@ public class ReplicaManager implements IgniteComponent {
CompletableFuture<?> result = replica.processRequest(request,
senderId);
- HybridTimestamp finalSendTimestamp = sendTimestamp;
result.handle((res, ex) -> {
NetworkMessage msg;
if (ex == null) {
- msg = prepareReplicaResponse(finalSendTimestamp, res);
+ msg = prepareReplicaResponse(sendTimestamp, res);
} else {
LOG.warn("Failed to process replica request [request={}]",
ex, request);
- msg = prepareReplicaErrorResponse(finalSendTimestamp, ex);
+ msg = prepareReplicaErrorResponse(sendTimestamp, ex);
}
clusterNetSvc.messagingService().respond(senderConsistentId,
msg, correlationId);
@@ -584,12 +584,12 @@ public class ReplicaManager implements IgniteComponent {
/**
* Prepares replica response.
*/
- private NetworkMessage prepareReplicaResponse(@Nullable HybridTimestamp
sendTimestamp, Object result) {
- if (sendTimestamp != null) {
+ private NetworkMessage prepareReplicaResponse(boolean sendTimestamp,
Object result) {
+ if (sendTimestamp) {
return REPLICA_MESSAGES_FACTORY
.timestampAwareReplicaResponse()
.result(result)
- .timestampLong(sendTimestamp.longValue())
+ .timestampLong(clock.nowLong())
.build();
} else {
return REPLICA_MESSAGES_FACTORY
@@ -602,12 +602,12 @@ public class ReplicaManager implements IgniteComponent {
/**
* Prepares replica error response.
*/
- private NetworkMessage prepareReplicaErrorResponse(@Nullable
HybridTimestamp sendTimestamp, Throwable ex) {
- if (sendTimestamp != null) {
+ private NetworkMessage prepareReplicaErrorResponse(boolean sendTimestamp,
Throwable ex) {
+ if (sendTimestamp) {
return REPLICA_MESSAGES_FACTORY
.errorTimestampAwareReplicaResponse()
.throwable(ex)
- .timestampLong(sendTimestamp.longValue())
+ .timestampLong(clock.nowLong())
.build();
} else {
return REPLICA_MESSAGES_FACTORY
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
new file mode 100644
index 0000000000..29ca899ee2
--- /dev/null
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReadOnlyDirectReplicaRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator.message;
+
+/**
+ * Read-only request that sand to a specific node directly. This request has
no read timestamp as other read-only requests because the
+ * timestamp is calculated on the replica side. The linearization is
guaranteed by sending the request directly to the primary node.
+ *
+ * <p>The requests are used to implement an implicit read-only transaction for
a single partition.
+ */
+public interface ReadOnlyDirectReplicaRequest extends ReplicaRequest {
+ /**
+ * Gets an enlistment consistency token.
+ * The token is used to check that the lease is still actual while the
message goes to the replica.
+ *
+ * @return Enlistment consistency token.
+ */
+ Long enlistmentConsistencyToken();
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 2986aa9615..542d750c6c 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -85,8 +85,10 @@ import
org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.SingleRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -242,10 +244,10 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
when(partitionReplicaListener.invoke(any(),
any())).thenAnswer(invocationOnMock -> {
ReplicaRequest req = invocationOnMock.getArgument(0);
- if (req instanceof ReadWriteSingleRowPkReplicaRequest) {
- ReadWriteSingleRowPkReplicaRequest req0 =
(ReadWriteSingleRowPkReplicaRequest) req;
+ if (req instanceof ReadWriteSingleRowPkReplicaRequest || req
instanceof ReadOnlyDirectSingleRowReplicaRequest) {
+ SingleRowPkReplicaRequest req0 = (SingleRowPkReplicaRequest)
req;
- if (req0.requestType() == RequestType.RW_GET) {
+ if (req0.requestType() == RequestType.RW_GET ||
req0.requestType() == RequestType.RO_GET) {
List<JraftServerImpl> servers = servers();
JraftServerImpl leader = servers.stream()
@@ -274,8 +276,10 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
return completedFuture(row);
} else if (req0.requestType() == RequestType.RW_DELETE) {
+ ReadWriteSingleRowPkReplicaRequest rwReq =
(ReadWriteSingleRowPkReplicaRequest) req0;
+
UpdateCommand cmd = msgFactory.updateCommand()
- .txId(req0.transactionId())
+ .txId(rwReq.transactionId())
.tablePartitionId(tablePartitionId(new
TablePartitionId(1, 0)))
.rowUuid(new RowId(0).uuid())
.safeTimeLong(hybridClock.nowLong())
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 78604d0c17..38ddd35f0c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -165,6 +165,16 @@ public interface TableMessageGroup {
/** Message type for {@link BuildIndexReplicaRequest}. */
short BUILD_INDEX_REPLICA_REQUEST = 21;
+ /**
+ * Message type for {@link ReadOnlyDirectSingleRowReplicaRequest}.
+ */
+ short RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST = 22;
+
+ /**
+ * Message type for {@link ReadOnlyDirectMultiRowReplicaRequest}.
+ */
+ short RO_DIRECT_MULTI_ROW_REPLICA_REQUEST = 23;
+
/**
* Message types for Table module RAFT commands.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
new file mode 100644
index 0000000000..70d2e22917
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectMultiRowReplicaRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication.request;
+
+import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Read only direct multi row replica request.
+ */
+@Transferable(TableMessageGroup.RO_DIRECT_MULTI_ROW_REPLICA_REQUEST)
+public interface ReadOnlyDirectMultiRowReplicaRequest extends
MultipleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
new file mode 100644
index 0000000000..c4e4d59817
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadOnlyDirectSingleRowReplicaRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication.request;
+
+import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Read only direct node single row replica request.
+ * The type of RO request never waits and is executed at the current node
timestamp.
+ */
+@Transferable(TableMessageGroup.RO_DIRECT_SINGLE_ROW_REPLICA_REQUEST)
+public interface ReadOnlyDirectSingleRowReplicaRequest extends
SingleRowPkReplicaRequest, ReadOnlyDirectReplicaRequest {
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c1f667e0c4..dad2205006 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -91,6 +91,7 @@ import
org.apache.ignite.internal.replicator.exception.ReplicationException;
import
org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
import
org.apache.ignite.internal.replicator.exception.UnsupportedReplicaRequestException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import
org.apache.ignite.internal.replicator.message.ReadOnlyDirectReplicaRequest;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
@@ -125,6 +126,8 @@ import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRo
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import
org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
import
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -448,6 +451,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
return
processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request,
isPrimary);
} else if (request instanceof BuildIndexReplicaRequest) {
return
raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
+ } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
+ return
processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest)
request);
+ } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
+ return
processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest)
request);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -1657,6 +1664,44 @@ public class PartitionReplicaListener implements
ReplicaListener {
return row.tupleSlice().compareTo(row2.tupleSlice()) == 0;
}
+ /**
+ * Processes multiple entries direct request for read only transaction.
+ *
+ * @param request Read only multiple entries request.
+ * @return Result future.
+ */
+ private CompletableFuture<List<BinaryRow>>
processReadOnlyDirectMultiEntryAction(
+ ReadOnlyDirectMultiRowReplicaRequest request
+ ) {
+ List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
+ HybridTimestamp readTimestamp = hybridClock.now();
+
+ if (request.requestType() != RequestType.RO_GET_ALL) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unknown single request [actionType={}]",
request.requestType()));
+ }
+
+ var resolutionFuts = new
ArrayList<CompletableFuture<BinaryRow>>(primaryKeys.size());
+
+ for (BinaryTuple primaryKey : primaryKeys) {
+ CompletableFuture<BinaryRow> fut =
resolveRowByPkForReadOnly(primaryKey, readTimestamp);
+
+ resolutionFuts.add(fut);
+ }
+
+ return allOf(resolutionFuts.toArray(new
CompletableFuture[0])).thenApply(unused1 -> {
+ var result = new ArrayList<BinaryRow>(resolutionFuts.size());
+
+ for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+ BinaryRow resolvedReadResult = resolutionFut.join();
+
+ result.add(resolvedReadResult);
+ }
+
+ return result;
+ });
+ }
+
/**
* Precesses multi request.
*
@@ -2034,6 +2079,24 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /**
+ * Processes single entry direct request for read only transaction.
+ *
+ * @param request Read only single entry request.
+ * @return Result future.
+ */
+ private CompletableFuture<BinaryRow>
processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest
request) {
+ BinaryTuple primaryKey = resolvePk(request.primaryKey());
+ HybridTimestamp readTimestamp = hybridClock.now();
+
+ if (request.requestType() != RequestType.RO_GET) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unknown single request [actionType={}]",
request.requestType()));
+ }
+
+ return resolveRowByPkForReadOnly(primaryKey, readTimestamp);
+ }
+
/**
* Precesses single request.
*
@@ -2464,6 +2527,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof TxFinishReplicaRequest) {
expectedTerm = ((TxFinishReplicaRequest) request).term();
+ assert expectedTerm != null;
+ } else if (request instanceof ReadOnlyDirectReplicaRequest) {
+ expectedTerm = ((ReadOnlyDirectReplicaRequest)
request).enlistmentConsistencyToken();
+
assert expectedTerm != null;
} else {
expectedTerm = null;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 1fca0a43e8..2718d6656b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -40,6 +40,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -49,6 +50,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -95,6 +97,7 @@ import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
@@ -365,7 +368,7 @@ public class InternalTableImpl implements InternalTable {
}
private InternalTransaction startImplicitTxIfNeeded(@Nullable
InternalTransaction tx) {
- return tx == null ?
txManager.beginImplicit(observableTimestampTracker) : tx;
+ return tx == null ?
txManager.beginImplicit(observableTimestampTracker, false) : tx;
}
/**
@@ -549,28 +552,173 @@ public class InternalTableImpl implements InternalTable {
}).thenCompose(x -> x);
}
+ /**
+ * Evaluates the single-row request to the cluster for a read-only
single-partition transaction.
+ *
+ * @param row Binary row.
+ * @param op Operation.
+ * @param <R> Result type.
+ * @return The future.
+ */
+ private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
+ BinaryRowEx row,
+ BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
+ ) {
+ InternalTransaction tx =
txManager.beginImplicit(observableTimestampTracker, true);
+
+ int partId = partitionId(row);
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
+
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
+ tablePartitionId,
+ tx.startTimestamp(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ TimeUnit.SECONDS
+ );
+
+ CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
+ try {
+ ClusterNode node =
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+
+ if (node == null) {
+ throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
+ + primaryReplica.getLeaseholder() + ']');
+ }
+
+ return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
+ } catch (Throwable e) {
+ throw new TransactionException(
+ INTERNAL_ERR,
+ IgniteStringFormatter.format(
+ "Failed to invoke the replica request
[tableName={}, partId={}]",
+ tableName,
+ partId
+ ),
+ e
+ );
+ }
+ });
+
+ return postEvaluate(fut, tx);
+ }
+
+ /**
+ * Evaluates the multi-row request to the cluster for a read-only
single-partition transaction.
+ *
+ * @param rows Rows.
+ * @param op Replica requests factory.
+ * @param <R> Result type.
+ * @return The future.
+ */
+ private <R> CompletableFuture<R> evaluateReadOnlyPrimaryNode(
+ Collection<BinaryRowEx> rows,
+ BiFunction<ReplicationGroupId, Long, ReplicaRequest> op
+ ) {
+ InternalTransaction tx =
txManager.beginImplicit(observableTimestampTracker, true);
+
+ int partId = partitionId(rows.iterator().next());
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
+
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
+ tablePartitionId,
+ tx.startTimestamp(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ TimeUnit.SECONDS
+ );
+
+ CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
+ try {
+ ClusterNode node =
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+
+ if (node == null) {
+ throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
+ + primaryReplica.getLeaseholder() + ']');
+ }
+
+ return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
+ } catch (Throwable e) {
+ throw new TransactionException(
+ INTERNAL_ERR,
+ IgniteStringFormatter.format(
+ "Failed to invoke the replica request
[tableName={}, partId={}]",
+ tableName,
+ partId
+ ),
+ e
+ );
+ }
+ });
+
+ return postEvaluate(fut, tx);
+ }
+
+ /**
+ * Performs post evaluate operation.
+ *
+ * @param fut The future.
+ * @param tx The transaction.
+ * @param <R> Operation return type.
+ * @return The future.
+ */
+ private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut,
InternalTransaction tx) {
+ return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r,
e) -> {
+ if (e != null) {
+ RuntimeException e0 = wrapReplicationException(e);
+
+ return tx.finish(false, clock.now())
+ .handle((ignored, err) -> {
+
+ if (err != null) {
+ e0.addSuppressed(err);
+ }
+ throw e0;
+ }); // Preserve failed state.
+ }
+
+ return tx.finish(true, clock.now())
+ .exceptionally(ex -> {
+ throw wrapReplicationException(ex);
+ })
+ .thenApply(ignored -> r);
+ }).thenCompose(x -> x);
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow,
InternalTransaction tx) {
- if (tx != null && tx.isReadOnly()) {
- return evaluateReadOnlyRecipientNode(partitionId(keyRow))
- .thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), recipientNode));
- } else {
- return enlistInTx(
+ if (tx == null) {
+ return evaluateReadOnlyPrimaryNode(
keyRow,
- tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+ (groupId, consistencyToken) ->
tableMessagesFactory.readOnlyDirectSingleRowReplicaRequest()
.groupId(groupId)
+ .enlistmentConsistencyToken(consistencyToken)
.primaryKey(keyRow.tupleSlice())
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_GET)
- .timestampLong(clock.nowLong())
- .full(tx == null)
+ .requestType(RequestType.RO_GET)
.build()
);
}
+
+ if (tx.isReadOnly()) {
+ return evaluateReadOnlyRecipientNode(partitionId(keyRow))
+ .thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), recipientNode));
+ }
+
+ return enlistInTx(
+ keyRow,
+ tx,
+ (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+ .groupId(groupId)
+ .primaryKey(keyRow.tupleSlice())
+
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+ .transactionId(txo.id())
+ .term(term)
+ .requestType(RequestType.RW_GET)
+ .timestampLong(clock.nowLong())
+ .full(tx == null)
+ .build()
+ );
}
@Override
@@ -591,35 +739,69 @@ public class InternalTableImpl implements InternalTable {
);
}
+ /**
+ * Checks that the batch of rows belongs to a single partition.
+ *
+ * @param rows Batch of rows.
+ * @return If all rows belong to one partition, the method returns true;
otherwise, it returns false.
+ */
+ private boolean isSinglePartitionBatch(Collection<BinaryRowEx> rows) {
+ Iterator<BinaryRowEx> rowIterator = rows.iterator();
+
+ int partId = partitionId(rowIterator.next());
+
+ while (rowIterator.hasNext()) {
+ BinaryRowEx row = rowIterator.next();
+
+ if (partId != partitionId(row)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<List<BinaryRow>> getAll(Collection<BinaryRowEx>
keyRows, InternalTransaction tx) {
- if (tx != null && tx.isReadOnly()) {
- BinaryRowEx firstRow = keyRows.iterator().next();
+ if (CollectionUtils.nullOrEmpty(keyRows)) {
+ return completedFuture(Collections.emptyList());
+ }
- if (firstRow == null) {
- return completedFuture(Collections.emptyList());
- } else {
- return evaluateReadOnlyRecipientNode(partitionId(firstRow))
- .thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), recipientNode));
- }
- } else {
- return enlistInTx(
+ if (tx == null && isSinglePartitionBatch(keyRows)) {
+ return evaluateReadOnlyPrimaryNode(
keyRows,
- tx,
- (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+ (groupId, consistencyToken) ->
tableMessagesFactory.readOnlyDirectMultiRowReplicaRequest()
.groupId(groupId)
- .primaryKeys(serializePrimaryKeys(keyRows0))
-
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
- .transactionId(txo.id())
- .term(term)
- .requestType(RequestType.RW_GET_ALL)
- .timestampLong(clock.nowLong())
- .full(full)
- .build(),
-
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+ .enlistmentConsistencyToken(consistencyToken)
+ .primaryKeys(serializePrimaryKeys(keyRows))
+ .requestType(RequestType.RO_GET_ALL)
+ .build()
);
}
+
+ if (tx != null && tx.isReadOnly()) {
+ BinaryRowEx firstRow = keyRows.iterator().next();
+
+ return evaluateReadOnlyRecipientNode(partitionId(firstRow))
+ .thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), recipientNode));
+ }
+
+ return enlistInTx(
+ keyRows,
+ tx,
+ (keyRows0, txo, groupId, term, full) ->
tableMessagesFactory.readWriteMultiRowPkReplicaRequest()
+ .groupId(groupId)
+ .primaryKeys(serializePrimaryKeys(keyRows0))
+
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
+ .transactionId(txo.id())
+ .term(term)
+ .requestType(RequestType.RW_GET_ALL)
+ .timestampLong(clock.nowLong())
+ .full(full)
+ .build(),
+ InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder
+ );
}
/** {@inheritDoc} */
@@ -713,7 +895,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int
partition) {
- InternalTransaction tx =
txManager.beginImplicit(observableTimestampTracker);
+ InternalTransaction tx =
txManager.beginImplicit(observableTimestampTracker, false);
TablePartitionId partGroupId = new TablePartitionId(tableId,
partition);
CompletableFuture<Void> fut = enlistWithRetry(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index 863d01f5e2..3a0b2daf0f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -246,7 +246,7 @@ public class RepeatedFinishReadWriteTransactionTest extends
BaseIgniteAbstractTe
}
@Override
- public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker, boolean readOnly) {
throw new UnsupportedOperationException("Not implemented");
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index d5f9fa98d1..3ac7e0ee00 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
@@ -1958,6 +1959,68 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
tx.commit();
}
+ @Test
+ public void testSingleGet() {
+ var accountRecordsView = accounts.recordView();
+
+ accountRecordsView.upsert(null, makeValue(1, 100.));
+
+ Transaction tx1 = igniteTransactions.begin();
+ Transaction tx2 = igniteTransactions.begin();
+
+ accountRecordsView.upsert(tx1, makeValue(1, 200.));
+
+ assertThrows(TransactionException.class, () ->
accountRecordsView.get(tx2, makeKey(1)));
+
+ assertEquals(100., accountRecordsView.get(null,
makeKey(1)).doubleValue("balance"));
+
+ tx1.commit();
+
+ assertEquals(200., accountRecordsView.get(null,
makeKey(1)).doubleValue("balance"));
+ }
+
+ @Test
+ public void testBatchSinglePartitionGet() throws Exception {
+ var accountRecordsView = accounts.recordView();
+
+ var marshaller = new TupleMarshallerImpl(accounts.schemaView());
+
+ int partId =
accounts.internalTable().partition(marshaller.marshalKey(makeKey(0)));
+
+ ArrayList<Integer> keys = new ArrayList<>(10);
+ keys.add(0);
+
+ for (int i = 1; i < 10_000 && keys.size() < 10; i++) {
+ var p =
accounts.internalTable().partition(marshaller.marshalKey(makeKey(i)));
+
+ if (p == partId) {
+ keys.add(i);
+ }
+ }
+
+ log.info("A batch of keys for a single partition is found [partId={},
keys{}]", partId, keys);
+
+ accountRecordsView.upsertAll(null, keys.stream().map(k -> makeValue(k,
100.)).collect(toList()));
+
+ Transaction tx1 = igniteTransactions.begin();
+ Transaction tx2 = igniteTransactions.begin();
+
+ accountRecordsView.upsertAll(tx1, keys.stream().map(k -> makeValue(k,
200.)).collect(toList()));
+
+ assertThrows(TransactionException.class,
+ () -> accountRecordsView.getAll(tx2, keys.stream().map(k ->
makeKey(k)).collect(toList())));
+
+ for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k
-> makeKey(k)).collect(toList()))) {
+ assertEquals(100., tuple.doubleValue("balance"));
+ }
+
+ tx1.commit();
+
+ for (Tuple tuple : accountRecordsView.getAll(null, keys.stream().map(k
-> makeKey(k)).collect(toList()))) {
+ assertEquals(200., tuple.doubleValue("balance"));
+ }
+ }
+
/**
* Checks operations that act after a transaction is committed, are
finished with exception.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 784a1a1b35..7a60fd949f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -105,4 +105,15 @@ public interface InternalTransaction extends Transaction {
* provided no transaction for an operation) or not.
*/
boolean implicit();
+
+ /**
+ * Finishes a read-only transaction with a specific execution timestamp.
+ *
+ * @param commit Commit flag. The flag is ignored for read-only
transactions.
+ * @param executionTimestamp The timestamp is the time when a read-only
transaction is applied to the remote node.
+ * @return The future.
+ */
+ default CompletableFuture<Void> finish(boolean commit, HybridTimestamp
executionTimestamp) {
+ return commit ? commitAsync() : rollbackAsync();
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index fc06c3ea15..2052a4e6bd 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -64,9 +64,11 @@ public interface TxManager extends IgniteComponent {
*
* @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
* transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions.
+ * @param readOnly {@code true} in order to start a read-only transaction,
{@code false} in order to start read-write one.
+ * Calling begin with readOnly {@code false} is an equivalent of
TxManager#begin().
* @return The transaction.
*/
- InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker);
+ InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker,
boolean readOnly);
/**
* Returns a transaction state meta.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
index a721e16708..8ce44dfb98 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
@@ -40,15 +40,19 @@ public abstract class IgniteAbstractTransactionImpl
implements InternalTransacti
/** The transaction manager. */
protected final TxManager txManager;
+ private final boolean implicit;
+
/**
* The constructor.
*
* @param txManager The tx manager.
* @param id The id.
+ * @param implicit Whether the transaction will be implicit or not.
*/
- public IgniteAbstractTransactionImpl(TxManager txManager, UUID id) {
+ public IgniteAbstractTransactionImpl(TxManager txManager, UUID id, boolean
implicit) {
this.txManager = txManager;
this.id = id;
+ this.implicit = implicit;
}
/** {@inheritDoc} */
@@ -57,6 +61,12 @@ public abstract class IgniteAbstractTransactionImpl
implements InternalTransacti
return id;
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean implicit() {
+ return implicit;
+ }
+
/** {@inheritDoc} */
@Nullable
@Override
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 8cfd7b6da8..18200874d2 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.network.ClusterNode;
@@ -39,17 +40,29 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
/** Prevents double finish of the transaction. */
private final AtomicBoolean finishGuard = new AtomicBoolean();
+ /** The tracker is used to track an observable timestamp. */
+ private final HybridTimestampTracker observableTsTracker;
+
/**
* The constructor.
*
* @param txManager The tx manager.
+ * @param observableTsTracker Observable timestamp tracker.
* @param id The id.
* @param readTimestamp The read timestamp.
+ * @param implicit Whether the transaction will be implicit or not.
*/
- ReadOnlyTransactionImpl(TxManagerImpl txManager, UUID id, HybridTimestamp
readTimestamp) {
- super(txManager, id);
+ ReadOnlyTransactionImpl(
+ TxManagerImpl txManager,
+ HybridTimestampTracker observableTsTracker,
+ UUID id,
+ HybridTimestamp readTimestamp,
+ boolean implicit
+ ) {
+ super(txManager, id, implicit);
this.readTimestamp = readTimestamp;
+ this.observableTsTracker = observableTsTracker;
}
@Override
@@ -67,11 +80,6 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
return readTimestamp;
}
- @Override
- public boolean implicit() {
- return false;
- }
-
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
// TODO: IGNITE-17666 Close cursor tx finish and do it on the first
finish invocation only.
@@ -101,10 +109,17 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
@Override
// TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish
invocation only.
protected CompletableFuture<Void> finish(boolean commit) {
+ return finish(commit, readTimestamp);
+ }
+
+ @Override
+ public CompletableFuture<Void> finish(boolean commit, HybridTimestamp
executionTimestamp) {
if (!finishGuard.compareAndSet(false, true)) {
return completedFuture(null);
}
+ observableTsTracker.update(executionTimestamp);
+
return ((TxManagerImpl)
txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()))
.thenRun(() -> txManager.updateTxMeta(
id(),
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e90ed3e2a4..631d5d6c14 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -62,8 +62,6 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** The tracker is used to track an observable timestamp. */
private final HybridTimestampTracker observableTsTracker;
- private final boolean implicit;
-
/** A partition which stores the transaction state. */
private volatile TablePartitionId commitPart;
@@ -90,10 +88,9 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
* @param implicit Whether the transaction will be implicit or not.
*/
public ReadWriteTransactionImpl(TxManager txManager,
HybridTimestampTracker observableTsTracker, UUID id, boolean implicit) {
- super(txManager, id);
+ super(txManager, id, implicit);
this.observableTsTracker = observableTsTracker;
- this.implicit = implicit;
}
/** {@inheritDoc} */
@@ -200,9 +197,4 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
public HybridTimestamp startTimestamp() {
return TransactionIds.beginTimestamp(id());
}
-
- @Override
- public boolean implicit() {
- return implicit;
- }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index a14b5964a5..aa84bef5bc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -156,8 +156,8 @@ public class TxManagerImpl implements TxManager {
}
@Override
- public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
- return beginTx(timestampTracker, false, true);
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker, boolean readOnly) {
+ return beginTx(timestampTracker, readOnly, true);
}
private InternalTransaction beginTx(HybridTimestampTracker
timestampTracker, boolean readOnly, boolean implicit) {
@@ -175,8 +175,6 @@ public class TxManagerImpl implements TxManager {
? HybridTimestamp.max(observableTimestamp,
currentReadTimestamp())
: currentReadTimestamp();
- timestampTracker.update(readTimestamp);
-
lowWatermarkReadWriteLock.readLock().lock();
try {
@@ -196,7 +194,7 @@ public class TxManagerImpl implements TxManager {
return new CompletableFuture<>();
});
- return new ReadOnlyTransactionImpl(this, txId, readTimestamp);
+ return new ReadOnlyTransactionImpl(this, timestampTracker, txId,
readTimestamp, implicit);
} finally {
lowWatermarkReadWriteLock.readLock().unlock();
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 5b6d73d1c1..5569cf8c9b 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,17 +107,26 @@ public class TxManagerTest extends IgniteAbstractTest {
InternalTransaction tx0 = txManager.begin(hybridTimestampTracker);
InternalTransaction tx1 = txManager.begin(hybridTimestampTracker);
InternalTransaction tx2 = txManager.begin(hybridTimestampTracker,
true);
- InternalTransaction tx3 =
txManager.beginImplicit(hybridTimestampTracker);
+ InternalTransaction tx3 =
txManager.beginImplicit(hybridTimestampTracker, false);
+ InternalTransaction tx4 =
txManager.beginImplicit(hybridTimestampTracker, true);
assertNotNull(tx0.id());
assertNotNull(tx1.id());
assertNotNull(tx2.id());
assertNotNull(tx3.id());
+ assertNotNull(tx4.id());
assertFalse(tx0.implicit());
assertFalse(tx1.implicit());
assertFalse(tx2.implicit());
assertTrue(tx3.implicit());
+ assertTrue(tx4.implicit());
+
+ assertFalse(tx0.isReadOnly());
+ assertFalse(tx1.isReadOnly());
+ assertTrue(tx2.isReadOnly());
+ assertFalse(tx3.isReadOnly());
+ assertTrue(tx4.isReadOnly());
}
@Test
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index cf314addee..65c9cb732a 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -40,7 +41,7 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
HybridTimestamp readTimestamp = new HybridClockImpl().now();
UUID txId =
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(readTimestamp);
- var tx = new ReadOnlyTransactionImpl(txManager, txId, readTimestamp);
+ var tx = new ReadOnlyTransactionImpl(txManager, new
HybridTimestampTracker(), txId, readTimestamp, false);
assertThat(tx.startTimestamp(), is(readTimestamp));
}