This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 74d5bc69cb IGNITE-21028 .NET: Fix pooled buffer leaks (#3063)
74d5bc69cb is described below
commit 74d5bc69cb3843094cc53b1ae8d0e3d3457bfecb
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Thu Jan 18 17:55:18 2024 +0200
IGNITE-21028 .NET: Fix pooled buffer leaks (#3063)
* Add `TestSharedArrayPoolIsNotUsedDirectly` to ensure that all pooling
goes through our own `ByteArrayPool` class
* Add pooled array tracking to `ByteArrayPool` and check after tests
* Fix detected leaks in core module:
* `ClientSocket.HandleResponse` when there is an error from the server or
an exception
* `ClientSockeet.SendHeartbeatAsync`
* `Sql.ExecuteAsyncInternal` when there is an exception during
`ResultSet` initialization
* `Transaction.CommitAsync`
* Fix many leaks in test code
---
.../Apache.Ignite.Tests/ByteArrayPoolTest.cs | 51 ++++++++++
.../dotnet/Apache.Ignite.Tests/IgniteServerBase.cs | 24 +++--
.../dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs | 7 +-
.../Linq/LinqSqlGenerationTests.cs | 2 +
.../dotnet/Apache.Ignite.Tests/LoggingTests.cs | 11 +-
.../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 2 +
.../dotnet/Apache.Ignite.Tests/MultiClusterTest.cs | 3 +
.../Apache.Ignite.Tests/ProjectFilesTests.cs | 7 +-
.../Proto/MsgPack/MsgPackReaderTests.cs | 4 +-
.../Sql/SqlResultSetObjectMappingTests.cs | 6 +-
.../dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs | 11 +-
.../Table/BinaryTupleIgniteTupleAdapterTests.cs | 4 +-
.../Apache.Ignite.Tests/Table/DataStreamerTests.cs | 2 +-
.../Apache.Ignite.Tests/Table/IgniteTupleTests.cs | 6 ++
.../dotnet/Apache.Ignite.Tests/TestUtils.cs | 20 ++++
.../Transactions/TransactionsTests.cs | 4 +-
.../Internal/Buffers/ByteArrayPool.cs | 14 +++
.../Apache.Ignite/Internal/Buffers/PooledBuffer.cs | 78 +++++++-------
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 113 ++++++++++++++-------
.../dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs | 11 +-
.../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 12 ++-
.../Apache.Ignite/Internal/Table/RecordView.cs | 12 +--
.../Internal/Transactions/Transaction.cs | 4 +-
23 files changed, 287 insertions(+), 121 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ByteArrayPoolTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ByteArrayPoolTest.cs
new file mode 100644
index 0000000000..b7cfdda7c3
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ByteArrayPoolTest.cs
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Tests;
+
+using System.IO;
+using Internal.Buffers;
+using NUnit.Framework;
+
+/// <summary>
+/// Tests for <see cref="ByteArrayPool"/>.
+/// </summary>
+public class ByteArrayPoolTest
+{
+ /// <summary>
+ /// Ensure that we don't use ArrayPool.Shared directly, but use our
wrapper instead.
+ /// </summary>
+ [Test]
+ public void TestSharedArrayPoolIsNotUsedDirectly()
+ {
+ foreach (var file in ProjectFilesTests.GetCsFiles())
+ {
+ if (file.EndsWith("ByteArrayPool.cs",
System.StringComparison.Ordinal) ||
+ file.EndsWith("ByteArrayPoolTest.cs",
System.StringComparison.Ordinal))
+ {
+ continue;
+ }
+
+ var text = File.ReadAllText(file);
+
+ StringAssert.DoesNotContain(
+ expected: "ArrayPool<byte>",
+ actual: text,
+ message: "ArrayPool<byte> should not be used directly. Use
ByteArrayPool instead: " + file);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs
index f9efb1565a..06d3a94ed9 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteServerBase.cs
@@ -84,19 +84,27 @@ public abstract class IgniteServerBase : IDisposable
int received = 0;
var buf = ByteArrayPool.Rent(size);
- while (received < size)
+ try
{
- var res = socket.Receive(buf, received, size - received,
SocketFlags.None);
-
- if (res == 0)
+ while (received < size)
{
- throw new ConnectionLostException();
+ var res = socket.Receive(buf, received, size - received,
SocketFlags.None);
+
+ if (res == 0)
+ {
+ throw new ConnectionLostException();
+ }
+
+ received += res;
}
- received += res;
+ return new PooledBuffer(buf, 0, size);
+ }
+ catch (Exception)
+ {
+ ByteArrayPool.Return(buf);
+ throw;
}
-
- return new PooledBuffer(buf, 0, size);
}
protected virtual void Handle(Socket handler, CancellationToken
cancellationToken)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
index fe43913ff2..88eb023be0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/IgniteTestsBase.cs
@@ -133,6 +133,7 @@ namespace Apache.Ignite.Tests
public void SetUp()
{
Console.WriteLine("SetUp: " +
TestContext.CurrentContext.Test.Name);
+ TestUtils.CheckByteArrayPoolLeak();
}
[TearDown]
@@ -140,11 +141,11 @@ namespace Apache.Ignite.Tests
{
Console.WriteLine("TearDown start: " +
TestContext.CurrentContext.Test.Name);
- CheckPooledBufferLeak();
-
_disposables.ForEach(x => x.Dispose());
_disposables.Clear();
+ CheckPooledBufferLeak();
+
Console.WriteLine("TearDown end: " +
TestContext.CurrentContext.Test.Name);
}
@@ -210,6 +211,8 @@ namespace Apache.Ignite.Tests
condition: () => listener.BuffersReturned ==
listener.BuffersRented,
timeoutMs: 1000,
messageFactory: () => $"rented = {listener.BuffersRented},
returned = {listener.BuffersReturned}");
+
+ TestUtils.CheckByteArrayPoolLeak();
}
}
}
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
index 3f50d92aa2..48c18dc638 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Linq/LinqSqlGenerationTests.cs
@@ -432,6 +432,8 @@ public partial class LinqSqlGenerationTests
{
_client.Dispose();
_server.Dispose();
+
+ TestUtils.CheckByteArrayPoolLeak();
}
private void AssertSql(string expectedSql, Func<IQueryable<Poco>, object?>
query) =>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
index d59c10071f..e7266f0ad4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/LoggingTests.cs
@@ -31,12 +31,19 @@ using NUnit.Framework;
/// </summary>
public class LoggingTests
{
+ [TearDown]
+ public void TearDown() => TestUtils.CheckByteArrayPoolLeak(5000);
+
[Test]
[SuppressMessage("ReSharper", "AccessToDisposedClosure", Justification =
"Reviewed.")]
public async Task TestBasicLogging()
{
var logger = new ListLoggerFactory(Enum.GetValues<LogLevel>());
- var cfg = new IgniteClientConfiguration { LoggerFactory = logger };
+ var cfg = new IgniteClientConfiguration
+ {
+ LoggerFactory = logger,
+ SocketTimeout = TimeSpan.FromSeconds(1)
+ };
using var servers = FakeServerGroup.Create(3);
using (var client = await servers.ConnectClientAsync(cfg))
@@ -44,7 +51,7 @@ public class LoggingTests
client.WaitForConnections(3);
await client.Tables.GetTablesAsync();
- await client.Sql.ExecuteAsync(null, "select 1");
+ await using var cursor = await client.Sql.ExecuteAsync(null,
"select 1");
}
var log = logger.GetLogString();
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index ed1ece274f..e3509e537d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -46,6 +46,8 @@ public class MetricsTests
AssertMetric("connections-active", 0);
_listener.Dispose();
+
+ TestUtils.CheckByteArrayPoolLeak(5000);
}
[OneTimeTearDown]
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MultiClusterTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/MultiClusterTest.cs
index cec14fdbc1..73a1e3f3ef 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MultiClusterTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MultiClusterTest.cs
@@ -28,6 +28,9 @@ using NUnit.Framework;
/// </summary>
public class MultiClusterTest
{
+ [TearDown]
+ public void TearDown() => TestUtils.CheckByteArrayPoolLeak();
+
[Test]
public async Task TestClientDropsConnectionOnClusterIdMismatch()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
index e889cc007a..fa3715a6e3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/ProjectFilesTests.cs
@@ -36,6 +36,8 @@ namespace Apache.Ignite.Tests
private static readonly string InternalDir =
$"{Path.DirectorySeparatorChar}Internal{Path.DirectorySeparatorChar}";
+ public static IEnumerable<string> GetCsFiles() =>
Directory.GetFiles(TestUtils.SolutionDir, "*.cs", SearchOption.AllDirectories);
+
[Test]
public void TestInternalNamespaceHasNoPublicTypes()
{
@@ -113,11 +115,6 @@ namespace Apache.Ignite.Tests
}
}
- private static IEnumerable<string> GetCsFiles()
- {
- return Directory.GetFiles(TestUtils.SolutionDir, "*.cs",
SearchOption.AllDirectories);
- }
-
[SuppressMessage("Design", "CA1064:Exceptions should be public",
Justification = "Tests.")]
[SuppressMessage("Design", "CA1032:Implement standard exception
constructors", Justification = "Tests.")]
private sealed class TodoWithoutTicketException : AssertionException
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MsgPack/MsgPackReaderTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MsgPack/MsgPackReaderTests.cs
index 839ad80f24..99d9513287 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MsgPack/MsgPackReaderTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/MsgPack/MsgPackReaderTests.cs
@@ -197,7 +197,7 @@ public class MsgPackReaderTests
[Test]
public void TestWriteJavaGuidReturnsIdenticalByteRepresentation()
{
- var bufferWriter = new PooledArrayBuffer();
+ using var bufferWriter = new PooledArrayBuffer();
bufferWriter.MessageWriter.Write(Guid.Parse(JavaUuidString));
var bytes = bufferWriter.GetWrittenMemory()
@@ -349,7 +349,7 @@ public class MsgPackReaderTests
private static T WriteRead<T>(Action<PooledArrayBuffer> write,
Func<ReadOnlyMemory<byte>, T> read)
{
- var bufferWriter = new PooledArrayBuffer();
+ using var bufferWriter = new PooledArrayBuffer();
write(bufferWriter);
var mem = bufferWriter.GetWrittenMemory();
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlResultSetObjectMappingTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlResultSetObjectMappingTests.cs
index 803dfe1226..5af9b24b7e 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlResultSetObjectMappingTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlResultSetObjectMappingTests.cs
@@ -68,7 +68,7 @@ public class SqlResultSetObjectMappingTests : IgniteTestsBase
[Test]
public async Task TestSelectOneColumnAsPrimitiveType()
{
- var resultSet = await Client.Sql.ExecuteAsync<int>(null, "select INT32
from TBL_ALL_COLUMNS_SQL where INT32 is not null order by 1");
+ await using var resultSet = await Client.Sql.ExecuteAsync<int>(null,
"select INT32 from TBL_ALL_COLUMNS_SQL where INT32 is not null order by 1");
var rows = await resultSet.ToListAsync();
Assert.AreEqual(Count, rows.Count);
@@ -127,7 +127,7 @@ public class SqlResultSetObjectMappingTests :
IgniteTestsBase
[Test]
public async Task TestSelectNullIntoNonNullablePrimitiveTypeThrows()
{
- var resultSet = await Client.Sql.ExecuteAsync<int>(null, "select INT32
from TBL_ALL_COLUMNS_SQL");
+ await using var resultSet = await Client.Sql.ExecuteAsync<int>(null,
"select INT32 from TBL_ALL_COLUMNS_SQL");
var ex = Assert.ThrowsAsync<InvalidOperationException>(async () =>
await resultSet.ToListAsync());
@@ -139,7 +139,7 @@ public class SqlResultSetObjectMappingTests :
IgniteTestsBase
[Test]
public async Task TestSelectNullIntoNonNullablePrimitiveTypeFieldThrows()
{
- var resultSet = await Client.Sql.ExecuteAsync<IntRec>(null, "select
INT32 from TBL_ALL_COLUMNS_SQL");
+ await using var resultSet = await
Client.Sql.ExecuteAsync<IntRec>(null, "select INT32 from TBL_ALL_COLUMNS_SQL");
var ex = Assert.ThrowsAsync<InvalidOperationException>(async () =>
await resultSet.ToListAsync());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
index 65051aa7d7..4e72d71362 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Sql/SqlTests.cs
@@ -241,7 +241,6 @@ namespace Apache.Ignite.Tests.Sql
Assert.ThrowsAsync<ObjectDisposedException>(async () => await
resultSet.ToListAsync());
var enumerator = resultSet2.GetAsyncEnumerator();
- await enumerator.MoveNextAsync(); // Skip first element.
Assert.ThrowsAsync<ObjectDisposedException>(async () => await
enumerator.MoveNextAsync());
}
@@ -280,7 +279,7 @@ namespace Apache.Ignite.Tests.Sql
// Insert data.
for (var i = 0; i < 10; i++)
{
- var insertRes = await Client.Sql.ExecuteAsync(null, "INSERT
INTO TestDdlDml VALUES (?, ?)", i, "hello " + i);
+ await using var insertRes = await
Client.Sql.ExecuteAsync(null, "INSERT INTO TestDdlDml VALUES (?, ?)", i, "hello
" + i);
Assert.IsFalse(insertRes.HasRowSet);
Assert.IsFalse(insertRes.WasApplied);
@@ -289,7 +288,7 @@ namespace Apache.Ignite.Tests.Sql
}
// Query data.
- var selectRes = await Client.Sql.ExecuteAsync(null, "SELECT VAL as
MYVALUE, ID, ID + 1 FROM TestDdlDml ORDER BY ID");
+ await using var selectRes = await Client.Sql.ExecuteAsync(null,
"SELECT VAL as MYVALUE, ID, ID + 1 FROM TestDdlDml ORDER BY ID");
Assert.IsTrue(selectRes.HasRowSet);
Assert.IsFalse(selectRes.WasApplied);
@@ -319,7 +318,7 @@ namespace Apache.Ignite.Tests.Sql
Assert.IsNull(columns[2].Origin);
// Update data.
- var updateRes = await Client.Sql.ExecuteAsync(null, "UPDATE
TESTDDLDML SET VAL='upd' WHERE ID < 5");
+ await using var updateRes = await Client.Sql.ExecuteAsync(null,
"UPDATE TESTDDLDML SET VAL='upd' WHERE ID < 5");
Assert.IsFalse(updateRes.WasApplied);
Assert.IsFalse(updateRes.HasRowSet);
@@ -327,7 +326,7 @@ namespace Apache.Ignite.Tests.Sql
Assert.AreEqual(5, updateRes.AffectedRows);
// Drop table.
- var deleteRes = await Client.Sql.ExecuteAsync(null, "DROP TABLE
TESTDDLDML");
+ await using var deleteRes = await Client.Sql.ExecuteAsync(null,
"DROP TABLE TESTDDLDML");
Assert.IsFalse(deleteRes.HasRowSet);
Assert.IsNull(deleteRes.Metadata);
@@ -398,7 +397,7 @@ namespace Apache.Ignite.Tests.Sql
pageSize: 987,
properties: new Dictionary<string, object?> { { "prop1", 10 },
{ "prop-2", "xyz" } });
- var res = await client.Sql.ExecuteAsync(null, sqlStatement);
+ await using var res = await client.Sql.ExecuteAsync(null,
sqlStatement);
var rows = await res.ToListAsync();
var props = rows.ToDictionary(x => (string)x["NAME"]!, x =>
(string)x["VAL"]!);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/BinaryTupleIgniteTupleAdapterTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/BinaryTupleIgniteTupleAdapterTests.cs
index cd6bb62f3e..3a1727b461 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/BinaryTupleIgniteTupleAdapterTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/BinaryTupleIgniteTupleAdapterTests.cs
@@ -70,7 +70,7 @@ public class BinaryTupleIgniteTupleAdapterTests :
IgniteTupleTests
protected override IIgniteTuple CreateTuple(IIgniteTuple source)
{
var cols = new List<Column>();
- var builder = new BinaryTupleBuilder(source.FieldCount);
+ using var builder = new BinaryTupleBuilder(source.FieldCount);
for (var i = 0; i < source.FieldCount; i++)
{
@@ -83,7 +83,7 @@ public class BinaryTupleIgniteTupleAdapterTests :
IgniteTupleTests
builder.AppendObject(val, type);
}
- var buf = builder.Build();
+ var buf = builder.Build().ToArray();
var schema = new Schema(0, 0, 0, 0, cols);
return new BinaryTupleIgniteTupleAdapter(buf, schema, cols.Count);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
index d53774938b..cd7249ade8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/DataStreamerTests.cs
@@ -179,7 +179,7 @@ public class DataStreamerTests : IgniteTestsBase
await
table!.RecordBinaryView.StreamDataAsync(GetFakeServerData(count));
Assert.AreEqual(count, server.UpsertAllRowCount);
- Assert.AreEqual(count / DataStreamerOptions.Default.BatchSize,
server.DroppedConnectionCount);
+ Assert.That(server.DroppedConnectionCount,
Is.GreaterThanOrEqualTo(count / DataStreamerOptions.Default.BatchSize));
}
private static async IAsyncEnumerable<IIgniteTuple> GetFakeServerData(int
count)
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/IgniteTupleTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/IgniteTupleTests.cs
index 40ed2d7149..b9eb625fd3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/IgniteTupleTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/IgniteTupleTests.cs
@@ -27,6 +27,12 @@ namespace Apache.Ignite.Tests.Table
/// </summary>
public class IgniteTupleTests
{
+ [TearDown]
+ public void TearDown()
+ {
+ TestUtils.CheckByteArrayPoolLeak();
+ }
+
[Test]
public void TestCreateUpdateRead()
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
index e32dbbf2bf..13df36ce58 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/TestUtils.cs
@@ -20,9 +20,12 @@ namespace Apache.Ignite.Tests
using System;
using System.Diagnostics;
using System.IO;
+ using System.Linq;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
+ using Internal.Buffers;
+ using Internal.Common;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
@@ -76,6 +79,23 @@ namespace Apache.Ignite.Tests
public static ILoggerFactory GetConsoleLoggerFactory(LogLevel
minLevel) => new ConsoleLogger(minLevel);
+ public static void CheckByteArrayPoolLeak(int timeoutMs = 1000)
+ {
+#if DEBUG
+ WaitForCondition(
+ condition: () => ByteArrayPool.CurrentlyRentedArrays.IsEmpty,
+ timeoutMs: timeoutMs,
+ messageFactory: () =>
+ {
+ var bufs = ByteArrayPool.CurrentlyRentedArrays
+ .Select(x => $"{x.Value.DeclaringType}.{x.Value.Name}")
+ .StringJoin();
+
+ return $"Leaked buffers: {bufs}";
+ });
+#endif
+ }
+
private static FieldInfo GetNonPublicField(object obj, string
fieldName)
{
var field = obj.GetType().GetField(fieldName,
BindingFlags.Instance | BindingFlags.NonPublic);
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
index ecb2f87914..00acc0f4ec 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Tests/Transactions/TransactionsTests.cs
@@ -35,7 +35,7 @@ namespace Apache.Ignite.Tests.Transactions
[TearDown]
public async Task CleanTable()
{
- await TupleView.DeleteAllAsync(null, Enumerable.Range(1,
2).Select(x => GetTuple(x)));
+ await TupleView.DeleteAllAsync(null, Enumerable.Range(1,
3).Select(x => GetTuple(x)));
}
[Test]
@@ -300,7 +300,7 @@ namespace Apache.Ignite.Tests.Transactions
// Transactional operations propagate timestamp.
if (sql)
{
- await client.Sql.ExecuteAsync(null, "select 1");
+ await using var resultSet = await
client.Sql.ExecuteAsync(null, "select 1");
}
else
{
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/ByteArrayPool.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/ByteArrayPool.cs
index 408ea94a0b..08f1f3a6ce 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/ByteArrayPool.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/ByteArrayPool.cs
@@ -18,6 +18,9 @@
namespace Apache.Ignite.Internal.Buffers
{
using System.Buffers;
+ using System.Collections.Concurrent;
+ using System.Diagnostics;
+ using System.Reflection;
/// <summary>
/// Wrapper for the standard <see cref="ArrayPool{T}.Shared"/> with safety
checks in debug mode.
@@ -25,6 +28,11 @@ namespace Apache.Ignite.Internal.Buffers
internal static class ByteArrayPool
{
#if DEBUG
+ /// <summary>
+ /// Gets the currently rented arrays.
+ /// </summary>
+ public static readonly ConcurrentDictionary<byte[], MethodBase>
CurrentlyRentedArrays = new();
+
/// <summary>
/// Track pooled arrays in debug mode to detect double-return - the
most dangerous scenario which can cause application-wide
/// memory corruption, when the same array is returned from the pool
twice and used concurrently.
@@ -45,6 +53,10 @@ namespace Apache.Ignite.Internal.Buffers
var bytes = ArrayPool<byte>.Shared.Rent(minimumLength);
#if DEBUG
+ var stackTrace = new StackTrace();
+ var frame = stackTrace.GetFrame(1);
+
+ CurrentlyRentedArrays.TryAdd(bytes, frame!.GetMethod()!);
ReturnedArrays.Remove(bytes);
#endif
@@ -58,6 +70,8 @@ namespace Apache.Ignite.Internal.Buffers
public static void Return(byte[] array)
{
#if DEBUG
+ CurrentlyRentedArrays.TryRemove(array, out _);
+
// Will throw when key exists.
ReturnedArrays.Add(array, null);
#endif
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledBuffer.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledBuffer.cs
index 1940b26470..1f59a1d9ce 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledBuffer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Buffers/PooledBuffer.cs
@@ -18,36 +18,27 @@
namespace Apache.Ignite.Internal.Buffers
{
using System;
- using System.Diagnostics;
- using System.Diagnostics.CodeAnalysis;
using Proto.MsgPack;
/// <summary>
/// Pooled byte buffer. Wraps a byte array rented from <see
cref="ByteArrayPool"/>,
/// returns it to the pool on <see cref="Dispose"/>.
/// </summary>
- [SuppressMessage(
- "Microsoft.Performance",
- "CA1815:OverrideEqualsAndOperatorEqualsOnValueTypes",
- Justification = "Not used in comparisons")]
- internal readonly struct PooledBuffer : IDisposable
+ internal sealed class PooledBuffer : IDisposable
{
/// <summary>
/// Default capacity for all buffers.
/// </summary>
public const int DefaultCapacity = 65_535;
- /** Bytes. */
private readonly byte[] _bytes;
- /** Position. */
- private readonly int _position;
-
- /** Length. */
private readonly int _length;
+ private bool _disposed;
+
/// <summary>
- /// Initializes a new instance of the <see cref="PooledBuffer"/>
struct.
+ /// Initializes a new instance of the <see cref="PooledBuffer"/> class.
/// </summary>
/// <param name="bytes">Bytes.</param>
/// <param name="position">Data position within specified byte
array.</param>
@@ -56,56 +47,73 @@ namespace Apache.Ignite.Internal.Buffers
public PooledBuffer(byte[] bytes, int position, int length, object?
metadata = null)
{
_bytes = bytes;
- _position = position;
+ Position = position;
_length = length;
Metadata = metadata;
}
/// <summary>
- /// Gets the optional metadata.
+ /// Finalizes an instance of the <see cref="PooledBuffer"/> class.
/// </summary>
- public object? Metadata { get; }
+ ~PooledBuffer()
+ {
+ Dispose();
+ }
/// <summary>
- /// Gets a <see cref="MsgPackReader"/> for this buffer.
+ /// Gets or sets the position.
/// </summary>
- /// <param name="offset">Offset.</param>
- /// <returns><see cref="MsgPackReader"/> for this buffer.</returns>
- public MsgPackReader GetReader(int offset = 0) =>
new(_bytes.AsSpan(_position + offset, _length - offset));
+ public int Position { get; set; }
/// <summary>
- /// Gets this buffer contents as memory.
+ /// Gets or sets the optional metadata.
/// </summary>
- /// <param name="offset">Offset.</param>
- /// <returns>Memory of byte.</returns>
- public ReadOnlyMemory<byte> AsMemory(int offset = 0) => new(_bytes,
_position + offset, _length - offset);
+ public object? Metadata { get; set; }
/// <summary>
- /// Gets a slice of the current buffer.
+ /// Gets a <see cref="MsgPackReader"/> for this buffer.
/// </summary>
/// <param name="offset">Offset.</param>
- /// <returns>Sliced buffer.</returns>
- public PooledBuffer Slice(int offset)
+ /// <returns><see cref="MsgPackReader"/> for this buffer.</returns>
+ public MsgPackReader GetReader(int offset = 0)
{
- Debug.Assert(offset > 0, "offset > 0");
- Debug.Assert(offset <= _length, "offset <= _length");
-
- return new(_bytes, _position + offset, _length - offset);
+ CheckDisposed();
+ return new MsgPackReader(_bytes.AsSpan(Position + offset, _length
- offset - Position));
}
/// <summary>
- /// Gets a copy of the current buffer with the specified metadata.
+ /// Gets this buffer contents as memory.
/// </summary>
- /// <param name="metadata">Metadata.</param>
- /// <returns>Buffer.</returns>
- public PooledBuffer WithMetadata(object? metadata) => new(_bytes,
_position, _length, metadata);
+ /// <param name="offset">Offset.</param>
+ /// <returns>Memory of byte.</returns>
+ public ReadOnlyMemory<byte> AsMemory(int offset = 0)
+ {
+ CheckDisposed();
+ return new ReadOnlyMemory<byte>(_bytes, Position + offset, _length
- offset - Position);
+ }
/// <summary>
/// Releases the pooled buffer.
/// </summary>
public void Dispose()
{
+ if (_disposed)
+ {
+ return;
+ }
+
ByteArrayPool.Return(_bytes);
+ _disposed = true;
+
+ GC.SuppressFinalize(this);
+ }
+
+ private void CheckDisposed()
+ {
+ if (_disposed)
+ {
+ throw new ObjectDisposedException(nameof(PooledBuffer));
+ }
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index 045ae33d57..99b5cc127a 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -302,10 +302,9 @@ namespace Apache.Ignite.Internal
{
await SendRequestAsync(request, clientOp,
requestId).ConfigureAwait(false);
PooledBuffer resBuf = await
taskCompletionSource.Task.ConfigureAwait(false);
+ resBuf.Metadata = notificationHandler;
- return notificationHandler == null
- ? resBuf
- : resBuf.WithMetadata(notificationHandler);
+ return resBuf;
}
catch (Exception e)
{
@@ -682,7 +681,10 @@ namespace Apache.Ignite.Internal
// Invoke response handler in another thread to continue
the receive loop.
// Response buffer should be disposed by the task handler.
- ThreadPool.QueueUserWorkItem(r =>
HandleResponse((PooledBuffer)r!), response);
+ ThreadPool.QueueUserWorkItem<(ClientSocket Socket,
PooledBuffer Buf)>(
+ callBack: static r => r.Socket.HandleResponse(r.Buf),
+ state: (this, response),
+ preferLocal: true);
}
}
catch (Exception e)
@@ -694,7 +696,41 @@ namespace Apache.Ignite.Internal
}
}
+ [SuppressMessage("Design", "CA1031:Do not catch general exception
types", Justification = "Thread root.")]
private void HandleResponse(PooledBuffer response)
+ {
+ bool handled = false;
+
+ try
+ {
+ handled = HandleResponseInner(response);
+ }
+ catch (IgniteClientConnectionException e)
+ {
+ Dispose(e);
+ }
+ catch (Exception e)
+ {
+ var message = "Exception while handling response, connection
closed: " + e.Message;
+ Dispose(new
IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
+ }
+ finally
+ {
+ if (!handled)
+ {
+ response.Dispose();
+ }
+ }
+ }
+
+ /// <summary>
+ /// Handles a server response.
+ /// </summary>
+ /// <param name="response">Response buffer.</param>
+ /// <returns>
+ /// A value indicating whether the response buffer was passed on to
the final handler and does not need to be disposed.
+ /// </returns>
+ private bool HandleResponseInner(PooledBuffer response)
{
var reader = response.GetReader();
@@ -707,11 +743,11 @@ namespace Apache.Ignite.Internal
HandleObservableTimestamp(ref reader);
var exception = flags.HasFlag(ResponseFlags.Error) ? ReadError(ref
reader) : null;
+ response.Position += reader.Consumed;
if (flags.HasFlag(ResponseFlags.Notification))
{
- HandleNotification(requestId, exception, response,
reader.Consumed);
- return;
+ return HandleNotification(requestId, exception, response);
}
if (!_requests.TryRemove(requestId, out var taskCompletionSource))
@@ -720,29 +756,51 @@ namespace Apache.Ignite.Internal
$"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";
_logger.LogUnexpectedResponseIdError(null, message);
- Dispose(new
IgniteClientConnectionException(ErrorGroups.Client.Protocol, message));
-
- return;
+ throw new
IgniteClientConnectionException(ErrorGroups.Client.Protocol, message);
}
Metrics.RequestsActiveDecrement();
if (exception != null)
{
- response.Dispose();
-
Metrics.RequestsFailed.Add(1);
taskCompletionSource.TrySetException(exception);
+ return false;
}
- else
+
+ Metrics.RequestsCompleted.Add(1);
+
+ return taskCompletionSource.TrySetResult(response);
+ }
+
+ /// <summary>
+ /// Handles a server notification.
+ /// </summary>
+ /// <param name="requestId">Request id.</param>
+ /// <param name="exception">Exception.</param>
+ /// <param name="response">Response buffer.</param>
+ /// <returns>
+ /// A value indicating whether the response buffer was passed on to
the final handler and does not need to be disposed.
+ /// </returns>
+ private bool HandleNotification(long requestId, Exception? exception,
PooledBuffer response)
+ {
+ if (!_notificationHandlers.TryRemove(requestId, out var
notificationHandler))
{
- var resultBuffer = response.Slice(reader.Consumed);
+ var message = $"Unexpected notification ID ({requestId})
received from the server " +
+
$"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";
- Metrics.RequestsCompleted.Add(1);
+ _logger.LogUnexpectedResponseIdError(null, message);
+ throw new
IgniteClientConnectionException(ErrorGroups.Client.Protocol, message);
+ }
- taskCompletionSource.TrySetResult(resultBuffer);
+ if (exception != null)
+ {
+ notificationHandler.TrySetException(exception);
+ return false;
}
+
+ return notificationHandler.TrySetResult(response);
}
private void HandleObservableTimestamp(ref MsgPackReader reader)
@@ -763,29 +821,6 @@ namespace Apache.Ignite.Internal
}
}
- private void HandleNotification(long requestId, Exception? exception,
PooledBuffer response, int consumed)
- {
- if (!_notificationHandlers.TryRemove(requestId, out var
notificationHandler))
- {
- var message = $"Unexpected notification ID ({requestId})
received from the server " +
-
$"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";
-
- _logger.LogUnexpectedResponseIdError(null, message);
- Dispose(new
IgniteClientConnectionException(ErrorGroups.Client.Protocol, message));
-
- return;
- }
-
- if (exception != null)
- {
- notificationHandler.TrySetException(exception);
- }
- else
- {
- notificationHandler.TrySetResult(response.Slice(consumed));
- }
- }
-
/// <summary>
/// Sends heartbeat message.
/// </summary>
@@ -797,7 +832,7 @@ namespace Apache.Ignite.Internal
{
try
{
- await
DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
+ using var buf = await
DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
}
catch (Exception e)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
index 887fff64d2..a2bd3a6f52 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/ResultSet.cs
@@ -77,7 +77,8 @@ namespace Apache.Ignite.Internal.Sql
if (HasRowSet)
{
- _buffer = buf.Slice(reader.Consumed);
+ buf.Position += reader.Consumed;
+ _buffer = buf;
HasRows = reader.ReadInt32() > 0;
}
else
@@ -154,7 +155,7 @@ namespace Apache.Ignite.Internal.Sql
var hasMore = _hasMorePages;
TResult? res = default;
- ReadPage(_buffer!.Value);
+ ReadPage(_buffer!);
ReleaseBuffer();
while (hasMore)
@@ -208,7 +209,7 @@ namespace Apache.Ignite.Internal.Sql
using var writer = ProtoCommon.GetMessageWriter();
WriteId(writer.MessageWriter);
- await _socket.DoOutInOpAsync(ClientOp.SqlCursorClose,
writer).ConfigureAwait(false);
+ using var buffer = await
_socket.DoOutInOpAsync(ClientOp.SqlCursorClose, writer).ConfigureAwait(false);
}
catch (Exception)
{
@@ -247,7 +248,7 @@ namespace Apache.Ignite.Internal.Sql
{
ValidateAndSetIteratorState();
- yield return _buffer!.Value;
+ yield return _buffer!;
ReleaseBuffer();
@@ -324,7 +325,7 @@ namespace Apache.Ignite.Internal.Sql
var offset = 0;
// First page.
- foreach (var row in EnumeratePage(_buffer!.Value))
+ foreach (var row in EnumeratePage(_buffer!))
{
yield return row;
}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 2742f1e51c..46442d96b4 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -162,21 +162,31 @@ namespace Apache.Ignite.Internal.Sql
using var bufferWriter = ProtoCommon.GetMessageWriter();
WriteStatement(bufferWriter, statement, args, tx, writeTx: true);
+ PooledBuffer? buf = null;
+
try
{
- var (buf, socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.SqlExec, tx,
bufferWriter).ConfigureAwait(false);
+ (buf, var socket) = await
_socket.DoOutInOpAndGetSocketAsync(ClientOp.SqlExec, tx,
bufferWriter).ConfigureAwait(false);
// ResultSet will dispose the pooled buffer.
return new ResultSet<T>(socket, buf, rowReaderFactory);
}
catch (SqlException e) when (e.Code == ErrorGroups.Sql.StmtParse)
{
+ buf?.Dispose();
+
throw new SqlException(
e.TraceId,
ErrorGroups.Sql.StmtValidation,
"Invalid query, check inner exceptions for details: " +
statement,
e);
}
+ catch (Exception)
+ {
+ buf?.Dispose();
+
+ throw;
+ }
}
private static void WriteProperties(SqlStatement statement, ref
MsgPackWriter w)
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
index 277d48fe74..b8f93f4bf9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Table/RecordView.cs
@@ -158,10 +158,10 @@ namespace Apache.Ignite.Internal.Table
return resultFactory(0);
}
- var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
- return _ser.ReadMultipleNullable(resBuf.Value, resSchema,
resultFactory, addAction);
+ return _ser.ReadMultipleNullable(resBuf, resSchema, resultFactory,
addAction);
}
/// <inheritdoc/>
@@ -211,11 +211,11 @@ namespace Apache.Ignite.Internal.Table
return Array.Empty<T>();
}
- var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
return _ser.ReadMultiple(
- buf: resBuf.Value,
+ buf: resBuf,
schema: resSchema,
keyOnly: false,
resultFactory: static count => count == 0
@@ -370,11 +370,11 @@ namespace Apache.Ignite.Internal.Table
return resultFactory(0);
}
- var resSchema = await
_table.ReadSchemaAsync(resBuf.Value).ConfigureAwait(false);
+ var resSchema = await
_table.ReadSchemaAsync(resBuf).ConfigureAwait(false);
// TODO: Read value parts only (IGNITE-16022).
return _ser.ReadMultiple(
- buf: resBuf.Value,
+ buf: resBuf,
schema: resSchema,
keyOnly: !exact,
resultFactory: resultFactory,
diff --git
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
index f52f6be431..aeb9b36d09 100644
---
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
+++
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transaction.cs
@@ -82,7 +82,7 @@ namespace Apache.Ignite.Internal.Transactions
using var writer = ProtoCommon.GetMessageWriter();
Write(writer.MessageWriter);
- await Socket.DoOutInOpAsync(ClientOp.TxCommit,
writer).ConfigureAwait(false);
+ using var buffer = await
Socket.DoOutInOpAsync(ClientOp.TxCommit, writer).ConfigureAwait(false);
}
}
@@ -120,7 +120,7 @@ namespace Apache.Ignite.Internal.Transactions
using var writer = ProtoCommon.GetMessageWriter();
Write(writer.MessageWriter);
- await Socket.DoOutInOpAsync(ClientOp.TxRollback,
writer).ConfigureAwait(false);
+ using var buffer = await
Socket.DoOutInOpAsync(ClientOp.TxRollback, writer).ConfigureAwait(false);
}
}