This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb-client-csharp.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 26d96b1 [To dev/1.3] Fix SessionPool client leak on reconnection and
query failures, and preserve server error messages (#45)
26d96b1 is described below
commit 26d96b12ccb169913cee61ef0f23199948ba461d
Author: CritasWang <[email protected]>
AuthorDate: Tue Feb 10 15:33:43 2026 +0800
[To dev/1.3] Fix SessionPool client leak on reconnection and query
failures, and preserve server error messages (#45)
* [To dev/1.3] Fix SessionPool client leak on reconnection and query
failures, and preserve server error messages
* fix ByteBuffer build error ,use Array.Reverse()
---
docs/SessionPool_Exception_Handling.md | 487 +++++++++++++++++++++++
src/Apache.IoTDB.Data/IoTDBDataReader.cs | 4 +-
src/Apache.IoTDB/ConcurrentClientQueue.cs | 68 ++--
src/Apache.IoTDB/DataStructure/ByteBuffer.cs | 68 ++--
src/Apache.IoTDB/DataStructure/SessionDataSet.cs | 18 +-
src/Apache.IoTDB/PoolHealthMetrics.cs | 52 +++
src/Apache.IoTDB/ReconnectionFailedException.cs | 41 ++
src/Apache.IoTDB/SessionPool.cs | 98 +++--
src/Apache.IoTDB/SessionPoolDepletedException.cs | 74 ++++
9 files changed, 815 insertions(+), 95 deletions(-)
diff --git a/docs/SessionPool_Exception_Handling.md
b/docs/SessionPool_Exception_Handling.md
new file mode 100644
index 0000000..75fd019
--- /dev/null
+++ b/docs/SessionPool_Exception_Handling.md
@@ -0,0 +1,487 @@
+# SessionPool Exception Handling and Health Monitoring
+
+## Overview
+
+The Apache IoTDB C# client library provides comprehensive exception handling
and health monitoring capabilities for SessionPool operations. This document
explains how to handle pool depletion scenarios, monitor pool health, and
implement recovery strategies.
+
+## SessionPoolDepletedException
+
+### Description
+
+`SessionPoolDepletedException` is a specialized exception thrown when the
SessionPool cannot provide a client connection. This indicates that:
+
+- All clients in the pool are currently in use, OR
+- Client connections have failed and reconnection attempts were unsuccessful,
OR
+- The pool wait timeout has been exceeded
+
+### Exception Properties
+
+The exception provides detailed diagnostic information through the following
properties:
+
+| Property | Type | Description
|
+| --------------------- | ------ |
-------------------------------------------------------------------------- |
+| `DepletionReason` | string | A human-readable description of why the
pool was depleted |
+| `AvailableClients` | int | Number of currently available clients in
the pool at the time of exception |
+| `TotalPoolSize` | int | The total configured size of the session
pool |
+| `FailedReconnections` | int | Number of failed reconnection attempts
since the pool was opened |
+
+### Example Usage
+
+```csharp
+using Apache.IoTDB;
+using System;
+
+try
+{
+ var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(4)
+ .Build();
+
+ await sessionPool.Open();
+
+ // Perform operations...
+ await sessionPool.InsertRecordAsync("root.sg.d1", record);
+}
+catch (SessionPoolDepletedException ex)
+{
+ Console.WriteLine($"Pool depleted: {ex.DepletionReason}");
+ Console.WriteLine($"Available clients:
{ex.AvailableClients}/{ex.TotalPoolSize}");
+ Console.WriteLine($"Failed reconnections: {ex.FailedReconnections}");
+
+ // Implement recovery strategy (see below)
+}
+```
+
+## Pool Health Metrics
+
+### Monitoring Pool Status
+
+The `SessionPool` class exposes real-time health metrics that can be used for
monitoring and alerting:
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Build();
+
+await sessionPool.Open();
+
+// Check pool health
+Console.WriteLine($"Available Clients: {sessionPool.AvailableClients}");
+Console.WriteLine($"Total Pool Size: {sessionPool.TotalPoolSize}");
+Console.WriteLine($"Failed Reconnections: {sessionPool.FailedReconnections}");
+```
+
+### Health Metrics
+
+| Metric | Property | Description
| Recommended Threshold |
+| -------------------- | --------------------- |
------------------------------------------------ | --------------------------- |
+| Available Clients | `AvailableClients` | Number of idle clients ready
for use | Alert if < 25% of pool size |
+| Total Pool Size | `TotalPoolSize` | Configured maximum pool size
| N/A (constant) |
+| Failed Reconnections | `FailedReconnections` | Cumulative count of failed
reconnection attempts | Alert if > 0 and increasing |
+
+## Failure Scenarios and Recovery Strategies
+
+### Scenario 1: Pool Exhaustion (High Load)
+
+**Symptoms:**
+
+- `SessionPoolDepletedException` with reason "Connection pool is empty and
wait time out"
+- `AvailableClients` = 0
+- `FailedReconnections` = 0 or low
+
+**Root Cause:** Application workload exceeds pool capacity
+
+**Recovery Strategies:**
+
+1. **Increase Pool Size:**
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(16) // Increased from 8
+ .Build();
+```
+
+2. **Implement Connection Retry with Backoff:**
+
+```csharp
+int maxRetries = 3;
+int retryDelayMs = 1000;
+
+for (int i = 0; i < maxRetries; i++)
+{
+ try
+ {
+ await sessionPool.InsertRecordAsync(deviceId, record);
+ break; // Success
+ }
+ catch (SessionPoolDepletedException ex) when (i < maxRetries - 1)
+ {
+ await Task.Delay(retryDelayMs * (i + 1)); // Exponential backoff
+ }
+}
+```
+
+3. **Optimize Operation Duration:**
+ - Reduce the time each client is held
+ - Batch multiple operations together
+ - Use async operations efficiently
+
+### Scenario 2: Network Connectivity Issues
+
+**Symptoms:**
+
+- `SessionPoolDepletedException` with reason "Reconnection failed"
+- `AvailableClients` decreases over time
+- `FailedReconnections` > 0 and increasing
+
+**Root Cause:** IoTDB server unreachable or network issues
+
+**Recovery Strategies:**
+
+1. **Reinitialize SessionPool:**
+
+```csharp
+catch (SessionPoolDepletedException ex) when (ex.FailedReconnections > 5)
+{
+ Console.WriteLine($"Critical: {ex.FailedReconnections} failed
reconnections");
+
+ // Close existing pool
+ await sessionPool.Close();
+
+ // Wait for network recovery
+ await Task.Delay(5000);
+
+ // Create new pool
+ sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Build();
+
+ await sessionPool.Open();
+}
+```
+
+2. **Implement Circuit Breaker Pattern:**
+
+```csharp
+public class SessionPoolCircuitBreaker
+{
+ private SessionPool _pool;
+ private int _failureCount = 0;
+ private const int FailureThreshold = 5;
+ private bool _circuitOpen = false;
+ private DateTime _lastFailureTime;
+
+ public async Task<T> ExecuteAsync<T>(Func<SessionPool, Task<T>> operation)
+ {
+ if (_circuitOpen && DateTime.Now - _lastFailureTime <
TimeSpan.FromMinutes(1))
+ {
+ throw new Exception("Circuit breaker is open");
+ }
+
+ try
+ {
+ var result = await operation(_pool);
+ _failureCount = 0; // Reset on success
+ _circuitOpen = false;
+ return result;
+ }
+ catch (SessionPoolDepletedException ex)
+ {
+ _failureCount++;
+ _lastFailureTime = DateTime.Now;
+
+ if (_failureCount >= FailureThreshold)
+ {
+ _circuitOpen = true;
+ Console.WriteLine("Circuit breaker opened - too many
failures");
+ }
+ throw;
+ }
+ }
+}
+```
+
+### Scenario 3: Server Overload
+
+**Symptoms:**
+
+- Intermittent `SessionPoolDepletedException`
+- Both connection timeouts and reconnection failures
+
+**Root Cause:** IoTDB server is overloaded
+
+**Recovery Strategies:**
+
+1. **Implement Rate Limiting:**
+
+```csharp
+using System.Threading;
+
+private SemaphoreSlim _rateLimiter = new SemaphoreSlim(10, 10); // Max 10
concurrent operations
+
+public async Task RateLimitedInsert(string deviceId, RowRecord record)
+{
+ await _rateLimiter.WaitAsync();
+ try
+ {
+ await sessionPool.InsertRecordAsync(deviceId, record);
+ }
+ finally
+ {
+ _rateLimiter.Release();
+ }
+}
+```
+
+2. **Add Timeout Configuration:**
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .Timeout(120) // Increased timeout for slow server
+ .Build();
+```
+
+## Monitoring and Alerting Recommendations
+
+### Health Check Implementation
+
+```csharp
+public class SessionPoolHealthCheck
+{
+ private readonly SessionPool _pool;
+
+ public SessionPoolHealthCheck(SessionPool pool)
+ {
+ _pool = pool;
+ }
+
+ public HealthStatus CheckHealth()
+ {
+ var availableRatio = (double)_pool.AvailableClients /
_pool.TotalPoolSize;
+
+ if (_pool.FailedReconnections > 10)
+ {
+ return new HealthStatus
+ {
+ Status = "Critical",
+ Message = $"High reconnection failures:
{_pool.FailedReconnections}",
+ Recommendation = "Check IoTDB server availability"
+ };
+ }
+
+ if (availableRatio < 0.25)
+ {
+ return new HealthStatus
+ {
+ Status = "Warning",
+ Message = $"Low available clients:
{_pool.AvailableClients}/{_pool.TotalPoolSize}",
+ Recommendation = "Consider increasing pool size"
+ };
+ }
+
+ return new HealthStatus
+ {
+ Status = "Healthy",
+ Message = $"Pool healthy:
{_pool.AvailableClients}/{_pool.TotalPoolSize} available"
+ };
+ }
+}
+
+public class HealthStatus
+{
+ public string Status { get; set; }
+ public string Message { get; set; }
+ public string Recommendation { get; set; }
+}
+```
+
+### Metrics Collection for Monitoring Systems
+
+```csharp
+// Example: Export metrics to Prometheus, StatsD, or similar
+public class SessionPoolMetricsCollector
+{
+ private readonly SessionPool _pool;
+
+ public void CollectMetrics()
+ {
+ // Gauge: Current available clients
+ MetricsCollector.Set("iotdb_pool_available_clients",
_pool.AvailableClients);
+
+ // Gauge: Total pool size
+ MetricsCollector.Set("iotdb_pool_total_size", _pool.TotalPoolSize);
+
+ // Counter: Failed reconnections
+ MetricsCollector.Set("iotdb_pool_failed_reconnections",
_pool.FailedReconnections);
+
+ // Calculated: Pool utilization percentage
+ var utilization = (1.0 - (double)_pool.AvailableClients /
_pool.TotalPoolSize) * 100;
+ MetricsCollector.Set("iotdb_pool_utilization_percent", utilization);
+ }
+}
+```
+
+### Recommended Alert Rules
+
+1. **Critical Alerts:**
+ - `FailedReconnections > 10`: Server connectivity issues
+ - `AvailableClients == 0` for > 30 seconds: Complete pool exhaustion
+
+2. **Warning Alerts:**
+ - `AvailableClients < TotalPoolSize * 0.25`: Pool under pressure
+ - `FailedReconnections > 0` and increasing: Network instability
+
+3. **Info Alerts:**
+ - Pool utilization > 75% for extended periods: Consider scaling
+
+## Best Practices
+
+1. **Pool Sizing:**
+ - Start with poolSize = 2 × expected concurrent operations
+ - Monitor and adjust based on actual usage patterns
+ - Larger pools use more server resources but provide better throughput
+
+2. **Error Handling:**
+ - Always catch `SessionPoolDepletedException` specifically
+ - Log exception properties for debugging
+ - Implement appropriate retry logic based on depletion reason
+
+3. **Monitoring:**
+ - Continuously monitor `AvailableClients` metric
+ - Track `FailedReconnections` as a leading indicator of problems
+ - Set up alerts before pool is completely depleted
+
+4. **Resource Management:**
+ - Always call `sessionPool.Close()` when done
+ - Use `using` statements or try-finally blocks for proper cleanup
+ - Don't create multiple SessionPool instances unnecessarily
+
+## Example: Complete Production-Ready Implementation
+
+```csharp
+using Apache.IoTDB;
+using System;
+using System.Threading.Tasks;
+
+public class ProductionSessionPoolManager
+{
+ private SessionPool _pool;
+ private readonly object _lock = new object();
+
+ public async Task Initialize()
+ {
+ _pool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Timeout(60)
+ .Build();
+
+ await _pool.Open();
+
+ // Start health monitoring
+ _ = Task.Run(MonitorHealth);
+ }
+
+ public async Task<T> ExecuteWithRetry<T>(Func<SessionPool, Task<T>>
operation)
+ {
+ const int maxRetries = 3;
+ const int baseDelayMs = 1000;
+
+ for (int attempt = 0; attempt < maxRetries; attempt++)
+ {
+ try
+ {
+ return await operation(_pool);
+ }
+ catch (SessionPoolDepletedException ex)
+ {
+ Console.WriteLine($"Attempt {attempt + 1} failed:
{ex.Message}");
+ Console.WriteLine($"Pool state - Available:
{ex.AvailableClients}/{ex.TotalPoolSize}, Failed reconnections:
{ex.FailedReconnections}");
+
+ if (attempt == maxRetries - 1)
+ {
+ // Last attempt failed
+ if (ex.FailedReconnections > 5)
+ {
+ // Reinitialize pool
+ await ReinitializePool();
+ }
+ throw;
+ }
+
+ // Exponential backoff
+ await Task.Delay(baseDelayMs * (int)Math.Pow(2, attempt));
+ }
+ }
+
+ throw new InvalidOperationException("Should not reach here");
+ }
+
+ private async Task ReinitializePool()
+ {
+ lock (_lock)
+ {
+ try
+ {
+ _pool?.Close().Wait();
+ }
+ catch { }
+ }
+
+ await Task.Delay(5000); // Wait for server recovery
+ await Initialize();
+ }
+
+ private async Task MonitorHealth()
+ {
+ while (true)
+ {
+ await Task.Delay(10000); // Check every 10 seconds
+
+ try
+ {
+ var availableRatio = (double)_pool.AvailableClients /
_pool.TotalPoolSize;
+
+ if (_pool.FailedReconnections > 10)
+ {
+ Console.WriteLine($"CRITICAL: {_pool.FailedReconnections}
failed reconnections");
+ }
+ else if (availableRatio < 0.25)
+ {
+ Console.WriteLine($"WARNING: Low available clients -
{_pool.AvailableClients}/{_pool.TotalPoolSize}");
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Health check failed: {ex.Message}");
+ }
+ }
+ }
+
+ public async Task Cleanup()
+ {
+ await _pool?.Close();
+ }
+}
+```
+
+## Summary
+
+The SessionPool exception handling and health monitoring features provide
comprehensive tools for building robust IoTDB applications:
+
+- Use `SessionPoolDepletedException` to understand and react to pool issues
+- Monitor `AvailableClients`, `TotalPoolSize`, and `FailedReconnections`
metrics
+- Implement appropriate recovery strategies based on failure scenarios
+- Set up proactive monitoring and alerting to prevent issues
+- Follow best practices for pool sizing and resource management
diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs
b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
index 156844e..7076a68 100644
--- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs
+++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
@@ -56,8 +56,8 @@ namespace Apache.IoTDB.Data
_command = IoTDBCommand;
_closeConnection = closeConnection;
_fieldCount = dataSet.ColumnNames.Count;
- _hasRows = dataSet.RowCount > 0;
- _recordsAffected = dataSet.RowCount;
+ _hasRows = dataSet.CurrentBatchRowCount() > 0;
+ _recordsAffected = -1; // Total row count is unknown; use -1 per
ADO.NET convention
_closed = _closeConnection;
_metas = dataSet.ColumnNames;
_dataSet = dataSet;
diff --git a/src/Apache.IoTDB/ConcurrentClientQueue.cs
b/src/Apache.IoTDB/ConcurrentClientQueue.cs
index e53d3f3..6090c19 100644
--- a/src/Apache.IoTDB/ConcurrentClientQueue.cs
+++ b/src/Apache.IoTDB/ConcurrentClientQueue.cs
@@ -28,6 +28,7 @@ namespace Apache.IoTDB
public class ConcurrentClientQueue
{
public ConcurrentQueue<Client> ClientQueue { get; }
+ internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }
public ConcurrentClientQueue(List<Client> clients)
{
@@ -42,55 +43,62 @@ namespace Apache.IoTDB
public void Return(Client client)
{
Monitor.Enter(ClientQueue);
- ClientQueue.Enqueue(client);
- Monitor.PulseAll(ClientQueue); // wake up all threads waiting on
the queue, refresh the waiting time
- Monitor.Exit(ClientQueue);
- Thread.Sleep(0);
- }
- int _ref = 0;
- public void AddRef()
- {
- lock (this)
+ try
{
- _ref++;
+ ClientQueue.Enqueue(client);
+ Monitor.PulseAll(ClientQueue); // wake up all threads waiting
on the queue, refresh the waiting time
}
- }
- public int GetRef()
- {
- return _ref;
- }
- public void RemoveRef()
- {
- lock (this)
+ finally
{
- _ref--;
+ Monitor.Exit(ClientQueue);
}
+ Thread.Sleep(0);
}
+ private int _ref = 0;
+ public void AddRef() => Interlocked.Increment(ref _ref);
+ public int GetRef() => Volatile.Read(ref _ref);
+ public void RemoveRef() => Interlocked.Decrement(ref _ref);
public int Timeout { get; set; } = 10;
public Client Take()
{
Client client = null;
Monitor.Enter(ClientQueue);
- while (true)
+ try
{
- bool timeout = false;
- if (ClientQueue.IsEmpty)
+ while (true)
{
- timeout = !Monitor.Wait(ClientQueue,
TimeSpan.FromSeconds(Timeout));
- }
- ClientQueue.TryDequeue(out client);
+ bool timeout = false;
+ if (ClientQueue.IsEmpty)
+ {
+ timeout = !Monitor.Wait(ClientQueue,
TimeSpan.FromSeconds(Timeout));
+ }
+ ClientQueue.TryDequeue(out client);
- if (client != null || timeout)
- {
- break;
+ if (client != null || timeout)
+ {
+ break;
+ }
}
}
- Monitor.Exit(ClientQueue);
+ finally
+ {
+ Monitor.Exit(ClientQueue);
+ }
if (client == null)
{
- throw new TimeoutException($"Connection pool is empty and wait
time out({Timeout}s)!");
+ var reasonPhrase = $"Connection pool is empty and wait time
out({Timeout}s)";
+ if (DiagnosticReporter != null)
+ {
+ throw
DiagnosticReporter.BuildDepletionException(reasonPhrase);
+ }
+ throw new TimeoutException(reasonPhrase);
}
return client;
}
}
+
+ internal interface IPoolDiagnosticReporter
+ {
+ SessionPoolDepletedException BuildDepletionException(string
reasonPhrase);
+ }
}
diff --git a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
index a3e1823..091080c 100644
--- a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
+++ b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs
@@ -18,7 +18,6 @@
*/
using System;
-using System.Linq;
using System.Text;
namespace Apache.IoTDB.DataStructure
@@ -70,9 +69,12 @@ namespace Apache.IoTDB.DataStructure
public int GetInt()
{
var intBuff = _buffer[_readPos..(_readPos + 4)];
- if (_isLittleEndian) intBuff = intBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(intBuff);
+ }
#if NET461_OR_GREATER || NETSTANDARD2_0
- var intValue = BitConverter.ToInt32(intBuff,0);
+ var intValue = BitConverter.ToInt32(intBuff, 0);
#else
var intValue = BitConverter.ToInt32(intBuff);
#endif
@@ -84,10 +86,12 @@ namespace Apache.IoTDB.DataStructure
public long GetLong()
{
var longBuff = _buffer[_readPos..(_readPos + 8)];
-
- if (_isLittleEndian) longBuff = longBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(longBuff);
+ }
#if NET461_OR_GREATER || NETSTANDARD2_0
- var longValue = BitConverter.ToInt64(longBuff,0);
+ var longValue = BitConverter.ToInt64(longBuff, 0);
#else
var longValue = BitConverter.ToInt64(longBuff);
#endif
@@ -99,10 +103,12 @@ namespace Apache.IoTDB.DataStructure
public float GetFloat()
{
var floatBuff = _buffer[_readPos..(_readPos + 4)];
-
- if (_isLittleEndian) floatBuff = floatBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(floatBuff);
+ }
#if NET461_OR_GREATER || NETSTANDARD2_0
- var floatValue = BitConverter.ToSingle(floatBuff,0);
+ var floatValue = BitConverter.ToSingle(floatBuff, 0);
#else
var floatValue = BitConverter.ToSingle(floatBuff);
#endif
@@ -113,10 +119,12 @@ namespace Apache.IoTDB.DataStructure
public double GetDouble()
{
var doubleBuff = _buffer[_readPos..(_readPos + 8)];
-
- if (_isLittleEndian) doubleBuff = doubleBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(doubleBuff);
+ }
#if NET461_OR_GREATER || NETSTANDARD2_0
- var doubleValue = BitConverter.ToDouble(doubleBuff,0);
+ var doubleValue = BitConverter.ToDouble(doubleBuff, 0);
#else
var doubleValue = BitConverter.ToDouble(doubleBuff);
#endif
@@ -162,8 +170,10 @@ namespace Apache.IoTDB.DataStructure
public void AddBool(bool value)
{
var boolBuffer = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) boolBuffer = boolBuffer.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(boolBuffer);
+ }
ExtendBuffer(boolBuffer.Length);
boolBuffer.CopyTo(_buffer, _writePos);
@@ -173,8 +183,10 @@ namespace Apache.IoTDB.DataStructure
public void AddInt(int value)
{
var intBuff = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) intBuff = intBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(intBuff);
+ }
ExtendBuffer(intBuff.Length);
intBuff.CopyTo(_buffer, _writePos);
@@ -184,8 +196,10 @@ namespace Apache.IoTDB.DataStructure
public void AddLong(long value)
{
var longBuff = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) longBuff = longBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(longBuff);
+ }
ExtendBuffer(longBuff.Length);
longBuff.CopyTo(_buffer, _writePos);
@@ -195,8 +209,10 @@ namespace Apache.IoTDB.DataStructure
public void AddFloat(float value)
{
var floatBuff = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) floatBuff = floatBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(floatBuff);
+ }
ExtendBuffer(floatBuff.Length);
floatBuff.CopyTo(_buffer, _writePos);
@@ -206,8 +222,10 @@ namespace Apache.IoTDB.DataStructure
public void AddDouble(double value)
{
var doubleBuff = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) doubleBuff = doubleBuff.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(doubleBuff);
+ }
ExtendBuffer(doubleBuff.Length);
doubleBuff.CopyTo(_buffer, _writePos);
@@ -237,8 +255,10 @@ namespace Apache.IoTDB.DataStructure
public void AddChar(char value)
{
var charBuf = BitConverter.GetBytes(value);
-
- if (_isLittleEndian) charBuf = charBuf.Reverse().ToArray();
+ if (_isLittleEndian)
+ {
+ Array.Reverse(charBuf);
+ }
ExtendBuffer(charBuf.Length);
charBuf.CopyTo(_buffer, _writePos);
diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
index 539de1f..75f47d0 100644
--- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
@@ -52,7 +52,22 @@ namespace Apache.IoTDB.DataStructure
private int Flag => 0x80;
private int DefaultTimeout => 10000;
public int FetchSize { get; set; }
+
+ /// <summary>
+ /// Gets the number of rows in the current fetched batch.
+ /// Note: This is NOT the total row count of the query result. Use
HasNext() to check for more data.
+ /// </summary>
+ /// <returns>The number of rows in the current batch.</returns>
+ public int CurrentBatchRowCount() => _currentBatchRowCount;
+
+ /// <summary>
+ /// Gets the number of rows in the current fetched batch.
+ /// </summary>
+ /// <returns>The number of rows in the current batch.</returns>
+ [Obsolete("Use CurrentBatchRowCount() instead. This property returns
batch size, not total row count.")]
public int RowCount { get; set; }
+
+ private int _currentBatchRowCount;
public SessionDataSet(string sql, TSExecuteStatementResp resp, Client
client, ConcurrentClientQueue clientQueue, long statementId)
{
_clientQueue = clientQueue;
@@ -74,7 +89,8 @@ namespace Apache.IoTDB.DataStructure
// some internal variable
_hasCatchedResult = false;
_rowIndex = 0;
- RowCount = _queryDataset.Time.Length / sizeof(long);
+ _currentBatchRowCount = _queryDataset.Time.Length / sizeof(long);
+ RowCount = _currentBatchRowCount;
_columnNames = resp.Columns;
_columnTypeLst = resp.DataTypeList;
diff --git a/src/Apache.IoTDB/PoolHealthMetrics.cs
b/src/Apache.IoTDB/PoolHealthMetrics.cs
new file mode 100644
index 0000000..52bcc5f
--- /dev/null
+++ b/src/Apache.IoTDB/PoolHealthMetrics.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+
+namespace Apache.IoTDB
+{
+ /// <summary>
+ /// Encapsulates real-time health statistics for connection pool
monitoring.
+ /// Thread-safe implementation for concurrent access patterns.
+ /// </summary>
+ internal class PoolHealthMetrics
+ {
+ private int _reconnectionFailureTally;
+ private readonly int _configuredMaxSize;
+
+ public PoolHealthMetrics(int configuredMaxSize)
+ {
+ _configuredMaxSize = configuredMaxSize;
+ }
+
+ public void IncrementReconnectionFailures()
+ {
+ Interlocked.Increment(ref _reconnectionFailureTally);
+ }
+
+ public void ResetAllCounters()
+ {
+ Interlocked.Exchange(ref _reconnectionFailureTally, 0);
+ }
+
+ public int GetReconnectionFailureTally() => Volatile.Read(ref
_reconnectionFailureTally);
+
+ public int GetConfiguredMaxSize() => _configuredMaxSize;
+ }
+}
diff --git a/src/Apache.IoTDB/ReconnectionFailedException.cs
b/src/Apache.IoTDB/ReconnectionFailedException.cs
new file mode 100644
index 0000000..db80d6b
--- /dev/null
+++ b/src/Apache.IoTDB/ReconnectionFailedException.cs
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift;
+
+namespace Apache.IoTDB
+{
+ /// <summary>
+ /// Exception thrown when all reconnection attempts to the server have
failed.
+ /// This exception is used internally to distinguish reconnection failures
from other errors.
+ /// </summary>
+ internal class ReconnectionFailedException : TException
+ {
+ internal ReconnectionFailedException(string message)
+ : base(message, null)
+ {
+ }
+
+ internal ReconnectionFailedException(string message, Exception
innerException)
+ : base(message, innerException)
+ {
+ }
+ }
+}
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index 3cb3e08..2d3e4a1 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -35,9 +35,10 @@ using Thrift.Transport.Client;
namespace Apache.IoTDB
{
- public partial class SessionPool : IDisposable
+ public partial class SessionPool : IDisposable, IPoolDiagnosticReporter
{
private static readonly TSProtocolVersion ProtocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+ private const string DepletionReasonReconnectFailed = "Reconnection
failed";
private readonly string _username;
private readonly string _password;
@@ -56,8 +57,25 @@ namespace Apache.IoTDB
private bool _isClose = true;
private ConcurrentClientQueue _clients;
private ILogger _logger;
+ private PoolHealthMetrics _healthMetrics;
+
public delegate Task<TResult> AsyncOperation<TResult>(Client client);
+ /// <summary>
+ /// Retrieves current count of idle clients ready for operations.
+ /// </summary>
+ public int AvailableClients => _clients?.ClientQueue.Count ?? 0;
+
+ /// <summary>
+ /// Retrieves the configured maximum capacity of the session pool.
+ /// </summary>
+ public int TotalPoolSize => _healthMetrics?.GetConfiguredMaxSize() ??
_poolSize;
+
+ /// <summary>
+ /// Retrieves cumulative tally of reconnection failures since pool was
opened.
+ /// </summary>
+ public int FailedReconnections =>
_healthMetrics?.GetReconnectionFailureTally() ?? 0;
+
public SessionPool(string host, int port, int poolSize)
: this(host, port, "root", "root", 1024, "UTC+08:00",
poolSize, true, 60)
@@ -132,52 +150,62 @@ namespace Apache.IoTDB
public async Task<TResult>
ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string
errMsg, bool retryOnFailure = true, bool putClientBack = true)
{
Client client = _clients.Take();
+ bool shouldReturnClient = true;
+ bool operationSucceeded = false;
try
{
var resp = await operation(client);
+ operationSucceeded = true;
return resp;
}
- catch (TException ex)
+ catch (Exception ex)
{
if (retryOnFailure)
{
+ // Try to reconnect
try
{
client = await Reconnect(client);
- return await operation(client);
+ // Reconnect succeeded, client is now a new healthy
connection
}
- catch (TException retryEx)
+ catch (ReconnectionFailedException reconnectEx)
{
- throw new TException(errMsg, retryEx);
+ // Reconnection failed - original client was closed by
Reconnect
+ shouldReturnClient = false;
+ throw new
SessionPoolDepletedException(DepletionReasonReconnectFailed, AvailableClients,
TotalPoolSize, FailedReconnections, reconnectEx);
}
- }
- else
- {
- throw new TException(errMsg, ex);
- }
- }
- catch (Exception ex)
- {
- if (retryOnFailure)
- {
+
+ // Reconnect succeeded, try the operation again
try
{
- client = await Reconnect(client);
- return await operation(client);
+ var resp = await operation(client);
+ operationSucceeded = true;
+ return resp;
}
- catch (TException retryEx)
+ catch (Exception retryEx)
{
- throw new TException(errMsg, retryEx);
+ // Retry operation failed, but client is healthy and
should be returned to pool
+ // shouldReturnClient remains true
+ string detailedMsg = $"{errMsg}. {retryEx.Message}";
+ throw new TException(detailedMsg, retryEx);
}
}
else
{
- throw new TException(errMsg, ex);
+ // Preserve original error message from server
+ string detailedMsg = $"{errMsg}. {ex.Message}";
+ throw new TException(detailedMsg, ex);
}
}
finally
{
- if (putClientBack)
+ // Return client to pool if:
+ // 1. putClientBack is true (normal operations - client should
always be returned), OR
+ // 2. putClientBack is false (query operations) BUT operation
failed, meaning SessionDataSet
+ // wasn't created and won't manage the client
+ // Do NOT return if reconnection failed (shouldReturnClient is
false) because client was closed by Reconnect
+ bool shouldReturnForQueryFailure = !putClientBack &&
!operationSucceeded;
+ if (shouldReturnClient && (putClientBack ||
shouldReturnForQueryFailure))
{
_clients.Add(client);
}
@@ -212,8 +240,10 @@ namespace Apache.IoTDB
public async Task Open(CancellationToken cancellationToken = default)
{
+ _healthMetrics = new PoolHealthMetrics(_poolSize);
_clients = new ConcurrentClientQueue();
_clients.Timeout = _timeout * 5;
+ _clients.DiagnosticReporter = this;
if (_nodeUrls.Count == 0)
{
@@ -225,10 +255,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Currently connecting to
{0}:{1} failed", _host, _port);
- }
+ _logger?.LogWarning(e, "Failed to create connection
{0}/{1} to {2}:{3}", index + 1, _poolSize, _host, _port);
}
}
}
@@ -252,10 +279,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Currently connecting to
{0}:{1} failed", endPoint.Ip, endPoint.Port);
- }
+ _logger?.LogWarning(e, "Failed to create
connection to {0}:{1}", endPoint.Ip, endPoint.Port);
}
}
if (!isConnected) // current client could not connect to
any endpoint
@@ -288,10 +312,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt reconnecting to
{0}:{1} failed", _host, _port);
- }
+ _logger?.LogWarning(e, "Reconnection attempt {0}/{1}
to {2}:{3} failed", attempt, RetryNum, _host, _port);
}
}
}
@@ -315,16 +336,14 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt connecting to
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
- }
+ _logger?.LogWarning(e, "Reconnection attempt
{0}/{1} to {2}:{3} failed", attempt, RetryNum, _endPoints[j].Ip,
_endPoints[j].Port);
}
}
}
}
- throw new TException("Error occurs when reconnecting session pool.
Could not connect to any server", null);
+ _healthMetrics?.IncrementReconnectionFailures();
+ throw new ReconnectionFailedException("Error occurs when
reconnecting session pool. Could not connect to any server");
}
@@ -1695,5 +1714,8 @@ namespace Apache.IoTDB
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
+
+ SessionPoolDepletedException
IPoolDiagnosticReporter.BuildDepletionException(string reasonPhrase)
+ => new SessionPoolDepletedException(reasonPhrase,
AvailableClients, TotalPoolSize, FailedReconnections);
}
}
diff --git a/src/Apache.IoTDB/SessionPoolDepletedException.cs
b/src/Apache.IoTDB/SessionPoolDepletedException.cs
new file mode 100644
index 0000000..e489ba0
--- /dev/null
+++ b/src/Apache.IoTDB/SessionPoolDepletedException.cs
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift;
+
+namespace Apache.IoTDB
+{
+ /// <summary>
+ /// Specialized exception raised when SessionPool cannot allocate a client
connection.
+ /// Includes diagnostic data for troubleshooting capacity and connectivity
problems.
+ /// </summary>
+ public class SessionPoolDepletedException : TException
+ {
+ /// <summary>
+ /// Descriptive explanation of what caused the pool depletion event.
+ /// </summary>
+ public string DepletionReason { get; }
+
+ /// <summary>
+ /// Number of clients available for use at the moment of exception.
+ /// </summary>
+ public int AvailableClients { get; }
+
+ /// <summary>
+ /// Maximum configured pool size limit.
+ /// </summary>
+ public int TotalPoolSize { get; }
+
+ /// <summary>
+ /// Accumulated number of unsuccessful reconnection attempts.
+ /// </summary>
+ public int FailedReconnections { get; }
+
+ internal SessionPoolDepletedException(
+ string depletionReason,
+ int availableClients,
+ int totalPoolSize,
+ int failedReconnections)
+ : this(depletionReason, availableClients, totalPoolSize,
failedReconnections, null)
+ {
+ }
+
+ internal SessionPoolDepletedException(
+ string depletionReason,
+ int availableClients,
+ int totalPoolSize,
+ int failedReconnections,
+ Exception causedBy)
+ : base($"SessionPool depletion detected: {depletionReason}",
causedBy)
+ {
+ DepletionReason = depletionReason;
+ AvailableClients = availableClients;
+ TotalPoolSize = totalPoolSize;
+ FailedReconnections = failedReconnections;
+ }
+ }
+}