This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1c0ee27562 IGNITE-19682 .NET: Add tx partition awareness (lazy tx
start) (#3728)
1c0ee27562 is described below
commit 1c0ee275624f07b1dab1a2c0a0307ae74ab0a579
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri May 10 07:05:33 2024 +0300
IGNITE-19682 .NET: Add tx partition awareness (lazy tx start) (#3728)
**Problem**
Client starts transaction on a random node. We don't know which keys will
participate at the moment of `Begin()` call. Chosen target node (coordinator)
might have none of the participating keys, which affects performance due to
extra network calls.
**Fix**
* Start client transaction lazily, together with the first enlisted
operation
* Choose coordinator using partition awareness for the first operation
---
.../PartitionAwarenessRealClusterTests.cs | 21 ++-
.../Apache.Ignite.Tests/PartitionAwarenessTests.cs | 198 +++++++++++---------
.../Proto/ColocationHashTests.cs | 2 +-
.../dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs | 2 +
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 6 -
.../dotnet/Apache.Ignite.Tests/TestUtils.cs | 10 +
.../Transactions/TransactionsTests.cs | 18 +-
.../Internal/Buffers/PooledArrayBuffer.cs | 8 +
.../Apache.Ignite/Internal/IgniteClientInternal.cs | 2 +-
.../Internal/Proto/MsgPack/MsgPackWriter.cs | 14 +-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 4 +-
.../Apache.Ignite/Internal/Table/RecordView.cs | 42 ++++-
.../Table/Serialization/RecordSerializer.cs | 48 ++---
.../dotnet/Apache.Ignite/Internal/Table/Table.cs | 5 +-
.../Internal/Transactions/LazyTransaction.cs | 205 +++++++++++++++++++++
.../Internal/Transactions/Transaction.cs | 132 +------------
.../Internal/Transactions/TransactionExtensions.cs | 49 -----
.../Internal/Transactions/Transactions.cs | 64 ++-----
.../Apache.Ignite/Transactions/ITransactions.cs | 8 +-
19 files changed, 479 insertions(+), 359 deletions(-)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
index 3dfe065c6e..3059ad29e5 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
@@ -34,9 +34,10 @@ public class PartitionAwarenessRealClusterTests :
IgniteTestsBase
/// Uses <see cref="ComputeTests.NodeNameJob"/> to get the name of the
node that should be the primary for the given key,
/// and compares to the actual node that received the request (using
IgniteProxy).
/// </summary>
+ /// <param name="withTx">Whether to use transactions.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous unit
test.</returns>
[Test]
- public async Task TestPutRoutesRequestToPrimaryNode()
+ public async Task TestPutRoutesRequestToPrimaryNode([Values(true, false)]
bool withTx)
{
var proxies = GetProxies();
using var client = await IgniteClient.StartAsync(GetConfig(proxies));
@@ -64,10 +65,22 @@ public class PartitionAwarenessRealClusterTests :
IgniteTestsBase
continue;
}
- await recordView.UpsertAsync(null, keyTuple);
- var requestTargetNodeName = GetRequestTargetNodeName(proxies,
ClientOp.TupleUpsert);
+ var tx = withTx ? await client.Transactions.BeginAsync() : null;
- Assert.AreEqual(primaryNodeName, requestTargetNodeName);
+ try
+ {
+ await recordView.UpsertAsync(tx, keyTuple);
+ var requestTargetNodeName = GetRequestTargetNodeName(proxies,
ClientOp.TupleUpsert);
+
+ Assert.AreEqual(primaryNodeName, requestTargetNodeName);
+ }
+ finally
+ {
+ if (tx != null)
+ {
+ await tx.RollbackAsync();
+ }
+ }
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
index c3c7059aa7..aa66a42d3b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessTests.cs
@@ -24,7 +24,9 @@ using System.Threading.Tasks;
using Compute;
using Ignite.Compute;
using Ignite.Table;
+using Ignite.Transactions;
using Internal.Proto;
+using Internal.Transactions;
using NUnit.Framework;
/// <summary>
@@ -81,10 +83,10 @@ public class PartitionAwarenessTests
await recordView.UpsertAsync(null, 1);
// Check.
- await AssertOpOnNode(async () => await recordView.UpsertAsync(null,
1), ClientOp.TupleUpsert, _server2, _server1);
- await AssertOpOnNode(async () => await recordView.UpsertAsync(null,
3), ClientOp.TupleUpsert, _server1, _server2);
- await AssertOpOnNode(async () => await recordView.UpsertAsync(null,
4), ClientOp.TupleUpsert, _server2, _server1);
- await AssertOpOnNode(async () => await recordView.UpsertAsync(null,
7), ClientOp.TupleUpsert, _server1, _server2);
+ await AssertOpOnNode(async tx => await recordView.UpsertAsync(tx, 1),
ClientOp.TupleUpsert, _server2, _server1);
+ await AssertOpOnNode(async tx => await recordView.UpsertAsync(tx, 3),
ClientOp.TupleUpsert, _server1, _server2);
+ await AssertOpOnNode(async tx => await recordView.UpsertAsync(tx, 4),
ClientOp.TupleUpsert, _server2, _server1);
+ await AssertOpOnNode(async tx => await recordView.UpsertAsync(tx, 7),
ClientOp.TupleUpsert, _server1, _server2);
}
[Test]
@@ -93,8 +95,13 @@ public class PartitionAwarenessTests
using var client = await GetClient();
var recordView = (await
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
+ var txServer = _server1;
+ txServer.ClearOps();
+
var tx = await client.Transactions.BeginAsync();
- var txServer = new[] { _server1, _server2 }.Single(x =>
x.ClientOps.Contains(ClientOp.TxBegin));
+ await TestUtils.ForceLazyTxStart(tx, client,
PreferredNode.FromName(_server1.Node.Name));
+
+ Assert.AreEqual(ClientOp.TxBegin, txServer.ClientOps.Single());
for (int i = 0; i < 10; i++)
{
@@ -106,12 +113,12 @@ public class PartitionAwarenessTests
[Test]
public async Task TestClientReceivesPartitionAssignmentUpdates() =>
- await TestClientReceivesPartitionAssignmentUpdates(view =>
view.UpsertAsync(null, 1), ClientOp.TupleUpsert);
+ await TestClientReceivesPartitionAssignmentUpdates((view, tx) =>
view.UpsertAsync(tx, 1), ClientOp.TupleUpsert);
[Test]
public async Task TestDataStreamerReceivesPartitionAssignmentUpdates() =>
await TestClientReceivesPartitionAssignmentUpdates(
- view => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
+ (view, _) => view.StreamDataAsync(new[] { 1 }.ToAsyncEnumerable()),
ClientOp.StreamerBatchSend);
[Test]
@@ -128,25 +135,25 @@ public class PartitionAwarenessTests
// Single-key operations.
var expectedNode = node == 1 ? _server1 : _server2;
- await AssertOpOnNode(() => recordView.GetAsync(null, key),
ClientOp.TupleGet, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndDeleteAsync(null, key),
ClientOp.TupleGetAndDelete, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndReplaceAsync(null, key),
ClientOp.TupleGetAndReplace, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndUpsertAsync(null, key),
ClientOp.TupleGetAndUpsert, expectedNode);
- await AssertOpOnNode(() => recordView.UpsertAsync(null, key),
ClientOp.TupleUpsert, expectedNode);
- await AssertOpOnNode(() => recordView.InsertAsync(null, key),
ClientOp.TupleInsert, expectedNode);
- await AssertOpOnNode(() => recordView.ReplaceAsync(null, key),
ClientOp.TupleReplace, expectedNode);
- await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAsync(tx, key),
ClientOp.TupleGet, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndDeleteAsync(tx, key),
ClientOp.TupleGetAndDelete, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndReplaceAsync(tx, key),
ClientOp.TupleGetAndReplace, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndUpsertAsync(tx, key),
ClientOp.TupleGetAndUpsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.UpsertAsync(tx, key),
ClientOp.TupleUpsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.InsertAsync(tx, key),
ClientOp.TupleInsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.ReplaceAsync(tx, key),
ClientOp.TupleReplace, expectedNode);
+ await AssertOpOnNode(tx => recordView.ReplaceAsync(tx, key, key),
ClientOp.TupleReplaceExact, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAsync(tx, key),
ClientOp.TupleDelete, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteExactAsync(tx, key),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(_ => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new
IgniteTuple { ["ID"] = keyId + 1 } };
- await AssertOpOnNode(() => recordView.GetAllAsync(null, keys),
ClientOp.TupleGetAll, expectedNode);
- await AssertOpOnNode(() => recordView.InsertAllAsync(null, keys),
ClientOp.TupleInsertAll, expectedNode);
- await AssertOpOnNode(() => recordView.UpsertAllAsync(null, keys),
ClientOp.TupleUpsertAll, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAllExactAsync(null, keys),
ClientOp.TupleDeleteAllExact, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAllAsync(tx, keys),
ClientOp.TupleGetAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.InsertAllAsync(tx, keys),
ClientOp.TupleInsertAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.UpsertAllAsync(tx, keys),
ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAllAsync(tx, keys),
ClientOp.TupleDeleteAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAllExactAsync(tx, keys),
ClientOp.TupleDeleteAllExact, expectedNode);
}
[Test]
@@ -162,25 +169,25 @@ public class PartitionAwarenessTests
// Single-key operations.
var expectedNode = node == 1 ? _server1 : _server2;
- await AssertOpOnNode(() => recordView.GetAsync(null, key),
ClientOp.TupleGet, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndDeleteAsync(null, key),
ClientOp.TupleGetAndDelete, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndReplaceAsync(null, key),
ClientOp.TupleGetAndReplace, expectedNode);
- await AssertOpOnNode(() => recordView.GetAndUpsertAsync(null, key),
ClientOp.TupleGetAndUpsert, expectedNode);
- await AssertOpOnNode(() => recordView.UpsertAsync(null, key),
ClientOp.TupleUpsert, expectedNode);
- await AssertOpOnNode(() => recordView.InsertAsync(null, key),
ClientOp.TupleInsert, expectedNode);
- await AssertOpOnNode(() => recordView.ReplaceAsync(null, key),
ClientOp.TupleReplace, expectedNode);
- await AssertOpOnNode(() => recordView.ReplaceAsync(null, key, key),
ClientOp.TupleReplaceExact, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAsync(null, key),
ClientOp.TupleDelete, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteExactAsync(null, key),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAsync(tx, key),
ClientOp.TupleGet, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndDeleteAsync(tx, key),
ClientOp.TupleGetAndDelete, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndReplaceAsync(tx, key),
ClientOp.TupleGetAndReplace, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAndUpsertAsync(tx, key),
ClientOp.TupleGetAndUpsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.UpsertAsync(tx, key),
ClientOp.TupleUpsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.InsertAsync(tx, key),
ClientOp.TupleInsert, expectedNode);
+ await AssertOpOnNode(tx => recordView.ReplaceAsync(tx, key),
ClientOp.TupleReplace, expectedNode);
+ await AssertOpOnNode(tx => recordView.ReplaceAsync(tx, key, key),
ClientOp.TupleReplaceExact, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAsync(tx, key),
ClientOp.TupleDelete, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteExactAsync(tx, key),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(_ => recordView.StreamDataAsync(new[] { key
}.ToAsyncEnumerable()), ClientOp.StreamerBatchSend, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
- await AssertOpOnNode(() => recordView.GetAllAsync(null, keys),
ClientOp.TupleGetAll, expectedNode);
- await AssertOpOnNode(() => recordView.InsertAllAsync(null, keys),
ClientOp.TupleInsertAll, expectedNode);
- await AssertOpOnNode(() => recordView.UpsertAllAsync(null, keys),
ClientOp.TupleUpsertAll, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
- await AssertOpOnNode(() => recordView.DeleteAllExactAsync(null, keys),
ClientOp.TupleDeleteAllExact, expectedNode);
+ await AssertOpOnNode(tx => recordView.GetAllAsync(tx, keys),
ClientOp.TupleGetAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.InsertAllAsync(tx, keys),
ClientOp.TupleInsertAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.UpsertAllAsync(tx, keys),
ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAllAsync(tx, keys),
ClientOp.TupleDeleteAll, expectedNode);
+ await AssertOpOnNode(tx => recordView.DeleteAllExactAsync(tx, keys),
ClientOp.TupleDeleteAllExact, expectedNode);
}
[Test]
@@ -198,27 +205,27 @@ public class PartitionAwarenessTests
// Single-key operations.
var expectedNode = node == 1 ? _server1 : _server2;
- await AssertOpOnNode(() => kvView.GetAsync(null, key),
ClientOp.TupleGet, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndRemoveAsync(null, key),
ClientOp.TupleGetAndDelete, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndReplaceAsync(null, key, val),
ClientOp.TupleGetAndReplace, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndPutAsync(null, key, val),
ClientOp.TupleGetAndUpsert, expectedNode);
- await AssertOpOnNode(() => kvView.PutAsync(null, key, val),
ClientOp.TupleUpsert, expectedNode);
- await AssertOpOnNode(() => kvView.PutIfAbsentAsync(null, key, val),
ClientOp.TupleInsert, expectedNode);
- await AssertOpOnNode(() => kvView.ReplaceAsync(null, key, val),
ClientOp.TupleReplace, expectedNode);
- await AssertOpOnNode(() => kvView.ReplaceAsync(null, key, val, val),
ClientOp.TupleReplaceExact, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAsync(null, key),
ClientOp.TupleDelete, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAsync(null, key, val),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => kvView.ContainsAsync(null, key),
ClientOp.TupleContainsKey, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAsync(tx, key),
ClientOp.TupleGet, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndRemoveAsync(tx, key),
ClientOp.TupleGetAndDelete, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndReplaceAsync(tx, key, val),
ClientOp.TupleGetAndReplace, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndPutAsync(tx, key, val),
ClientOp.TupleGetAndUpsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutAsync(tx, key, val),
ClientOp.TupleUpsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutIfAbsentAsync(tx, key, val),
ClientOp.TupleInsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.ReplaceAsync(tx, key, val),
ClientOp.TupleReplace, expectedNode);
+ await AssertOpOnNode(tx => kvView.ReplaceAsync(tx, key, val, val),
ClientOp.TupleReplaceExact, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAsync(tx, key),
ClientOp.TupleDelete, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAsync(tx, key, val),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(tx => kvView.ContainsAsync(tx, key),
ClientOp.TupleContainsKey, expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, new IgniteTuple { ["ID"] = keyId - 1 }, new
IgniteTuple { ["ID"] = keyId + 1 } };
var pairs = keys.ToDictionary(x => (IIgniteTuple)x, _ =>
(IIgniteTuple)val);
- await AssertOpOnNode(() => kvView.GetAllAsync(null, keys),
ClientOp.TupleGetAll, expectedNode);
- await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs),
ClientOp.TupleUpsertAll, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
- await AssertOpOnNode(() =>
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend,
expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAllAsync(tx, keys),
ClientOp.TupleGetAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutAllAsync(tx, pairs),
ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAllAsync(tx, keys),
ClientOp.TupleDeleteAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAllAsync(tx, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
+ await AssertOpOnNode(_ =>
kvView.StreamDataAsync(pairs.ToAsyncEnumerable()), ClientOp.StreamerBatchSend,
expectedNode);
}
[Test]
@@ -235,29 +242,29 @@ public class PartitionAwarenessTests
// Single-key operations.
var expectedNode = node == 1 ? _server1 : _server2;
- await AssertOpOnNode(() => kvView.GetAsync(null, key),
ClientOp.TupleGet, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndRemoveAsync(null, key),
ClientOp.TupleGetAndDelete, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndReplaceAsync(null, key, val),
ClientOp.TupleGetAndReplace, expectedNode);
- await AssertOpOnNode(() => kvView.GetAndPutAsync(null, key, val),
ClientOp.TupleGetAndUpsert, expectedNode);
- await AssertOpOnNode(() => kvView.PutAsync(null, key, val),
ClientOp.TupleUpsert, expectedNode);
- await AssertOpOnNode(() => kvView.PutIfAbsentAsync(null, key, val),
ClientOp.TupleInsert, expectedNode);
- await AssertOpOnNode(() => kvView.ReplaceAsync(null, key, val),
ClientOp.TupleReplace, expectedNode);
- await AssertOpOnNode(() => kvView.ReplaceAsync(null, key, val, val),
ClientOp.TupleReplaceExact, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAsync(null, key),
ClientOp.TupleDelete, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAsync(null, key, val),
ClientOp.TupleDeleteExact, expectedNode);
- await AssertOpOnNode(() => kvView.ContainsAsync(null, key),
ClientOp.TupleContainsKey, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAsync(tx, key),
ClientOp.TupleGet, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndRemoveAsync(tx, key),
ClientOp.TupleGetAndDelete, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndReplaceAsync(tx, key, val),
ClientOp.TupleGetAndReplace, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAndPutAsync(tx, key, val),
ClientOp.TupleGetAndUpsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutAsync(tx, key, val),
ClientOp.TupleUpsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutIfAbsentAsync(tx, key, val),
ClientOp.TupleInsert, expectedNode);
+ await AssertOpOnNode(tx => kvView.ReplaceAsync(tx, key, val),
ClientOp.TupleReplace, expectedNode);
+ await AssertOpOnNode(tx => kvView.ReplaceAsync(tx, key, val, val),
ClientOp.TupleReplaceExact, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAsync(tx, key),
ClientOp.TupleDelete, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAsync(tx, key, val),
ClientOp.TupleDeleteExact, expectedNode);
+ await AssertOpOnNode(tx => kvView.ContainsAsync(tx, key),
ClientOp.TupleContainsKey, expectedNode);
await AssertOpOnNode(
- () => kvView.StreamDataAsync(new[] { new KeyValuePair<int,
int>(key, val) }.ToAsyncEnumerable()),
+ _ => kvView.StreamDataAsync(new[] { new KeyValuePair<int,
int>(key, val) }.ToAsyncEnumerable()),
ClientOp.StreamerBatchSend,
expectedNode);
// Multi-key operations use the first key for colocation.
var keys = new[] { key, key - 1, key + 1 };
var pairs = keys.ToDictionary(x => x, _ => val);
- await AssertOpOnNode(() => kvView.GetAllAsync(null, keys),
ClientOp.TupleGetAll, expectedNode);
- await AssertOpOnNode(() => kvView.PutAllAsync(null, pairs),
ClientOp.TupleUpsertAll, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAllAsync(null, keys),
ClientOp.TupleDeleteAll, expectedNode);
- await AssertOpOnNode(() => kvView.RemoveAllAsync(null, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
+ await AssertOpOnNode(tx => kvView.GetAllAsync(tx, keys),
ClientOp.TupleGetAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.PutAllAsync(tx, pairs),
ClientOp.TupleUpsertAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAllAsync(tx, keys),
ClientOp.TupleDeleteAll, expectedNode);
+ await AssertOpOnNode(tx => kvView.RemoveAllAsync(tx, pairs),
ClientOp.TupleDeleteAllExact, expectedNode);
}
[Test]
@@ -275,7 +282,7 @@ public class PartitionAwarenessTests
await Test("c", Guid.Parse("b0000000-0000-0000-0000-000000000000"),
_server1);
async Task Test(string idStr, Guid idGuid, FakeServer node) =>
- await AssertOpOnNode(() => view.UpsertAsync(null, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
+ await AssertOpOnNode(tx => view.UpsertAsync(tx, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
}
[Test]
@@ -292,7 +299,7 @@ public class PartitionAwarenessTests
await Test("c", Guid.NewGuid(), _server1);
async Task Test(string idStr, Guid idGuid, FakeServer node) =>
- await AssertOpOnNode(() => view.UpsertAsync(null, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
+ await AssertOpOnNode(tx => view.UpsertAsync(tx, new
CompositeKey(idStr, idGuid)), ClientOp.TupleUpsert, node);
}
[Test]
@@ -307,7 +314,7 @@ public class PartitionAwarenessTests
await
client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key,
Array.Empty<DeploymentUnit>(), "job");
await AssertOpOnNode(
- () =>
client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key,
Array.Empty<DeploymentUnit>(), "job"),
+ _ =>
client.Compute.SubmitColocatedAsync<object?>(FakeServer.ExistingTableName, key,
Array.Empty<DeploymentUnit>(), "job"),
ClientOp.ComputeExecuteColocated,
expectedNode);
}
@@ -325,7 +332,7 @@ public class PartitionAwarenessTests
FakeServer.ExistingTableName, key, Array.Empty<DeploymentUnit>(),
"job");
await AssertOpOnNode(
- () => client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
+ _ => client.Compute.SubmitColocatedAsync<object?, SimpleKey>(
FakeServer.ExistingTableName, key,
Array.Empty<DeploymentUnit>(), "job"),
ClientOp.ComputeExecuteColocated,
expectedNode);
@@ -339,7 +346,7 @@ public class PartitionAwarenessTests
// Check default assignment.
await recordView.UpsertAsync(null, 1);
- await AssertOpOnNode(() => recordView.UpsertAsync(null, 1),
ClientOp.TupleUpsert, _server2);
+ await AssertOpOnNode(tx => recordView.UpsertAsync(tx, 1),
ClientOp.TupleUpsert, _server2);
// One server has old assignment
_server1.PartitionAssignment =
_server1.PartitionAssignment.Reverse().ToArray();
@@ -356,20 +363,38 @@ public class PartitionAwarenessTests
_server2.ClearOps();
await recordView.UpsertAsync(null, 1);
- await AssertOpOnNode(() => recordView.UpsertAsync(null, 1),
ClientOp.TupleUpsert, _server2);
+ await AssertOpOnNode(tx => recordView.UpsertAsync(tx, 1),
ClientOp.TupleUpsert, _server2);
}
private static async Task AssertOpOnNode(
- Func<Task> action,
+ Func<ITransaction?, Task> action,
ClientOp op,
FakeServer node,
FakeServer? node2 = null,
bool allowExtraOps = false)
+ {
+ await AssertOpOnNodeInner(action, op, node, node2, allowExtraOps,
withTx: false);
+
+ if (op != ClientOp.StreamerBatchSend && op !=
ClientOp.ComputeExecuteColocated)
+ {
+ await AssertOpOnNodeInner(action, op, node, node2, allowExtraOps,
withTx: true);
+ }
+ }
+
+ private static async Task AssertOpOnNodeInner(
+ Func<ITransaction?, Task> action,
+ ClientOp op,
+ FakeServer node,
+ FakeServer? node2 = null,
+ bool allowExtraOps = false,
+ bool withTx = false)
{
node.ClearOps();
node2?.ClearOps();
- await action();
+ ITransaction? tx = withTx ? new LazyTransaction(default) : null;
+
+ await action(tx);
if (allowExtraOps)
{
@@ -377,7 +402,14 @@ public class PartitionAwarenessTests
}
else
{
- Assert.AreEqual(new[] { op }, node.ClientOps);
+ if (withTx)
+ {
+ Assert.AreEqual(new[] { ClientOp.TxBegin, op },
node.ClientOps);
+ }
+ else
+ {
+ Assert.AreEqual(new[] { op }, node.ClientOps);
+ }
}
if (node2 != null)
@@ -386,14 +418,14 @@ public class PartitionAwarenessTests
}
}
- private async Task
TestClientReceivesPartitionAssignmentUpdates(Func<IRecordView<int>, Task> func,
ClientOp op)
+ private async Task
TestClientReceivesPartitionAssignmentUpdates(Func<IRecordView<int>,
ITransaction?, Task> func, ClientOp op)
{
using var client = await GetClient();
var recordView = (await
client.Tables.GetTableAsync(FakeServer.ExistingTableName))!.GetRecordView<int>();
// Check default assignment.
await recordView.UpsertAsync(null, 1);
- await AssertOpOnNode(() => func(recordView), op, _server2);
+ await AssertOpOnNode(tx => func(recordView, tx), op, _server2);
// Update assignment.
var assignmentTimestamp = DateTime.UtcNow.Ticks;
@@ -409,7 +441,7 @@ public class PartitionAwarenessTests
await client.Tables.GetTablesAsync();
// Second request loads and uses new assignment.
- await AssertOpOnNode(() => func(recordView), op, _server1,
allowExtraOps: true);
+ await AssertOpOnNode(tx => func(recordView, tx), op, _server1,
allowExtraOps: true);
}
private async Task<IIgniteClient> GetClient()
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index 2a620f4d6a..c58f3f6c7f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -193,7 +193,7 @@ public class ColocationHashTests : IgniteTestsBase
var key = new IgniteTuple { ["id"] = 1 + i, ["id0"] = 2L + i,
["id1"] = "3" + i };
using var writer = ProtoCommon.GetMessageWriter();
- var clientColocationHash = ser.Write(writer, null, schema, key);
+ var (clientColocationHash, _) = ser.Write(writer, null, schema,
key);
var serverColocationHashExec = await
Client.Compute.SubmitAsync<int>(
clusterNodes,
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
index 5c1c7bc1fa..7c7201886a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RetryPolicyTests.cs
@@ -69,6 +69,7 @@ namespace Apache.Ignite.Tests
using var client = await server.ConnectClientAsync(cfg);
var tx = await client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(tx, client);
Assert.ThrowsAsync<IgniteClientConnectionException>(async () =>
await tx.CommitAsync());
Assert.IsEmpty(testRetryPolicy.Invocations);
@@ -227,6 +228,7 @@ namespace Apache.Ignite.Tests
using var server = new FakeServer(ctx => ctx.RequestCount % 2 ==
0);
using var client = await server.ConnectClientAsync(cfg);
var tx = await client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(tx, client);
var table = await
client.Tables.GetTableAsync(FakeServer.ExistingTableName);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index 4c6210ac15..b6e1b79ebf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -212,12 +212,6 @@ public class DataStreamerTests : IgniteTestsBase
[Values(1, 2, 100)] int pageSize,
[Values(true, false)] bool existingMinKey)
{
- if (pageSize > 1)
- {
- // TODO: IGNITE-21992 Data Streamer removal does not work for a
new key in the same batch
- return;
- }
-
var minKey = existingMinKey ? UpdatedKey : Interlocked.Add(ref
_unknownKey, 10);
await Table.GetRecordView<Poco>().StreamDataAsync(
GetData(),
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index 13df36ce58..ea93777ebd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -24,8 +24,12 @@ namespace Apache.Ignite.Tests
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
+ using Ignite.Transactions;
+ using Internal;
using Internal.Buffers;
using Internal.Common;
+ using Internal.Proto;
+ using Internal.Transactions;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
@@ -96,6 +100,12 @@ namespace Apache.Ignite.Tests
#endif
}
+ internal static async Task ForceLazyTxStart(ITransaction tx, IIgnite
client, PreferredNode preferredNode = default) =>
+ await LazyTransaction.EnsureStartedAsync(
+ tx,
+ client.GetFieldValue<ClientFailoverSocket>("_socket"),
+ preferredNode);
+
private static FieldInfo GetNonPublicField(object obj, string
fieldName)
{
var field = obj.GetType().GetField(fieldName,
BindingFlags.Instance | BindingFlags.NonPublic);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index 9dbc31309b..30d7445154 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -19,13 +19,13 @@ namespace Apache.Ignite.Tests.Transactions
{
using System;
using System.Linq;
- using System.Text.RegularExpressions;
using System.Threading.Tasks;
- using System.Transactions;
using Ignite.Transactions;
using Internal;
+ using Internal.Transactions;
using NUnit.Framework;
using Table;
+ using Tx;
using TransactionOptions = Ignite.Transactions.TransactionOptions;
/// <summary>
@@ -165,6 +165,8 @@ namespace Apache.Ignite.Tests.Transactions
public async Task TestClientDisconnectClosesActiveTransactions()
{
await using var tx0 = await Client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(tx0, Client);
+
await TupleView.UpsertAsync(null, GetTuple(1, "1"));
using (var client2 = await IgniteClient.StartAsync(GetConfig()))
@@ -187,6 +189,7 @@ namespace Apache.Ignite.Tests.Transactions
{
using var client2 = await IgniteClient.StartAsync(GetConfig());
await using var tx = await client2.Transactions.BeginAsync();
+ await using var cursor = await client2.Sql.ExecuteAsync(tx,
"select 1"); // Force lazy tx start.
var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
await TupleView.UpsertAsync(tx, GetTuple(1, "2")));
Assert.AreEqual("Specified transaction belongs to a different
IgniteClient instance.", ex!.Message);
@@ -222,7 +225,7 @@ namespace Apache.Ignite.Tests.Transactions
public async Task TestUpdateInReadOnlyTxThrows()
{
await using var tx = await Client.Transactions.BeginAsync(new
TransactionOptions { ReadOnly = true });
- var ex = Assert.ThrowsAsync<Tx.TransactionException>(async () =>
await TupleView.UpsertAsync(tx, GetTuple(1, "1")));
+ var ex = Assert.ThrowsAsync<TransactionException>(async () =>
await TupleView.UpsertAsync(tx, GetTuple(1, "1")));
Assert.AreEqual(ErrorGroups.Transactions.TxFailedReadWriteOperation, ex!.Code,
ex.Message);
StringAssert.Contains("Failed to enlist read-write operation into
read-only transaction", ex.Message);
@@ -271,13 +274,18 @@ namespace Apache.Ignite.Tests.Transactions
using var client = await IgniteClient.StartAsync(new() { Endpoints
= { "127.0.0.1:" + ServerPort } });
await using var tx1 = await client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(tx1, client);
+
await using var tx2 = await
client.Transactions.BeginAsync(new(ReadOnly: true));
+ await TestUtils.ForceLazyTxStart(tx2, client);
+
await using var tx3 = await client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(tx3, client);
await tx2.RollbackAsync();
await tx3.CommitAsync();
- var id = int.Parse(Regex.Match(tx1.ToString()!, @"\d+").Value);
+ var id = LazyTransaction.Get(tx1)!.Id;
Assert.AreEqual($"Transaction {{ Id = {id}, State = Open,
IsReadOnly = False }}", tx1.ToString());
Assert.AreEqual($"Transaction {{ Id = {id + 1}, State =
RolledBack, IsReadOnly = True }}", tx2.ToString());
@@ -323,7 +331,7 @@ namespace Apache.Ignite.Tests.Transactions
}
else
{
- await client.Transactions.BeginAsync();
+ await TestUtils.ForceLazyTxStart(await
client.Transactions.BeginAsync(), client);
}
Assert.AreEqual(123, server.LastClientObservableTimestamp);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index aaef2724f9..85105d99d2 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -229,6 +229,14 @@ namespace Apache.Ignite.Internal.Buffers
_index += 8;
}
+ /// <summary>
+ /// Writes a long at specified position.
+ /// </summary>
+ /// <param name="val">Value.</param>
+ /// <param name="pos">Position.</param>
+ public void WriteLongBigEndian(long val, int pos) =>
+ BinaryPrimitives.WriteInt64BigEndian(_buffer.AsSpan(pos +
_prefixSize), val);
+
/// <summary>
/// Reads a short at specified position.
/// </summary>
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
index 92dd3f89a7..060783ae79 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/IgniteClientInternal.cs
@@ -52,7 +52,7 @@ namespace Apache.Ignite.Internal
var tables = new Tables(socket, sql);
Tables = tables;
- Transactions = new Transactions.Transactions(socket);
+ Transactions = new Transactions.Transactions();
Compute = new Compute.Compute(socket, tables);
Sql = sql;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 5058eb234e..f8bb8d07c4 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -22,7 +22,6 @@ using System.Buffers.Binary;
using System.Collections.Generic;
using BinaryTuple;
using Buffers;
-using Transactions;
/// <summary>
/// MsgPack writer. Wraps <see cref="PooledArrayBuffer"/>. Writer index is
kept by the buffer, so this struct is readonly.
@@ -42,6 +41,11 @@ internal readonly ref struct MsgPackWriter
Buf = buf;
}
+ /// <summary>
+ /// Gets the current position.
+ /// </summary>
+ public int Position => Buf.Position;
+
private PooledArrayBuffer Buf { get; }
/// <summary>
@@ -339,16 +343,16 @@ internal readonly ref struct MsgPackWriter
/// <summary>
/// Writes a transaction.
/// </summary>
- /// <param name="tx">Transaction.</param>
- public void WriteTx(Transaction? tx)
+ /// <param name="txId">Transaction id.</param>
+ public void WriteTx(long? txId)
{
- if (tx == null)
+ if (txId == null)
{
WriteNil();
}
else
{
- Write(tx.Id);
+ Write(txId.Value);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 46442d96b4..9b21a8a547 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -157,7 +157,7 @@ namespace Apache.Ignite.Internal.Sql
{
IgniteArgumentCheck.NotNull(statement);
- Transaction? tx = transaction.ToInternal();
+ Transaction? tx = await
LazyTransaction.EnsureStartedAsync(transaction, _socket,
default).ConfigureAwait(false);
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, statement, args, tx, writeTx: true);
@@ -231,7 +231,7 @@ namespace Apache.Ignite.Internal.Sql
if (writeTx)
{
- w.WriteTx(tx);
+ w.WriteTx(tx?.Id);
}
w.Write(statement.Schema);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 051baf2ce5..86ce4e8d00 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -405,12 +405,22 @@ namespace Apache.Ignite.Internal.Table
try
{
schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
- var tx = transaction.ToInternal();
+
+ LazyTransaction? lazyTx = LazyTransaction.Get(transaction);
+ var txId = lazyTx?.Id;
using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.Write(writer, tx, schema, record,
keyOnly);
+ var (colocationHash, txIdPos) = _ser.Write(writer, txId,
schema, record, keyOnly);
var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+ var tx = await LazyTransaction.EnsureStartedAsync(transaction,
_table.Socket, preferredNode)
+ .ConfigureAwait(false);
+
+ if (tx != null && txId == LazyTransaction.TxIdPlaceholder)
+ {
+ writer.WriteLongBigEndian(tx.Id, txIdPos + 1);
+ }
+
return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch &&
@@ -446,12 +456,22 @@ namespace Apache.Ignite.Internal.Table
try
{
schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
- var tx = transaction.ToInternal();
+
+ LazyTransaction? lazyTx = LazyTransaction.Get(transaction);
+ var txId = lazyTx?.Id;
using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteTwo(writer, tx, schema, record,
record2, keyOnly);
+ var (colocationHash, txIdPos) = _ser.WriteTwo(writer, txId,
schema, record, record2, keyOnly);
var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+ var tx = await LazyTransaction.EnsureStartedAsync(transaction,
_table.Socket, preferredNode)
+ .ConfigureAwait(false);
+
+ if (tx != null && txId == LazyTransaction.TxIdPlaceholder)
+ {
+ writer.WriteLongBigEndian(tx.Id, txIdPos + 1);
+ }
+
return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch &&
@@ -493,12 +513,22 @@ namespace Apache.Ignite.Internal.Table
try
{
schema = await
_table.GetSchemaAsync(schemaVersionOverride).ConfigureAwait(false);
- var tx = transaction.ToInternal();
+
+ LazyTransaction? lazyTx = LazyTransaction.Get(transaction);
+ var txId = lazyTx?.Id;
using var writer = ProtoCommon.GetMessageWriter();
- var colocationHash = _ser.WriteMultiple(writer, tx, schema,
iterator, keyOnly);
+ var (colocationHash, txIdPos) = _ser.WriteMultiple(writer,
txId, schema, iterator, keyOnly);
var preferredNode = await
_table.GetPreferredNode(colocationHash, transaction).ConfigureAwait(false);
+ var tx = await LazyTransaction.EnsureStartedAsync(transaction,
_table.Socket, preferredNode)
+ .ConfigureAwait(false);
+
+ if (tx != null && txId == LazyTransaction.TxIdPlaceholder)
+ {
+ writer.WriteLongBigEndian(tx.Id, txIdPos + 1);
+ }
+
return await DoOutInOpAsync(op, tx, writer,
preferredNode).ConfigureAwait(false);
}
catch (IgniteException e) when (e.Code ==
ErrorGroups.Table.SchemaVersionMismatch &&
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 46d4df001f..e59491dd6f 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -151,21 +151,21 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// Write record.
/// </summary>
/// <param name="buf">Buffer.</param>
- /// <param name="tx">Transaction.</param>
+ /// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="rec">Record.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>Colocation hash.</returns>
- public int Write(
+ public (int ColocationHash, int TxIdPos) Write(
PooledArrayBuffer buf,
- Transactions.Transaction? tx,
+ long? txId,
Schema schema,
T rec,
bool keyOnly = false)
{
var w = buf.MessageWriter;
- var colocationHash = WriteWithHeader(ref w, tx, schema, rec,
keyOnly);
+ var colocationHash = WriteWithHeader(ref w, txId, schema, rec,
keyOnly);
return colocationHash;
}
@@ -174,15 +174,15 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// Write two records.
/// </summary>
/// <param name="buf">Buffer.</param>
- /// <param name="tx">Transaction.</param>
+ /// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="t">Record 1.</param>
/// <param name="t2">Record 2.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>First record hash.</returns>
- public int WriteTwo(
+ public (int ColocationHash, int TxIdPos) WriteTwo(
PooledArrayBuffer buf,
- Transactions.Transaction? tx,
+ long? txId,
Schema schema,
T t,
T t2,
@@ -190,7 +190,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
{
var w = buf.MessageWriter;
- var firstHash = WriteWithHeader(ref w, tx, schema, t, keyOnly);
+ var firstHash = WriteWithHeader(ref w, txId, schema, t, keyOnly);
_handler.Write(ref w, schema, t2, keyOnly);
return firstHash;
@@ -200,21 +200,21 @@ namespace Apache.Ignite.Internal.Table.Serialization
/// Write multiple records.
/// </summary>
/// <param name="buf">Buffer.</param>
- /// <param name="tx">Transaction.</param>
+ /// <param name="txId">Transaction.</param>
/// <param name="schema">Schema.</param>
/// <param name="recs">Records.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>First record hash.</returns>
- public int WriteMultiple(
+ public (int ColocationHash, int TxIdPos) WriteMultiple(
PooledArrayBuffer buf,
- Transactions.Transaction? tx,
+ long? txId,
Schema schema,
IEnumerator<T> recs,
bool keyOnly = false)
{
var w = buf.MessageWriter;
- WriteIdAndTx(ref w, tx);
+ var txIdPos = WriteIdAndTx(ref w, txId);
w.Write(schema.Version);
var count = 0;
@@ -245,40 +245,46 @@ namespace Apache.Ignite.Internal.Table.Serialization
countSpan[0] = MsgPackCode.Int32;
BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
- return firstHash;
+ return (firstHash, txIdPos);
}
/// <summary>
/// Write record with header.
/// </summary>
/// <param name="w">Writer.</param>
- /// <param name="tx">Transaction.</param>
+ /// <param name="txId">Transaction id.</param>
/// <param name="schema">Schema.</param>
/// <param name="rec">Record.</param>
/// <param name="keyOnly">Key only columns.</param>
/// <returns>Colocation hash.</returns>
- private int WriteWithHeader(
+ private (int ColocationHash, int TxIdPos) WriteWithHeader(
ref MsgPackWriter w,
- Transactions.Transaction? tx,
+ long? txId,
Schema schema,
T rec,
bool keyOnly = false)
{
- WriteIdAndTx(ref w, tx);
+ var txIdPos = WriteIdAndTx(ref w, txId);
w.Write(schema.Version);
- return _handler.Write(ref w, schema, rec, keyOnly, computeHash:
true);
+ var colocationHash = _handler.Write(ref w, schema, rec, keyOnly,
computeHash: true);
+
+ return (colocationHash, txIdPos);
}
/// <summary>
/// Writes table id and transaction id, if present.
/// </summary>
/// <param name="w">Writer.</param>
- /// <param name="tx">Transaction.</param>
- private void WriteIdAndTx(ref MsgPackWriter w,
Transactions.Transaction? tx)
+ /// <param name="txId">Transaction id.</param>
+ private int WriteIdAndTx(ref MsgPackWriter w, long? txId)
{
w.Write(_table.Id);
- w.WriteTx(tx);
+
+ var txIdPos = w.Position;
+ w.WriteTx(txId);
+
+ return txIdPos;
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
index 27b1f3bd85..196cc852b7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
@@ -33,6 +33,7 @@ namespace Apache.Ignite.Internal.Table
using Proto.MsgPack;
using Serialization;
using Sql;
+ using Transactions;
/// <summary>
/// Table API.
@@ -189,7 +190,9 @@ namespace Apache.Ignite.Internal.Table
/// <returns>Preferred node.</returns>
internal async ValueTask<PreferredNode> GetPreferredNode(int
colocationHash, ITransaction? transaction)
{
- if (transaction != null)
+ // This check is not accurate when the same lazy tx is used from
multiple threads.
+ // But it is only an optimization to skip the calculation below:
preferredNode is ignored when tx is started anyway.
+ if (LazyTransaction.IsStarted(transaction))
{
return default;
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs
new file mode 100644
index 0000000000..c2976cf5fa
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/LazyTransaction.cs
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Transactions;
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Common;
+using Ignite.Transactions;
+using Proto;
+using Tx;
+
+/// <summary>
+/// Lazy Ignite transaction.
+/// </summary>
+internal sealed class LazyTransaction : ITransaction
+{
+ /// <summary>
+ /// Transaction ID placeholder. Uses MaxValue to reserve bytes in varint
format.
+ /// It will also work correctly if the actual tx id matches the
placeholder.
+ /// </summary>
+ public const long TxIdPlaceholder = long.MaxValue;
+
+ private const int StateOpen = 0;
+
+ private const int StateCommitted = 1;
+
+ private const int StateRolledBack = 2;
+
+ private readonly object _syncRoot = new();
+
+ private readonly TransactionOptions _options;
+
+ private int _state = StateOpen;
+
+ private volatile Task<Transaction>? _tx;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="LazyTransaction"/> class.
+ /// </summary>
+ /// <param name="options">Options.</param>
+ public LazyTransaction(TransactionOptions options) => _options = options;
+
+ /// <inheritdoc/>
+ public bool IsReadOnly => _options.ReadOnly;
+
+ /// <summary>
+ /// Gets the transaction ID.
+ /// </summary>
+ internal long Id =>
+ _tx is { IsCompleted: true }
+ ? _tx.Result.Id
+ : TxIdPlaceholder;
+
+ /// <summary>
+ /// Gets the transaction state.
+ /// </summary>
+ private string State => _state switch
+ {
+ StateOpen => "Open",
+ StateCommitted => "Committed",
+ _ => "RolledBack"
+ };
+
+ /// <inheritdoc/>
+ public async Task CommitAsync()
+ {
+ if (TrySetState(StateCommitted))
+ {
+ await DoOpAsync(_tx, ClientOp.TxCommit).ConfigureAwait(false);
+ }
+ }
+
+ /// <inheritdoc/>
+ public async Task RollbackAsync()
+ {
+ if (TrySetState(StateRolledBack))
+ {
+ await DoOpAsync(_tx, ClientOp.TxRollback).ConfigureAwait(false);
+ }
+ }
+
+ /// <inheritdoc/>
+ public async ValueTask DisposeAsync() => await
RollbackAsync().ConfigureAwait(false);
+
+ /// <inheritdoc/>
+ public override string ToString()
+ {
+ var builder = new IgniteToStringBuilder(typeof(Transaction));
+
+ builder.Append(Id);
+ builder.Append(State);
+ builder.Append(IsReadOnly);
+
+ return builder.Build();
+ }
+
+ /// <summary>
+ /// Ensures that the lazy transaction is actually started on the server.
+ /// </summary>
+ /// <param name="tx">Lazy transaction.</param>
+ /// <param name="socket">Socket.</param>
+ /// <param name="preferredNode">Preferred target node.</param>
+ /// <returns>Task that will be completed when the transaction is
started.</returns>
+ internal static async ValueTask<Transaction?>
EnsureStartedAsync(ITransaction? tx, ClientFailoverSocket socket, PreferredNode
preferredNode) =>
+ Get(tx) is { } lazyTx
+ ? await lazyTx.EnsureStartedAsync(socket,
preferredNode).ConfigureAwait(false)
+ : null;
+
+ /// <summary>
+ /// Gets the underlying lazy transaction or throws if the transaction type
is not supported.
+ /// </summary>
+ /// <param name="tx">Public transaction.</param>
+ /// <returns>Internal lazy transaction.</returns>
+ internal static LazyTransaction? Get(ITransaction? tx) => tx switch
+ {
+ null => null,
+ LazyTransaction t => t,
+ _ => throw new TransactionException(
+ Guid.NewGuid(),
+ ErrorGroups.Common.Internal,
+ "Unsupported transaction implementation: " + tx.GetType())
+ };
+
+ /// <summary>
+ /// Gets a value indicating whether the underlying lazy transaction is
started.
+ /// </summary>
+ /// <param name="tx">Transaction.</param>
+ /// <returns>True when the underlying lazy transaction is started, false
otherwise.</returns>
+ internal static bool IsStarted(ITransaction? tx) => Get(tx)?._tx != null;
+
+ private static async Task DoOpAsync(Task<Transaction>? txTask, ClientOp op)
+ {
+ if (txTask == null)
+ {
+ // No operations were performed, nothing to commit or roll back.
+ return;
+ }
+
+ var tx = await txTask.ConfigureAwait(false);
+
+ using var writer = ProtoCommon.GetMessageWriter();
+ writer.MessageWriter.Write(tx.Id);
+ using var buffer = await tx.Socket.DoOutInOpAsync(op,
writer).ConfigureAwait(false);
+ }
+
+ private Task<Transaction> EnsureStartedAsync(ClientFailoverSocket socket,
PreferredNode preferredNode)
+ {
+ lock (_syncRoot)
+ {
+ var txTask = _tx;
+
+ if (txTask != null)
+ {
+ return txTask;
+ }
+
+ txTask = BeginAsync(socket, preferredNode);
+ _tx = txTask;
+
+ return txTask;
+ }
+ }
+
+ private async Task<Transaction> BeginAsync(ClientFailoverSocket
failoverSocket, PreferredNode preferredNode)
+ {
+ using var writer = ProtoCommon.GetMessageWriter();
+ Write();
+
+ // Transaction and all corresponding operations must be performed
using the same connection.
+ var (resBuf, socket) = await failoverSocket.DoOutInOpAndGetSocketAsync(
+ ClientOp.TxBegin, request: writer, preferredNode:
preferredNode).ConfigureAwait(false);
+
+ using (resBuf)
+ {
+ var txId = resBuf.GetReader().ReadInt64();
+
+ return new Transaction(txId, socket, failoverSocket);
+ }
+
+ void Write()
+ {
+ var w = writer.MessageWriter;
+ w.Write(_options.ReadOnly);
+ w.Write(failoverSocket.ObservableTimestamp);
+ }
+ }
+
+ private bool TrySetState(int state) => Interlocked.CompareExchange(ref
_state, state, StateOpen) == StateOpen;
+}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
index aeb9b36d09..a0b5c41f1e 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
@@ -15,126 +15,12 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Transactions
-{
- using System.Threading;
- using System.Threading.Tasks;
- using Common;
- using Ignite.Transactions;
- using Proto;
- using Proto.MsgPack;
-
- /// <summary>
- /// Ignite transaction.
- /// </summary>
- internal sealed class Transaction : ITransaction
- {
- /** Open state. */
- private const int StateOpen = 0;
-
- /** Committed state. */
- private const int StateCommitted = 1;
-
- /** Rolled back state. */
- private const int StateRolledBack = 2;
-
- /** State. */
- private int _state = StateOpen;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Transaction"/> class.
- /// </summary>
- /// <param name="id">Transaction id.</param>
- /// <param name="socket">Associated connection.</param>
- /// <param name="failoverSocket">Associated connection
multiplexer.</param>
- /// <param name="isReadOnly">Read-only flag.</param>
- public Transaction(long id, ClientSocket socket, ClientFailoverSocket
failoverSocket, bool isReadOnly)
- {
- Id = id;
- Socket = socket;
- FailoverSocket = failoverSocket;
- IsReadOnly = isReadOnly;
- }
-
- /// <summary>
- /// Gets the owner socket.
- /// </summary>
- public ClientSocket Socket { get; }
-
- /// <summary>
- /// Gets the owner multiplexer socket.
- /// </summary>
- public ClientFailoverSocket FailoverSocket { get; }
-
- /// <summary>
- /// Gets the transaction id.
- /// </summary>
- public long Id { get; }
-
- /// <inheritdoc/>
- public bool IsReadOnly { get; }
-
- /// <inheritdoc/>
- public async Task CommitAsync()
- {
- if (TrySetState(StateCommitted))
- {
- using var writer = ProtoCommon.GetMessageWriter();
- Write(writer.MessageWriter);
-
- using var buffer = await
Socket.DoOutInOpAsync(ClientOp.TxCommit, writer).ConfigureAwait(false);
- }
- }
-
- /// <inheritdoc/>
- public async Task RollbackAsync() => await
RollbackAsyncInternal().ConfigureAwait(false);
-
- /// <inheritdoc/>
- public async ValueTask DisposeAsync() => await
RollbackAsyncInternal().ConfigureAwait(false);
-
- /// <inheritdoc/>
- public override string ToString()
- {
- var state = _state switch
- {
- StateOpen => "Open",
- StateCommitted => "Committed",
- _ => "RolledBack"
- };
-
- var builder = new IgniteToStringBuilder(typeof(Transaction));
- builder.Append(Id);
- builder.Append(state, "State");
- builder.Append(IsReadOnly);
-
- return builder.Build();
- }
-
- /// <summary>
- /// Rolls back the transaction without state check.
- /// </summary>
- private async ValueTask RollbackAsyncInternal()
- {
- if (TrySetState(StateRolledBack))
- {
- using var writer = ProtoCommon.GetMessageWriter();
- Write(writer.MessageWriter);
-
- using var buffer = await
Socket.DoOutInOpAsync(ClientOp.TxRollback, writer).ConfigureAwait(false);
- }
- }
-
- /// <summary>
- /// Attempts to set the specified state.
- /// </summary>
- /// <param name="state">State to set.</param>
- /// <returns>True when specified state was set successfully; false
otherwise.</returns>
- private bool TrySetState(int state) => Interlocked.CompareExchange(ref
_state, state, StateOpen) == StateOpen;
-
- /// <summary>
- /// Writes the transaction.
- /// </summary>
- /// <param name="writer">Writer.</param>
- private void Write(MsgPackWriter writer) => writer.Write(Id);
- }
-}
+namespace Apache.Ignite.Internal.Transactions;
+
+/// <summary>
+/// Ignite internal transaction.
+/// </summary>
+/// <param name="Id">Transaction id.</param>
+/// <param name="Socket">Associated connection.</param>
+/// <param name="FailoverSocket">Associated connection multiplexer.</param>
+internal sealed record Transaction(long Id, ClientSocket Socket,
ClientFailoverSocket FailoverSocket);
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/TransactionExtensions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/TransactionExtensions.cs
deleted file mode 100644
index 8a53880a07..0000000000
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/TransactionExtensions.cs
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Internal.Transactions
-{
- using System.Transactions;
- using Ignite.Transactions;
-
- /// <summary>
- /// Transaction extension methods.
- /// </summary>
- internal static class TransactionExtensions
- {
- /// <summary>
- /// Gets transaction as internal <see cref="Transaction"/> class.
- /// </summary>
- /// <param name="tx">Transaction.</param>
- /// <returns>Internal transaction.</returns>
- /// <exception cref="TransactionException">When provided transaction
is not supported.</exception>
- public static Transaction? ToInternal(this ITransaction? tx)
- {
- if (tx == null)
- {
- return null;
- }
-
- if (tx is Transaction t)
- {
- return t;
- }
-
- throw new TransactionException("Unsupported transaction
implementation: " + tx.GetType());
- }
- }
-}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index bf4698341c..02fb3f12ab 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -15,55 +15,23 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Internal.Transactions
-{
- using System.Threading.Tasks;
- using Common;
- using Ignite.Transactions;
- using Proto;
-
- /// <summary>
- /// Transactions API.
- /// </summary>
- internal class Transactions : ITransactions
- {
- /** Underlying connection. */
- private readonly ClientFailoverSocket _socket;
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Transactions"/> class.
- /// </summary>
- /// <param name="socket">Socket.</param>
- public Transactions(ClientFailoverSocket socket)
- {
- _socket = socket;
- }
-
- /// <inheritdoc/>
- public async Task<ITransaction> BeginAsync(TransactionOptions options)
- {
- using var writer = ProtoCommon.GetMessageWriter();
- Write();
+namespace Apache.Ignite.Internal.Transactions;
- // Transaction and all corresponding operations must be performed
using the same connection.
- var (resBuf, socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin, request:
writer).ConfigureAwait(false);
+using System.Diagnostics.CodeAnalysis;
+using System.Threading.Tasks;
+using Common;
+using Ignite.Transactions;
- using (resBuf)
- {
- var txId = resBuf.GetReader().ReadInt64();
-
- return new Transaction(txId, socket, _socket,
options.ReadOnly);
- }
-
- void Write()
- {
- var w = writer.MessageWriter;
- w.Write(options.ReadOnly);
- w.Write(_socket.ObservableTimestamp);
- }
- }
+/// <summary>
+/// Transactions API.
+/// </summary>
+internal class Transactions : ITransactions
+{
+ /// <inheritdoc/>
+ [SuppressMessage("Reliability", "CA2000:Dispose objects before losing
scope", Justification = "Tx is returned.")]
+ public ValueTask<ITransaction> BeginAsync(TransactionOptions options) =>
+ ValueTask.FromResult((ITransaction)new LazyTransaction(options));
- /// <inheritdoc />
- public override string ToString() =>
IgniteToStringBuilder.Build(GetType());
- }
+ /// <inheritdoc />
+ public override string ToString() =>
IgniteToStringBuilder.Build(GetType());
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Transactions/ITransactions.cs
b/modules/platforms/dotnet/Apache.Ignite/Transactions/ITransactions.cs
index 6914d1a7da..427342eca3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Transactions/ITransactions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Transactions/ITransactions.cs
@@ -28,13 +28,13 @@ namespace Apache.Ignite.Transactions
/// Starts a new transaction.
/// </summary>
/// <param name="options">Transaction options.</param>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<ITransaction> BeginAsync(TransactionOptions options);
+ /// <returns>A <see cref="ValueTask"/> representing the asynchronous
operation.</returns>
+ ValueTask<ITransaction> BeginAsync(TransactionOptions options);
/// <summary>
/// Starts a new transaction.
/// </summary>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<ITransaction> BeginAsync() => BeginAsync(default);
+ /// <returns>A <see cref="ValueTask"/> representing the asynchronous
operation.</returns>
+ ValueTask<ITransaction> BeginAsync() => BeginAsync(default);
}
}