This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 23da36041f3 IGNITE-25890 .NET: Add ISql.ExecuteBatchAsync (#6306)
23da36041f3 is described below

commit 23da36041f3069e0eff16fbb26a72553da11c6a4
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jul 24 09:08:30 2025 +0300

    IGNITE-25890 .NET: Add ISql.ExecuteBatchAsync (#6306)
---
 .../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs     | 182 ++++++++++++++++++++-
 .../Apache.Ignite/ApiCompatibilitySuppressions.xml |   7 +
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |   7 +-
 .../Internal/Buffers/PooledArrayBuffer.cs          |  23 +++
 .../Apache.Ignite/Internal/Compute/Compute.cs      |   8 +-
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |   3 +
 .../Internal/Proto/ClientOpExtensions.cs           |   1 +
 .../Internal/Proto/MsgPack/MsgPackWriter.cs        |  28 +++-
 .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs       | 113 ++++++++++++-
 .../Table/Serialization/RecordSerializer.cs        |   8 +-
 .../dotnet/Apache.Ignite/RetryReadPolicy.cs        |   1 +
 modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs |  26 +++
 12 files changed, 386 insertions(+), 21 deletions(-)

diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 499403b50c9..c5ff7b5277f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -27,6 +27,7 @@ namespace Apache.Ignite.Tests.Sql
     using System.Threading.Tasks;
     using Ignite.Sql;
     using Ignite.Table;
+    using Ignite.Transactions;
     using Microsoft.Extensions.Logging.Abstractions;
     using NodaTime;
     using NUnit.Framework;
@@ -62,6 +63,12 @@ namespace Apache.Ignite.Tests.Sql
             await Client.Sql.ExecuteAsync(null, "DROP TABLE IF EXISTS 
TestExecuteScript");
         }
 
+        [SetUp]
+        public async Task ResetData()
+        {
+            await Client.Sql.ExecuteScriptAsync("DELETE FROM TEST WHERE ID >= 
10");
+        }
+
         [Test]
         public async Task TestSimpleQuery()
         {
@@ -593,6 +600,168 @@ namespace Apache.Ignite.Tests.Sql
             AssertInstantSimilar(expectedTime, resTime, $"Offset: {offset}");
         }
 
+        [Test]
+        public async Task TestExecuteBatch()
+        {
+            long[] res = await Client.Sql.ExecuteBatchAsync(
+                transaction: null,
+                statement: "INSERT INTO TEST VALUES (?, ?)",
+                args: [[100, "x"], [101, "y"], [102, "z"]]);
+
+            CollectionAssert.AreEqual(new[] { 1L, 1L, 1L }, res);
+
+            await using var resultSet = await Client.Sql.ExecuteAsync(
+                null, "SELECT ID, VAL FROM TEST WHERE ID >= 100 AND ID <= 102 
ORDER BY ID");
+
+            List<IIgniteTuple> rows = await resultSet.ToListAsync();
+            Assert.AreEqual(3, rows.Count);
+
+            Assert.AreEqual("IgniteTuple { ID = 100, VAL = x }", 
rows[0].ToString());
+            Assert.AreEqual("IgniteTuple { ID = 101, VAL = y }", 
rows[1].ToString());
+            Assert.AreEqual("IgniteTuple { ID = 102, VAL = z }", 
rows[2].ToString());
+        }
+
+        [Test]
+        public async Task TestExecuteBatchInsertUpdateDelete()
+        {
+            long[] insertRes = await Client.Sql.ExecuteBatchAsync(
+                transaction: null,
+                statement: "INSERT INTO TEST VALUES (?, ?)",
+                args: [[100, "x"], [101, "y"], [102, "z"]]);
+
+            CollectionAssert.AreEqual(new[] { 1L, 1L, 1L }, insertRes);
+
+            long[] updateRes = await Client.Sql.ExecuteBatchAsync(
+                transaction: null,
+                statement: "UPDATE TEST SET VAL = ? WHERE ID >= ? AND ID <= ?",
+                args: [["update1", 100, 101], ["update2", 102, 103]]);
+
+            CollectionAssert.AreEqual(new[] { 2L, 1L }, updateRes);
+
+            long[] deleteRes = await Client.Sql.ExecuteBatchAsync(
+                transaction: null,
+                statement: "DELETE FROM TEST WHERE ID >= ? AND ID <= ?",
+                args: [[100, 102]]);
+
+            CollectionAssert.AreEqual(new[] { 3L }, deleteRes);
+        }
+
+        [Test]
+        public async Task TestExecuteBatchArgsCollections()
+        {
+            var statement = "INSERT INTO TEST VALUES (?, ?)";
+
+            // Array.
+            object[][] arr =
+            [
+                [200, "x"],
+                [201, "y"],
+            ];
+
+            await Client.Sql.ExecuteBatchAsync(null, statement, arr);
+
+            // List.
+            List<List<object>> args =
+            [
+                [300, "x"],
+                [301, "y"]
+            ];
+
+            await Client.Sql.ExecuteBatchAsync(null, statement, args);
+
+            // Lazy.
+            IEnumerable<IEnumerable<object>> collection = Yield(
+                Yield<object>(401, "x1"),
+                Yield<object>(402, "x2"));
+
+            await Client.Sql.ExecuteBatchAsync(null, statement, collection);
+
+            static IEnumerable<T> Yield<T>(params T[] args)
+            {
+                foreach (var arg in args)
+                {
+                    yield return arg;
+                }
+            }
+        }
+
+        [Test]
+        public async Task TestExecuteBatchWithTx()
+        {
+            await using var tx = await Client.Transactions.BeginAsync();
+
+            Assert.AreEqual(0, await GetCount(tx));
+
+            await Client.Sql.ExecuteBatchAsync(tx, "INSERT INTO TEST VALUES 
(?, ?)", [[110, "x"], [111, "y"]]);
+
+            Assert.AreEqual(1, await GetCount(tx));
+
+            Assert.AreEqual(0, await GetCount(null));
+
+            async Task<int> GetCount(ITransaction? txn)
+            {
+                await using var resultSet = await Client.Sql.ExecuteAsync(txn, 
"SELECT ID, VAL FROM TEST WHERE ID = 110");
+                var rows = await resultSet.ToListAsync();
+                return rows.Count;
+            }
+        }
+
+        [Test]
+        public async Task TestExecuteBatchNullArg()
+        {
+            var res = await Client.Sql.ExecuteBatchAsync(null, "DELETE FROM 
TEST WHERE VAL IS NOT DISTINCT FROM ?", [[null]]);
+            Assert.AreEqual(new[] { 0L }, res);
+        }
+
+        [Test]
+        public void TestExecuteBatchMissingArgs()
+        {
+            var ex = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", []));
+
+            StringAssert.Contains("Batch arguments must not be empty.", 
ex.Message);
+
+            var ex2 = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[]]));
+
+            StringAssert.Contains("Batch arguments must not contain empty 
rows.", ex2.Message);
+        }
+
+        [Test]
+        public void TestExecuteBatchMismatchingArgs()
+        {
+            var ex = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[1], [2, 3]]));
+
+            Assert.AreEqual("Inconsistent batch argument size: Expected 1 
objects, but got more.", ex.Message);
+
+            var ex2 = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[1, 2], [3]]));
+
+            Assert.AreEqual("Inconsistent batch argument size: Expected 2 
objects, but got 1.", ex2.Message);
+
+            var ex3 = Assert.ThrowsAsync<ArgumentException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[1], []]));
+
+            Assert.AreEqual("Inconsistent batch argument size: Expected 1 
objects, but got 0.", ex3.Message);
+        }
+
+        [Test]
+        public void TestExecuteBatchNullArgRow()
+        {
+            Assert.ThrowsAsync<ArgumentNullException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[1], null!, [2]]));
+        }
+
+        [Test]
+        public void TestExecuteBatchInvalidStatement()
+        {
+            var ex = Assert.ThrowsAsync<SqlBatchException>(
+                async () => await Client.Sql.ExecuteBatchAsync(null, "select 
1", [[1]]));
+
+            Assert.AreEqual("Invalid SQL statement type. Expected [DML] but 
got QUERY.", ex.Message);
+        }
+
         [Test]
         public async Task TestCancelQueryCursor([Values(true, false)] bool 
beforeIter)
         {
@@ -622,7 +791,7 @@ namespace Apache.Ignite.Tests.Sql
         }
 
         [Test]
-        public async Task TestCancelQueryExecute([Values("sql", "sql-mapped", 
"script", "reader")] string mode)
+        public async Task TestCancelQueryExecute([Values("sql", "sql-mapped", 
"script", "reader", "batch")] string mode)
         {
             // Cross join will produce 10^N rows, which takes a while to 
execute.
             var manyRowsQuery = $"select count (*) from 
({GenerateCrossJoin(8)})";
@@ -635,6 +804,7 @@ namespace Apache.Ignite.Tests.Sql
                 "sql-mapped" => Client.Sql.ExecuteAsync<int>(transaction: 
null, manyRowsQuery, cts.Token),
                 "script" => Client.Sql.ExecuteScriptAsync($"DELETE FROM 
{TableName} WHERE KEY = ({manyRowsQuery})", cts.Token),
                 "reader" => Client.Sql.ExecuteReaderAsync(transaction: null, 
manyRowsQuery, cts.Token),
+                "batch" => Client.Sql.ExecuteBatchAsync(null, $"DELETE FROM 
{TableName} WHERE KEY = ({manyRowsQuery}) + ?", [[1]], cts.Token),
                 _ => throw new ArgumentException("Invalid mode: " + mode)
             };
 
@@ -644,7 +814,15 @@ namespace Apache.Ignite.Tests.Sql
 
             var ex = Assert.ThrowsAsync<OperationCanceledException>(async () 
=> await task);
             Assert.AreEqual("The query was cancelled while executing.", 
ex!.Message);
-            Assert.IsInstanceOf<SqlException>(ex.InnerException);
+
+            if (mode == "batch")
+            {
+                Assert.IsInstanceOf<SqlBatchException>(ex.InnerException);
+            }
+            else
+            {
+                Assert.IsInstanceOf<SqlException>(ex.InnerException);
+            }
 
             Assert.IsFalse(TestUtils.HasCallbacks(cts));
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml 
b/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
index bd63ae238cb..145505434af 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
+++ b/modules/platforms/dotnet/Apache.Ignite/ApiCompatibilitySuppressions.xml
@@ -112,4 +112,11 @@
     <Right>lib/net8.0/Apache.Ignite.dll</Right>
     <IsBaselineSuppression>true</IsBaselineSuppression>
   </Suppression>
+  <Suppression>
+    <DiagnosticId>CP0006</DiagnosticId>
+    
<Target>M:Apache.Ignite.Sql.ISql.ExecuteBatchAsync(Apache.Ignite.Transactions.ITransaction,Apache.Ignite.Sql.SqlStatement,System.Collections.Generic.IEnumerable{System.Collections.Generic.IEnumerable{System.Object}},System.Threading.CancellationToken)</Target>
+    <Left>lib/net8.0/Apache.Ignite.dll</Left>
+    <Right>lib/net8.0/Apache.Ignite.dll</Right>
+    <IsBaselineSuppression>true</IsBaselineSuppression>
+  </Suppression>
 </Suppressions>
\ No newline at end of file
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs 
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 05b08fd1c21..dd9b5317eb5 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -164,6 +164,11 @@ namespace Apache.Ignite
         /// <summary>
         /// Send data streamer batch with receiver (<see 
cref="IDataStreamerTarget{T}"/>).
         /// </summary>
-        StreamerWithReceiverBatchSend
+        StreamerWithReceiverBatchSend,
+
+        /// <summary>
+        /// SQL batch (<see cref="ISql.ExecuteBatchAsync"/>).
+        /// </summary>
+        SqlExecuteBatch
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
index 300d56ee17e..539c780a019 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledArrayBuffer.cs
@@ -283,6 +283,29 @@ namespace Apache.Ignite.Internal.Buffers
         /// <returns>Result.</returns>
         public int ReadInt(int pos) => 
BinaryPrimitives.ReadInt32LittleEndian(_buffer.AsSpan(pos + _prefixSize));
 
+        /// <summary>
+        /// Reserves space for a fixed-size MsgPack integer (5 bytes).
+        /// </summary>
+        /// <returns>Position in the buffer where the integer can be 
written.</returns>
+        public int ReserveMsgPackInt32()
+        {
+            var pos = Position;
+            Advance(5);
+            return pos;
+        }
+
+        /// <summary>
+        /// Writes a MsgPack integer (5 bytes) at the specified position 
reserved by <see cref="ReserveMsgPackInt32"/>.
+        /// </summary>
+        /// <param name="value">Value.</param>
+        /// <param name="pos">Position.</param>
+        public void WriteMsgPackInt32(int value, int pos)
+        {
+            var span = GetSpanAt(pos, 5);
+            span[0] = MsgPackCode.Int32;
+            BinaryPrimitives.WriteInt32BigEndian(span[1..], value);
+        }
+
         /// <summary>
         /// Checks underlying buffer and resizes if necessary.
         /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 9f50431627c..9b9460b68b4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -18,7 +18,6 @@
 namespace Apache.Ignite.Internal.Compute
 {
     using System;
-    using System.Buffers.Binary;
     using System.Collections.Concurrent;
     using System.Collections.Generic;
     using System.Diagnostics.CodeAnalysis;
@@ -242,8 +241,7 @@ namespace Apache.Ignite.Internal.Compute
         private static void WriteEnumerable<T>(IEnumerable<T> items, 
PooledArrayBuffer buf, Action<T, PooledArrayBuffer> writerFunc)
         {
             var count = 0;
-            var countPos = buf.Position;
-            buf.Advance(5);
+            var countPos = buf.ReserveMsgPackInt32();
 
             foreach (var item in items)
             {
@@ -251,9 +249,7 @@ namespace Apache.Ignite.Internal.Compute
                 writerFunc(item, buf);
             }
 
-            var countSpan = buf.GetSpanAt(countPos, 5);
-            countSpan[0] = MsgPackCode.Int32;
-            BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+            buf.WriteMsgPackInt32(count, countPos);
         }
 
         private static void WriteNodeNames(PooledArrayBuffer buf, 
IEnumerable<IClusterNode> nodes) =>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 150c412ed23..0743ba8eb5c 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -133,6 +133,9 @@ namespace Apache.Ignite.Internal.Proto
         /** Send streamer batch. */
         StreamerBatchSend = 62,
 
+        /** Execute SQL batch. */
+        SqlExecBatch = 63,
+
         /** Execute MapReduce task. */
         ComputeExecuteMapReduce = 64,
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 651ed57bca2..568f05a2c15 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -60,6 +60,7 @@ namespace Apache.Ignite.Internal.Proto
                 ClientOp.ComputeChangePriority => 
ClientOperationType.ComputeChangePriority,
                 ClientOp.SqlExec => ClientOperationType.SqlExecute,
                 ClientOp.SqlExecScript => ClientOperationType.SqlExecuteScript,
+                ClientOp.SqlExecBatch => ClientOperationType.SqlExecuteBatch,
                 ClientOp.SqlCursorNextPage => null,
                 ClientOp.SqlCursorClose => null,
                 ClientOp.TxBegin => null,
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index 635d083aab8..8f045d6cf8e 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -399,7 +399,7 @@ internal readonly ref struct MsgPackWriter
     /// Writes an array of objects with type codes.
     /// </summary>
     /// <param name="col">Array.</param>
-    public void WriteObjectCollectionAsBinaryTuple(ICollection<object?>? col)
+    public void 
WriteObjectCollectionWithCountAsBinaryTuple(ICollection<object?>? col)
     {
         if (col == null)
         {
@@ -415,13 +415,37 @@ internal readonly ref struct MsgPackWriter
             return;
         }
 
-        using var builder = new BinaryTupleBuilder(col.Count * 3);
+        WriteObjectEnumerableAsBinaryTuple(col, col.Count);
+    }
+
+    /// <summary>
+    /// Writes a collection of objects with type codes.
+    /// </summary>
+    /// <param name="col">Objects.</param>
+    /// <param name="expectedCount">Count.</param>
+    /// <param name="errorPrefix">Error prefix.</param>
+    public void WriteObjectEnumerableAsBinaryTuple(IEnumerable<object?> col, 
int expectedCount, string? errorPrefix = null)
+    {
+        using var builder = new BinaryTupleBuilder(expectedCount * 3);
+        int actualCount = 0;
 
         foreach (var obj in col)
         {
+            actualCount++;
+
+            if (actualCount > expectedCount)
+            {
+                throw new ArgumentException($"{errorPrefix}Expected 
{expectedCount} objects, but got more.");
+            }
+
             builder.AppendObjectWithType(obj);
         }
 
+        if (actualCount != expectedCount)
+        {
+            throw new ArgumentException($"{errorPrefix}Expected 
{expectedCount} objects, but got {actualCount}.");
+        }
+
         Write(builder.Build().Span);
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 85786ff4e50..696a965d610 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Internal.Sql
 {
     using System;
     using System.Collections.Generic;
+    using System.Linq;
     using System.Threading;
     using System.Threading.Tasks;
     using Buffers;
@@ -111,6 +112,59 @@ namespace Apache.Ignite.Internal.Sql
             }
         }
 
+        /// <inheritdoc/>
+        public async Task<long[]> ExecuteBatchAsync(
+            ITransaction? transaction,
+            SqlStatement statement,
+            IEnumerable<IEnumerable<object?>> args,
+            CancellationToken cancellationToken = default)
+        {
+            IgniteArgumentCheck.NotNull(statement);
+            IgniteArgumentCheck.NotNull(args);
+
+            cancellationToken.ThrowIfCancellationRequested();
+            Transaction? tx = await 
LazyTransaction.EnsureStartedAsync(transaction, _socket, 
default).ConfigureAwait(false);
+
+            using var bufferWriter = ProtoCommon.GetMessageWriter();
+
+            WriteStatement(bufferWriter, statement, tx, writeTx: true);
+            WriteBatchArgs(bufferWriter, args);
+            bufferWriter.MessageWriter.Write(_socket.ObservableTimestamp);
+
+            try
+            {
+                var (buf, _) = await _socket.DoOutInOpAndGetSocketAsync(
+                    ClientOp.SqlExecBatch, tx, bufferWriter, 
cancellationToken: cancellationToken).ConfigureAwait(false);
+
+                using (buf)
+                {
+                    return Read(buf);
+                }
+            }
+            catch (SqlBatchException e)
+            {
+                ConvertExceptionAndThrow(e, statement, cancellationToken);
+
+                throw;
+            }
+
+            static long[] Read(PooledBuffer resBuf)
+            {
+                var r = resBuf.GetReader();
+                r.Skip(4); // Unused values: resourceId, rowSet, morePages, 
wasApplied
+
+                int count = r.ReadInt32();
+                var affectedRows = new long[count];
+
+                for (var i = 0; i < count; i++)
+                {
+                    affectedRows[i] = r.ReadInt64();
+                }
+
+                return affectedRows;
+            }
+        }
+
         /// <inheritdoc/>
         public override string ToString() => 
IgniteToStringBuilder.Build(GetType());
 
@@ -203,7 +257,7 @@ namespace Apache.Ignite.Internal.Sql
             }
         }
 
-        private static void ConvertExceptionAndThrow(SqlException e, 
SqlStatement statement, CancellationToken token)
+        private static void ConvertExceptionAndThrow(IgniteException e, 
SqlStatement statement, CancellationToken token)
         {
             switch (e.Code)
             {
@@ -252,10 +306,48 @@ namespace Apache.Ignite.Internal.Sql
         private static RowReader<T> 
GetReaderFactory<T>(IReadOnlyList<IColumnMetadata> cols) =>
             ResultSelector.Get<T>(cols, selectorExpression: null, 
ResultSelectorOptions.None);
 
-        private void WriteStatement(
+        private static void WriteBatchArgs(PooledArrayBuffer writer, 
IEnumerable<IEnumerable<object?>> args)
+        {
+            int rowSize = -1;
+            int rowCountPos = -1;
+            int rowCount = 0;
+
+            var w = writer.MessageWriter;
+
+            foreach (var arg in args)
+            {
+                IgniteArgumentCheck.NotNull(arg);
+                IEnumerable<object?> row = arg;
+                rowCount++;
+
+                if (rowSize < 0)
+                {
+                    // First row, write header.
+                    if (!row.TryGetNonEnumeratedCount(out rowSize))
+                    {
+                        var list = row.ToList();
+                        rowSize = list.Count;
+                        row = list;
+                    }
+
+                    IgniteArgumentCheck.Ensure(rowSize > 0, nameof(args), 
"Batch arguments must not contain empty rows.");
+
+                    w.Write(rowSize);
+                    rowCountPos = writer.ReserveMsgPackInt32();
+                    w.Write(false); // Paged args.
+                }
+
+                w.WriteObjectEnumerableAsBinaryTuple(row, expectedCount: 
rowSize, errorPrefix: "Inconsistent batch argument size: ");
+            }
+
+            IgniteArgumentCheck.Ensure(rowCount > 0, nameof(args), "Batch 
arguments must not be empty.");
+
+            writer.WriteMsgPackInt32(rowCount, rowCountPos);
+        }
+
+        private static void WriteStatement(
             PooledArrayBuffer writer,
             SqlStatement statement,
-            ICollection<object?>? args,
             Transaction? tx = null,
             bool writeTx = false)
         {
@@ -274,7 +366,20 @@ namespace Apache.Ignite.Internal.Sql
 
             WriteProperties(statement, ref w);
             w.Write(statement.Query);
-            w.WriteObjectCollectionAsBinaryTuple(args);
+        }
+
+        private void WriteStatement(
+            PooledArrayBuffer writer,
+            SqlStatement statement,
+            ICollection<object?>? args,
+            Transaction? tx = null,
+            bool writeTx = false)
+        {
+            var w = writer.MessageWriter;
+
+            WriteStatement(writer, statement, tx, writeTx);
+
+            w.WriteObjectCollectionWithCountAsBinaryTuple(args);
             w.Write(_socket.ObservableTimestamp);
         }
     }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
index 96065335339..1a1dde620e7 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/Serialization/RecordSerializer.cs
@@ -18,7 +18,6 @@
 namespace Apache.Ignite.Internal.Table.Serialization
 {
     using System;
-    using System.Buffers.Binary;
     using System.Collections.Generic;
     using Buffers;
     using Proto.MsgPack;
@@ -220,8 +219,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
             var count = 0;
             var firstHash = 0;
 
-            var countPos = buf.Position;
-            buf.Advance(5);
+            var countPos = buf.ReserveMsgPackInt32();
 
             do
             {
@@ -243,9 +241,7 @@ namespace Apache.Ignite.Internal.Table.Serialization
             }
             while (recs.MoveNext()); // First MoveNext is called outside to 
check for empty IEnumerable.
 
-            var countSpan = buf.GetSpanAt(countPos, 5);
-            countSpan[0] = MsgPackCode.Int32;
-            BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
+            buf.WriteMsgPackInt32(count, countPos);
 
             return (firstHash, txIdPos);
         }
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs 
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index fa51aca74e2..bedda74d7cd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -59,6 +59,7 @@ namespace Apache.Ignite
                 ClientOperationType.ComputeExecuteMapReduce => false,
                 ClientOperationType.SqlExecute => false,
                 ClientOperationType.SqlExecuteScript => false,
+                ClientOperationType.SqlExecuteBatch => false,
                 ClientOperationType.ComputeCancel => false,
                 ClientOperationType.ComputeChangePriority => false,
                 ClientOperationType.ComputeGetStatus => true,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs 
b/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
index 60806525709..3f9e59d3742 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Sql/ISql.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Sql
 {
+    using System.Collections.Generic;
     using System.Data.Common;
     using System.Threading;
     using System.Threading.Tasks;
@@ -111,5 +112,30 @@ namespace Apache.Ignite.Sql
         /// <param name="args">Arguments.</param>
         /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
         Task ExecuteScriptAsync(SqlStatement script, CancellationToken 
cancellationToken, params object?[]? args);
+
+        /// <summary>
+        /// Executes an SQL statement for every set of arguments and returns 
the number of affected rows for each statement execution.
+        /// <para />
+        /// Only DML statements (INSERT, UPDATE, DELETE) are supported.
+        /// <para />
+        /// <example>
+        /// <code>
+        /// long[] res = await sql.ExecuteBatchAsync(
+        ///     transaction: null,
+        ///     "INSERT INTO Person (Id, Name) VALUES (?, ?)",
+        ///     [[1, "Alice"], [2, "Bob"], [3, "Charlie"]]);
+        /// </code>
+        /// </example>
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <param name="statement">Statement to execute once for every entry 
in <paramref name="args"/>.</param>
+        /// <param name="args">Batched arguments. The specified statement will 
be executed once for each entry in this collection. Cannot be empty or contain 
empty rows.</param>
+        /// <param name="cancellationToken">Cancellation token.</param>
+        /// <returns>The number of affected rows for each set of arguments. 
The size of the returned array will match the size of <paramref 
name="args"/>.</returns>
+        Task<long[]> ExecuteBatchAsync(
+            ITransaction? transaction,
+            SqlStatement statement,
+            IEnumerable<IEnumerable<object?>> args,
+            CancellationToken cancellationToken = default);
     }
 }

Reply via email to