This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 23da36041f3 IGNITE-25890 .NET: Add ISql.ExecuteBatchAsync (#6306)
23da36041f3 is described below
commit 23da36041f3069e0eff16fbb26a72553da11c6a4
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jul 24 09:08:30 2025 +0300
IGNITE-25890 .NET: Add ISql.ExecuteBatchAsync (#6306)
---
.../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 182 ++++++++++++++++++++-
.../Apache.Ignite/ApiCompatibilitySuppressions.xml | 7 +
.../dotnet/Apache.Ignite/ClientOperationType.cs | 7 +-
.../Internal/Buffers/PooledArrayBuffer.cs | 23 +++
.../Apache.Ignite/Internal/Compute/Compute.cs | 8 +-
.../Apache.Ignite/Internal/Proto/ClientOp.cs | 3 +
.../Internal/Proto/ClientOpExtensions.cs | 1 +
.../Internal/Proto/MsgPack/MsgPackWriter.cs | 28 +++-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 113 ++++++++++++-
.../Table/Serialization/RecordSerializer.cs | 8 +-
.../dotnet/Apache.Ignite/RetryReadPolicy.cs | 1 +
modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs | 26 +++
12 files changed, 386 insertions(+), 21 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 499403b50c9..c5ff7b5277f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -27,6 +27,7 @@ namespace Apache.Ignite.Tests.Sql
using System.Threading.Tasks;
using Ignite.Sql;
using Ignite.Table;
+ using Ignite.Transactions;
using Microsoft.Extensions.Logging.Abstractions;
using NodaTime;
using NUnit.Framework;
@@ -62,6 +63,12 @@ namespace Apache.Ignite.Tests.Sql
await Client.Sql.ExecuteAsync(null, "DROP TABLE IF EXISTS TestExecuteScript");
}
+ [SetUp]
+ public async Task ResetData()
+ {
+ await Client.Sql.ExecuteScriptAsync("DELETE FROM TEST WHERE ID >= 10");
+ }
+
[Test]
public async Task TestSimpleQuery()
{
@@ -593,6 +600,168 @@ namespace Apache.Ignite.Tests.Sql
AssertInstantSimilar(expectedTime, resTime, $"Offset: {offset}");
}
+ [Test]
+ public async Task TestExecuteBatch()
+ {
+ long[] res = await Client.Sql.ExecuteBatchAsync(
+ transaction: null,
+ statement: "INSERT INTO TEST VALUES (?, ?)",
+ args: [[100, "x"], [101, "y"], [102, "z"]]);
+
+ CollectionAssert.AreEqual(new[] { 1L, 1L, 1L }, res);
+
+ await using var resultSet = await Client.Sql.ExecuteAsync(
+ null, "SELECT ID, VAL FROM TEST WHERE ID >= 100 AND ID <= 102 ORDER BY ID");
+
+ List<IIgniteTuple> rows = await resultSet.ToListAsync();
+ Assert.AreEqual(3, rows.Count);
+
+ Assert.AreEqual("IgniteTuple { ID = 100, VAL = x }", rows[0].ToString());
+ Assert.AreEqual("IgniteTuple { ID = 101, VAL = y }", rows[1].ToString());
+ Assert.AreEqual("IgniteTuple { ID = 102, VAL = z }", rows[2].ToString());
+ }
+
+ [Test]
+ public async Task TestExecuteBatchInsertUpdateDelete()
+ {
+ long[] insertRes = await Client.Sql.ExecuteBatchAsync(
+ transaction: null,
+ statement: "INSERT INTO TEST VALUES (?, ?)",
+ args: [[100, "x"], [101, "y"], [102, "z"]]);
+
+ CollectionAssert.AreEqual(new[] { 1L, 1L, 1L }, insertRes);
+
+ long[] updateRes = await Client.Sql.ExecuteBatchAsync(
+ transaction: null,
+ statement: "UPDATE TEST SET VAL = ? WHERE ID >= ? AND ID <= ?",
+ args: [["update1", 100, 101], ["update2", 102, 103]]);
+
+ CollectionAssert.AreEqual(new[] { 2L, 1L }, updateRes);
+
+ long[] deleteRes = await Client.Sql.ExecuteBatchAsync(
+ transaction: null,
+ statement: "DELETE FROM TEST WHERE ID >= ? AND ID <= ?",
+ args: [[100, 102]]);
+
+ CollectionAssert.AreEqual(new[] { 3L }, deleteRes);
+ }
+
+ [Test]
+ public async Task TestExecuteBatchArgsCollections()
+ {
+ var statement = "INSERT INTO TEST VALUES (?, ?)";
+
+ // Array.
+ object[][] arr =
+ [
+ [200, "x"],
+ [201, "y"],
+ ];
+
+ await Client.Sql.ExecuteBatchAsync(null, statement, arr);
+
+ // List.
+ List<List<object>> args =
+ [
+ [300, "x"],
+ [301, "y"]
+ ];
+
+ await Client.Sql.ExecuteBatchAsync(null, statement, args);
+
+ // Lazy.
+ IEnumerable<IEnumerable<object>> collection = Yield(
+ Yield<object>(401, "x1"),
+ Yield<object>(402, "x2"));
+
+ await Client.Sql.ExecuteBatchAsync(null, statement, collection);
+
+ static IEnumerable<T> Yield<T>(params T[] args)
+ {
+ foreach (var arg in args)
+ {
+ yield return arg;
+ }
+ }
+ }
+
+ [Test]
+ public async Task TestExecuteBatchWithTx()
+ {
+ await using var tx = await Client.Transactions.BeginAsync();
+
+ Assert.AreEqual(0, await GetCount(tx));
+
+ await Client.Sql.ExecuteBatchAsync(tx, "INSERT INTO TEST VALUES (?, ?)", [[110, "x"], [111, "y"]]);
+
+ Assert.AreEqual(1, await GetCount(tx));
+
+ Assert.AreEqual(0, await GetCount(null));
+
+ async Task<int> GetCount(ITransaction? txn)
+ {
+ await using var resultSet = await Client.Sql.ExecuteAsync(txn, "SELECT ID, VAL FROM TEST WHERE ID = 110");
+ var rows = await resultSet.ToListAsync();
+ return rows.Count;
+ }
+ }
+
+ [Test]
+ public async Task TestExecuteBatchNullArg()
+ {
+ var res = await Client.Sql.ExecuteBatchAsync(null, "DELETE FROM TEST WHERE VAL IS NOT DISTINCT FROM ?", [[null]]);
+ Assert.AreEqual(new[] { 0L }, res);
+ }
+
+ [Test]
+ public void TestExecuteBatchMissingArgs()
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", []));
+
+ StringAssert.Contains("Batch arguments must not be empty.", ex.Message);
+
+ var ex2 = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[]]));
+
+ StringAssert.Contains("Batch arguments must not contain empty rows.", ex2.Message);
+ }
+
+ [Test]
+ public void TestExecuteBatchMismatchingArgs()
+ {
+ var ex = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[1], [2, 3]]));
+
+ Assert.AreEqual("Inconsistent batch argument size: Expected 1 objects, but got more.", ex.Message);
+
+ var ex2 = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[1, 2], [3]]));
+
+ Assert.AreEqual("Inconsistent batch argument size: Expected 2 objects, but got 1.", ex2.Message);
+
+ var ex3 = Assert.ThrowsAsync<ArgumentException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[1], []]));
+
+ Assert.AreEqual("Inconsistent batch argument size: Expected 1 objects, but got 0.", ex3.Message);
+ }
+
+ [Test]
+ public void TestExecuteBatchNullArgRow()
+ {
+ Assert.ThrowsAsync<ArgumentNullException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[1], null!, [2]]));
+ }
+
+ [Test]
+ public void TestExecuteBatchInvalidStatement()
+ {
+ var ex = Assert.ThrowsAsync<SqlBatchException>(
+ async () => await Client.Sql.ExecuteBatchAsync(null, "select 1", [[1]]));
+
+ Assert.AreEqual("Invalid SQL statement type. Expected [DML] but got QUERY.", ex.Message);
+ }
+
[Test]
public async Task TestCancelQueryCursor([Values(true, false)] bool beforeIter)
{
@@ -622,7 +791,7 @@ namespace Apache.Ignite.Tests.Sql
}
[Test]
- public async Task TestCancelQueryExecute([Values("sql", "sql-mapped", "script", "reader")] string mode)
+ public async Task TestCancelQueryExecute([Values("sql", "sql-mapped", "script", "reader", "batch")] string mode)
{
// Cross join will produce 10^N rows, which takes a while to execute.
var manyRowsQuery = $"select count (*) from ({GenerateCrossJoin(8)})";
@@ -635,6 +804,7 @@ namespace Apache.Ignite.Tests.Sql
"sql-mapped" => Client.Sql.ExecuteAsync<int>(transaction: null, manyRowsQuery, cts.Token),
"script" => Client.Sql.ExecuteScriptAsync($"DELETE FROM {TableName} WHERE KEY = ({manyRowsQuery})", cts.Token),
"reader" => Client.Sql.ExecuteReaderAsync(transaction: null, manyRowsQuery, cts.Token),
+ "batch" => Client.Sql.ExecuteBatchAsync(null, $"DELETE FROM {TableName} WHERE KEY = ({manyRowsQuery}) + ?", [[1]], cts.Token),
_ => throw new ArgumentException("Invalid mode: " + mode)
};
@@ -644,7 +814,15 @@ namespace Apache.Ignite.Tests.Sql
var ex = Assert.ThrowsAsync<OperationCanceledException>(async () => await task);
Assert.AreEqual("The query was cancelled while executing.", ex!.Message);
- Assert.IsInstanceOf<SqlException>(ex.InnerException);
+
+ if (mode == "batch")
+ {
+ Assert.IsInstanceOf<SqlBatchException>(ex.InnerException);
+ }
+ else
+ {
+ Assert.IsInstanceOf<SqlException>(ex.InnerException);
+ }
Assert.IsFalse(TestUtils.HasCallbacks(cts));
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml b/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
index bd63ae238cb..145505434af 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
+++ b/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
@@ -112,4 +112,11 @@
<Right>lib/net8.0/Apache.Ignite.dll</Right>
<IsBaselineSuppression>true</IsBaselineSuppression>
</Suppression>
+ <Suppression>
+ <DiagnosticId>CP0006</DiagnosticId>
+ <Target>M:Apache.Ignite.Sql.ISql.ExecuteBatchAsync(Apache.Ignite.Transactions.ITransaction,Apache.Ignite.Sql.SqlStatement,System.Collections.Generic.IEnumerable{System.Collections.Generic.IEnumerable{System.Object}},System.Threading.CancellationToken)</Target>
+ <Left>lib/net8.0/Apache.Ignite.dll</Left>
+ <Right>lib/net8.0/Apache.Ignite.dll</Right>
+ <IsBaselineSuppression>true</IsBaselineSuppression>
+ </Suppression>
</Suppressions>
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 05b08fd1c21..dd9b5317eb5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -164,6 +164,11 @@ namespace Apache.Ignite
/// <summary>
/// Send data streamer batch with receiver (<see cref="IDataStreamerTarget{T}"/>).
/// </summary>
- StreamerWithReceiverBatchSend
+ StreamerWithReceiverBatchSend,
+
+ /// <summary>
+ /// SQL batch (<see cref="ISql.ExecuteBatchAsync"/>).
+ /// </summary>
+ SqlExecuteBatch
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 300d56ee17e..539c780a019 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -283,6 +283,29 @@ namespace Apache.Ignite.Internal.Buffers
/// <returns>Result.</returns>
public int ReadInt(int pos) =>
BinaryPrimitives.ReadInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize));
+ /// <summary>
+ /// Reserves space for a fixed-size MsgPack integer (5 bytes).
+ /// </summary>
+ /// <returns>Position in the buffer where the integer can be written.</returns>
+ public int ReserveMsgPackInt32()
+ {
+ var pos = Position;
+ Advance(5);
+ return pos;
+ }
+
+ /// <summary>
+ /// Writes a MsgPack integer (5 bytes) at the specified position reserved by <see cref="ReserveMsgPackInt32"/>.
+ /// </summary>
+ /// <param name="value">Value.</param>
+ /// <param name="pos">Position.</param>
+ public void WriteMsgPackInt32(int value, int pos)
+ {
+ var span = GetSpanAt(pos, 5);
+ span[0] = MsgPackCode.Int32;
+ BinaryPrimitives.WriteInt32BigEndian(span[1..], value);
+ }
+
/// <summary>
/// Checks underlying buffer and resizes if necessary.
/// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 9f50431627c..9b9460b68b4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Internal.Compute
{
using System;
- using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
@@ -242,8 +241,7 @@ namespace Apache.Ignite.Internal.Compute
private static void WriteEnumerable<T>(IEnumerable<T> items,
PooledArrayBuffer buf, Action<T, PooledArrayBuffer> writerFunc)
{
var count = 0;
- var countPos = buf.Position;
- buf.Advance(5);
+ var countPos = buf.ReserveMsgPackInt32();
foreach (var item in items)
{
@@ -251,9 +249,7 @@ namespace Apache.Ignite.Internal.Compute
writerFunc(item, buf);
}
- var countSpan = buf.GetSpanAt(countPos, 5);
- countSpan[0] = MsgPackCode.Int32;
- BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+ buf.WriteMsgPackInt32(count, countPos);
}
private static void WriteNodeNames(PooledArrayBuffer buf,
IEnumerable<IClusterNode> nodes) =>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 150c412ed23..0743ba8eb5c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -133,6 +133,9 @@ namespace Apache.Ignite.Internal.Proto
/** Send streamer batch. */
StreamerBatchSend = 62,
+ /** Execute SQL batch. */
+ SqlExecBatch = 63,
+
/** Execute MapReduce task. */
ComputeExecuteMapReduce = 64,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 651ed57bca2..568f05a2c15 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -60,6 +60,7 @@ namespace Apache.Ignite.Internal.Proto
ClientOp.ComputeChangePriority => ClientOperationType.ComputeChangePriority,
ClientOp.SqlExec => ClientOperationType.SqlExecute,
ClientOp.SqlExecScript => ClientOperationType.SqlExecuteScript,
+ ClientOp.SqlExecBatch => ClientOperationType.SqlExecuteBatch,
ClientOp.SqlCursorNextPage => null,
ClientOp.SqlCursorClose => null,
ClientOp.TxBegin => null,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 635d083aab8..8f045d6cf8e 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -399,7 +399,7 @@ internal readonly ref struct MsgPackWriter
/// Writes an array of objects with type codes.
/// </summary>
/// <param name="col">Array.</param>
- public void WriteObjectCollectionAsBinaryTuple(ICollection<object?>? col)
+ public void WriteObjectCollectionWithCountAsBinaryTuple(ICollection<object?>? col)
{
if (col == null)
{
@@ -415,13 +415,37 @@ internal readonly ref struct MsgPackWriter
return;
}
- using var builder = new BinaryTupleBuilder(col.Count * 3);
+ WriteObjectEnumerableAsBinaryTuple(col, col.Count);
+ }
+
+ /// <summary>
+ /// Writes a collection of objects with type codes.
+ /// </summary>
+ /// <param name="col">Objects.</param>
+ /// <param name="expectedCount">Count.</param>
+ /// <param name="errorPrefix">Error prefix.</param>
+ public void WriteObjectEnumerableAsBinaryTuple(IEnumerable<object?> col, int expectedCount, string? errorPrefix = null)
+ {
+ using var builder = new BinaryTupleBuilder(expectedCount * 3);
+ int actualCount = 0;
foreach (var obj in col)
{
+ actualCount++;
+
+ if (actualCount > expectedCount)
+ {
+ throw new ArgumentException($"{errorPrefix}Expected {expectedCount} objects, but got more.");
+ }
+
builder.AppendObjectWithType(obj);
}
+ if (actualCount != expectedCount)
+ {
+ throw new ArgumentException($"{errorPrefix}Expected {expectedCount} objects, but got {actualCount}.");
+ }
+
Write(builder.Build().Span);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 85786ff4e50..696a965d610 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal.Sql
{
using System;
using System.Collections.Generic;
+ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
@@ -111,6 +112,59 @@ namespace Apache.Ignite.Internal.Sql
}
}
+ /// <inheritdoc/>
+ public async Task<long[]> ExecuteBatchAsync(
+ ITransaction? transaction,
+ SqlStatement statement,
+ IEnumerable<IEnumerable<object?>> args,
+ CancellationToken cancellationToken = default)
+ {
+ IgniteArgumentCheck.NotNull(statement);
+ IgniteArgumentCheck.NotNull(args);
+
+ cancellationToken.ThrowIfCancellationRequested();
+ Transaction? tx = await LazyTransaction.EnsureStartedAsync(transaction, _socket, default).ConfigureAwait(false);
+
+ using var bufferWriter = ProtoCommon.GetMessageWriter();
+
+ WriteStatement(bufferWriter, statement, tx, writeTx: true);
+ WriteBatchArgs(bufferWriter, args);
+ bufferWriter.MessageWriter.Write(_socket.ObservableTimestamp);
+
+ try
+ {
+ var (buf, _) = await _socket.DoOutInOpAndGetSocketAsync(
+ ClientOp.SqlExecBatch, tx, bufferWriter, cancellationToken: cancellationToken).ConfigureAwait(false);
+
+ using (buf)
+ {
+ return Read(buf);
+ }
+ }
+ catch (SqlBatchException e)
+ {
+ ConvertExceptionAndThrow(e, statement, cancellationToken);
+
+ throw;
+ }
+
+ static long[] Read(PooledBuffer resBuf)
+ {
+ var r = resBuf.GetReader();
+ r.Skip(4); // Unused values: resourceId, rowSet, morePages, wasApplied
+
+ int count = r.ReadInt32();
+ var affectedRows = new long[count];
+
+ for (var i = 0; i < count; i++)
+ {
+ affectedRows[i] = r.ReadInt64();
+ }
+
+ return affectedRows;
+ }
+ }
+
/// <inheritdoc/>
public override string ToString() =>
IgniteToStringBuilder.Build(GetType());
@@ -203,7 +257,7 @@ namespace Apache.Ignite.Internal.Sql
}
}
- private static void ConvertExceptionAndThrow(SqlException e, SqlStatement statement, CancellationToken token)
+ private static void ConvertExceptionAndThrow(IgniteException e, SqlStatement statement, CancellationToken token)
{
switch (e.Code)
{
@@ -252,10 +306,48 @@ namespace Apache.Ignite.Internal.Sql
private static RowReader<T>
GetReaderFactory<T>(IReadOnlyList<IColumnMetadata> cols) =>
ResultSelector.Get<T>(cols, selectorExpression: null,
ResultSelectorOptions.None);
- private void WriteStatement(
+ private static void WriteBatchArgs(PooledArrayBuffer writer, IEnumerable<IEnumerable<object?>> args)
+ {
+ int rowSize = -1;
+ int rowCountPos = -1;
+ int rowCount = 0;
+
+ var w = writer.MessageWriter;
+
+ foreach (var arg in args)
+ {
+ IgniteArgumentCheck.NotNull(arg);
+ IEnumerable<object?> row = arg;
+ rowCount++;
+
+ if (rowSize < 0)
+ {
+ // First row, write header.
+ if (!row.TryGetNonEnumeratedCount(out rowSize))
+ {
+ var list = row.ToList();
+ rowSize = list.Count;
+ row = list;
+ }
+
+ IgniteArgumentCheck.Ensure(rowSize > 0, nameof(args), "Batch arguments must not contain empty rows.");
+
+ w.Write(rowSize);
+ rowCountPos = writer.ReserveMsgPackInt32();
+ w.Write(false); // Paged args.
+ }
+
+ w.WriteObjectEnumerableAsBinaryTuple(row, expectedCount: rowSize, errorPrefix: "Inconsistent batch argument size: ");
+ }
+
+ IgniteArgumentCheck.Ensure(rowCount > 0, nameof(args), "Batch arguments must not be empty.");
+
+ writer.WriteMsgPackInt32(rowCount, rowCountPos);
+ }
+
+ private static void WriteStatement(
PooledArrayBuffer writer,
SqlStatement statement,
- ICollection<object?>? args,
Transaction? tx = null,
bool writeTx = false)
{
@@ -274,7 +366,20 @@ namespace Apache.Ignite.Internal.Sql
WriteProperties(statement, ref w);
w.Write(statement.Query);
- w.WriteObjectCollectionAsBinaryTuple(args);
+ }
+
+ private void WriteStatement(
+ PooledArrayBuffer writer,
+ SqlStatement statement,
+ ICollection<object?>? args,
+ Transaction? tx = null,
+ bool writeTx = false)
+ {
+ var w = writer.MessageWriter;
+
+ WriteStatement(writer, statement, tx, writeTx);
+
+ w.WriteObjectCollectionWithCountAsBinaryTuple(args);
w.Write(_socket.ObservableTimestamp);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 96065335339..1a1dde620e7 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Internal.Table.Serialization
{
using System;
- using System.Buffers.Binary;
using System.Collections.Generic;
using Buffers;
using Proto.MsgPack;
@@ -220,8 +219,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
var count = 0;
var firstHash = 0;
- var countPos = buf.Position;
- buf.Advance(5);
+ var countPos = buf.ReserveMsgPackInt32();
do
{
@@ -243,9 +241,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
}
while (recs.MoveNext()); // First MoveNext is called outside to check for empty IEnumerable.
- var countSpan = buf.GetSpanAt(countPos, 5);
- countSpan[0] = MsgPackCode.Int32;
- BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+ buf.WriteMsgPackInt32(count, countPos);
return (firstHash, txIdPos);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index fa51aca74e2..bedda74d7cd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -59,6 +59,7 @@ namespace Apache.Ignite
ClientOperationType.ComputeExecuteMapReduce => false,
ClientOperationType.SqlExecute => false,
ClientOperationType.SqlExecuteScript => false,
+ ClientOperationType.SqlExecuteBatch => false,
ClientOperationType.ComputeCancel => false,
ClientOperationType.ComputeChangePriority => false,
ClientOperationType.ComputeGetStatus => true,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs b/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
index 60806525709..3f9e59d3742 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Sql
{
+ using System.Collections.Generic;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
@@ -111,5 +112,30 @@ namespace Apache.Ignite.Sql
/// <param name="args">Arguments.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task ExecuteScriptAsync(SqlStatement script, CancellationToken cancellationToken, params object?[]? args);
+
+ /// <summary>
+ /// Executes an SQL statement for every set of arguments and returns the number of affected rows for each statement execution.
+ /// <para />
+ /// Only DML statements (INSERT, UPDATE, DELETE) are supported.
+ /// <para />
+ /// <example>
+ /// <code>
+ /// long[] res = await sql.ExecuteBatchAsync(
+ /// transaction: null,
+ /// "INSERT INTO Person (Id, Name) VALUES (?, ?)",
+ /// [[1, "Alice"], [2, "Bob"], [3, "Charlie"]]);
+ /// </code>
+ /// </example>
+ /// </summary>
+ /// <param name="transaction">Transaction.</param>
+ /// <param name="statement">Statement to execute once for every entry in <paramref name="args"/>.</param>
+ /// <param name="args">Batched arguments. The specified statement will be executed once for each entry in this collection. Cannot be empty or contain empty rows.</param>
+ /// <param name="cancellationToken">Cancellation token.</param>
+ /// <returns>The number of affected rows for each set of arguments. The size of the returned array will match the size of <paramref name="args"/>.</returns>
+ Task<long[]> ExecuteBatchAsync(
+ ITransaction? transaction,
+ SqlStatement statement,
+ IEnumerable<IEnumerable<object?>> args,
+ CancellationToken cancellationToken = default);
}
}