This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-csharp.git
The following commit(s) were added to refs/heads/main by this push:
new 6f451bd Fix SessionPool client leak on reconnection and query
failures, and preserve server error messages (#44)
6f451bd is described below
commit 6f451bdff65a816f1ac59a266875b41ad2610af8
Author: CritasWang <[email protected]>
AuthorDate: Tue Feb 10 15:33:24 2026 +0800
Fix SessionPool client leak on reconnection and query failures, and
preserve server error messages (#44)
* Fix SessionPool client leak on reconnection and query failures, and
preserve server error messages
- Add SessionPoolDepletedException with diagnostic properties for pool
depletion scenarios
- Add ReconnectionFailedException for type-safe reconnection failure
detection
- Fix client leak when reconnection succeeds but retry operation fails
- Add PoolHealthMetrics for thread-safe health monitoring
- Add try-finally protection for Monitor locks in ConcurrentClientQueue
- Remove silent failure pattern - always log connection failures
- Add CurrentBatchRowCount() method with Obsolete attribute on RowCount()
- Improve database switch error handling with partial failure detection
* Update src/Apache.IoTDB/SessionPool.cs
Co-authored-by: Copilot <[email protected]>
* use try-finally for Return Client
* Feature/dotnet format ci (#46)
* switch to dotnet format
* format
---------
Co-authored-by: Copilot <[email protected]>
---
.editorconfig | 4 +
.github/workflows/pre-commit-format.yml | 30 +-
README.md | 18 +
README_ZH.md | 18 +
docs/SessionPool_Exception_Handling.md | 487 +++++++++++++++++++++++
src/Apache.IoTDB.Data/IoTDBDataReader.cs | 4 +-
src/Apache.IoTDB/ConcurrentClientQueue.cs | 68 ++--
src/Apache.IoTDB/DataStructure/SessionDataSet.cs | 13 +
src/Apache.IoTDB/PoolHealthMetrics.cs | 52 +++
src/Apache.IoTDB/ReconnectionFailedException.cs | 41 ++
src/Apache.IoTDB/Rpc/TSStatusCode.cs | 2 +-
src/Apache.IoTDB/SessionPool.cs | 114 ++++--
src/Apache.IoTDB/SessionPoolDepletedException.cs | 74 ++++
13 files changed, 825 insertions(+), 100 deletions(-)
diff --git a/.editorconfig b/.editorconfig
index 7f6de4d..3c8687f 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -37,6 +37,10 @@ indent_size = 2
[*{_AssemblyInfo.cs,.notsupported.cs,*/obj/*/External/**/*,*/obj/dotnet-new.IntegrationTests/*/TemplatePackagesPaths.cs}]
generated_code = true
+# Thrift generated code
+[src/Apache.IoTDB/Rpc/Generated/**]
+generated_code = true
+
# C# files
[*.cs]
# New line preferences
diff --git a/.github/workflows/pre-commit-format.yml
b/.github/workflows/pre-commit-format.yml
index 3b7589c..7ef53be 100644
--- a/.github/workflows/pre-commit-format.yml
+++ b/.github/workflows/pre-commit-format.yml
@@ -7,15 +7,13 @@ on:
- "**"
merge_group:
branches: [main]
- # schedule:
- # - cron: "0 0 * * *"
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
- formatting-checks:
+ dotnet-format:
runs-on: ubuntu-22.04
env:
DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1
@@ -29,27 +27,9 @@ jobs:
with:
dotnet-version: "9.0.x"
- - name: Setup Python environment (for pre-commit)
- uses: actions/setup-python@v5
- with:
- python-version: "3.10"
-
- - name: Clean dotnet temporary folder
- run: |
- sudo rm -rf /tmp/.dotnet
- mkdir -p ${{ runner.temp }}/dotnet-home
- mkdir -p ${{ runner.temp }}/xdg-runtime
-
- - name: Install pre-commit and dependencies
- run: |
- pip install pre-commit
- pre-commit install-hooks
+ - name: Restore dependencies
+ run: dotnet restore
- - name: Run pre-commit checks
- env:
- TMPDIR: ${{ runner.temp }}
- DOTNET_CLI_HOME: ${{ runner.temp }}/dotnet-home
- XDG_RUNTIME_DIR: ${{ runner.temp }}/xdg-runtime
- NUGET_PACKAGES: ${{ runner. temp }}/nuget-packages
+ - name: Check formatting
run: |
- pre-commit run --all-files
+ dotnet format --verify-no-changes --verbosity diagnostic
diff --git a/README.md b/README.md
index 7f46166..f395763 100644
--- a/README.md
+++ b/README.md
@@ -82,5 +82,23 @@ NLog >= 4.7.9
* dotnet CLI
* Thrift
+## Code Formatting
+
+This project uses `dotnet format` to enforce consistent code style based on
the [.editorconfig](.editorconfig) rules.
+
+### Check formatting locally
+
+```bash
+dotnet format --verify-no-changes
+```
+
+### Auto-fix formatting issues
+
+```bash
+dotnet format
+```
+
+The CI pipeline will automatically check code formatting on all pull requests.
Please ensure your code is properly formatted before submitting a PR.
+
## Publish your own client on nuget.org
You can find out how to publish from this [doc](./PUBLISH.md).
\ No newline at end of file
diff --git a/README_ZH.md b/README_ZH.md
index 440477c..df2f500 100644
--- a/README_ZH.md
+++ b/README_ZH.md
@@ -81,5 +81,23 @@ NLog >= 4.7.9
* dotnet CLI
* Thrift
+## 代码格式化
+
+本项目使用 `dotnet format` 基于 [.editorconfig](.editorconfig) 规则来强制执行一致的代码风格。
+
+### 本地检查格式
+
+```bash
+dotnet format --verify-no-changes
+```
+
+### 自动修复格式问题
+
+```bash
+dotnet format
+```
+
+CI 流水线会在所有 Pull Request 上自动检查代码格式。请确保在提交 PR 之前代码格式正确。
+
## 在 nuget.org 上发布你自己的客户端
你可以在这个[文档](./PUBLISH.md)中找到如何发布
\ No newline at end of file
diff --git a/docs/SessionPool_Exception_Handling.md
b/docs/SessionPool_Exception_Handling.md
new file mode 100644
index 0000000..75fd019
--- /dev/null
+++ b/docs/SessionPool_Exception_Handling.md
@@ -0,0 +1,487 @@
+# SessionPool Exception Handling and Health Monitoring
+
+## Overview
+
+The Apache IoTDB C# client library provides comprehensive exception handling
and health monitoring capabilities for SessionPool operations. This document
explains how to handle pool depletion scenarios, monitor pool health, and
implement recovery strategies.
+
+## SessionPoolDepletedException
+
+### Description
+
+`SessionPoolDepletedException` is a specialized exception thrown when the
SessionPool cannot provide a client connection. This indicates that:
+
+- All clients in the pool are currently in use, OR
+- Client connections have failed and reconnection attempts were unsuccessful,
OR
+- The pool wait timeout has been exceeded
+
+### Exception Properties
+
+The exception provides detailed diagnostic information through the following
properties:
+
+| Property | Type | Description |
+| --------------------- | ------ | -------------------------------------------------------------------------- |
+| `DepletionReason` | string | A human-readable description of why the pool was depleted |
+| `AvailableClients` | int | Number of idle clients in the pool at the time the exception was created |
+| `TotalPoolSize` | int | The configured size of the session pool |
+| `FailedReconnections` | int | Number of reconnections that failed after exhausting all retry attempts since the pool was last opened |
+
+### Example Usage
+
+```csharp
+using Apache.IoTDB;
+using System;
+
+try
+{
+ var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(4)
+ .Build();
+
+ await sessionPool.Open();
+
+ // Perform operations...
+ await sessionPool.InsertRecordAsync("root.sg.d1", record);
+}
+catch (SessionPoolDepletedException ex)
+{
+ Console.WriteLine($"Pool depleted: {ex.DepletionReason}");
+ Console.WriteLine($"Available clients: {ex.AvailableClients}/{ex.TotalPoolSize}");
+ Console.WriteLine($"Failed reconnections: {ex.FailedReconnections}");
+
+ // Implement recovery strategy (see below)
+}
+```
+
+## Pool Health Metrics
+
+### Monitoring Pool Status
+
+The `SessionPool` class exposes real-time health metrics that can be used for
monitoring and alerting:
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Build();
+
+await sessionPool.Open();
+
+// Check pool health
+Console.WriteLine($"Available Clients: {sessionPool.AvailableClients}");
+Console.WriteLine($"Total Pool Size: {sessionPool.TotalPoolSize}");
+Console.WriteLine($"Failed Reconnections: {sessionPool.FailedReconnections}");
+```
+
+### Health Metrics
+
+| Metric | Property | Description | Recommended Threshold |
+| -------------------- | --------------------- | ------------------------------------------------ | --------------------------- |
+| Available Clients | `AvailableClients` | Number of idle clients ready for use | Alert if < 25% of pool size |
+| Total Pool Size | `TotalPoolSize` | Configured maximum pool size | N/A (constant) |
+| Failed Reconnections | `FailedReconnections` | Cumulative number of reconnections that failed after exhausting all retry attempts (reset when the pool is opened) | Alert if > 0 and increasing |
+
+## Failure Scenarios and Recovery Strategies
+
+### Scenario 1: Pool Exhaustion (High Load)
+
+**Symptoms:**
+
+- `SessionPoolDepletedException` whose reason starts with "Connection pool is empty and wait time out" (followed by the wait timeout in seconds)
+- `AvailableClients` = 0
+- `FailedReconnections` = 0 or low
+
+**Root Cause:** Application workload exceeds pool capacity
+
+**Recovery Strategies:**
+
+1. **Increase Pool Size:**
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(16) // Increased from 8
+ .Build();
+```
+
+2. **Implement Connection Retry with Backoff:**
+
+```csharp
+int maxRetries = 3;
+int retryDelayMs = 1000;
+
+for (int i = 0; i < maxRetries; i++)
+{
+ try
+ {
+ await sessionPool.InsertRecordAsync(deviceId, record);
+ break; // Success
+ }
+ // On the last attempt the filter is false, so the exception propagates to the caller.
+ catch (SessionPoolDepletedException) when (i < maxRetries - 1)
+ {
+ await Task.Delay(retryDelayMs * (i + 1)); // Linear backoff: 1s, 2s, ...
+ }
+}
+```
+
+3. **Optimize Operation Duration:**
+ - Reduce the time each client is held
+ - Batch multiple operations together
+ - Use async operations efficiently
+
+### Scenario 2: Network Connectivity Issues
+
+**Symptoms:**
+
+- `SessionPoolDepletedException` with reason "Reconnection failed"
+- `AvailableClients` decreases over time
+- `FailedReconnections` > 0 and increasing
+
+**Root Cause:** IoTDB server unreachable or network issues
+
+**Recovery Strategies:**
+
+1. **Reinitialize SessionPool:**
+
+```csharp
+catch (SessionPoolDepletedException ex) when (ex.FailedReconnections > 5)
+{
+ Console.WriteLine($"Critical: {ex.FailedReconnections} failed reconnections");
+
+ // Close existing pool
+ await sessionPool.Close();
+
+ // Wait for network recovery
+ await Task.Delay(5000);
+
+ // Create new pool
+ sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Build();
+
+ await sessionPool.Open();
+}
+```
+
+2. **Implement Circuit Breaker Pattern:**
+
+```csharp
+// Simplified example: the fields below are not synchronized, so guard them
+// (e.g. with a lock) if one breaker instance is shared across threads.
+public class SessionPoolCircuitBreaker
+{
+ private readonly SessionPool _pool;
+ private int _failureCount = 0;
+ private const int FailureThreshold = 5;
+ private bool _circuitOpen = false;
+ private DateTime _lastFailureTime;
+
+ public SessionPoolCircuitBreaker(SessionPool pool)
+ {
+ _pool = pool;
+ }
+
+ public async Task<T> ExecuteAsync<T>(Func<SessionPool, Task<T>> operation)
+ {
+ // While open, reject calls for one minute after the last failure;
+ // afterwards a trial call is let through (half-open).
+ if (_circuitOpen && DateTime.UtcNow - _lastFailureTime < TimeSpan.FromMinutes(1))
+ {
+ throw new InvalidOperationException("Circuit breaker is open");
+ }
+
+ try
+ {
+ var result = await operation(_pool);
+ _failureCount = 0; // Reset on success
+ _circuitOpen = false;
+ return result;
+ }
+ catch (SessionPoolDepletedException)
+ {
+ _failureCount++;
+ _lastFailureTime = DateTime.UtcNow;
+
+ if (_failureCount >= FailureThreshold)
+ {
+ _circuitOpen = true;
+ Console.WriteLine("Circuit breaker opened - too many failures");
+ }
+ throw;
+ }
+ }
+}
+```
+
+### Scenario 3: Server Overload
+
+**Symptoms:**
+
+- Intermittent `SessionPoolDepletedException`
+- Both connection timeouts and reconnection failures
+
+**Root Cause:** IoTDB server is overloaded
+
+**Recovery Strategies:**
+
+1. **Implement Rate Limiting:**
+
+```csharp
+using System.Threading;
+
+private readonly SemaphoreSlim _rateLimiter = new SemaphoreSlim(10, 10); // Max 10 concurrent operations
+
+public async Task RateLimitedInsert(string deviceId, RowRecord record)
+{
+ await _rateLimiter.WaitAsync();
+ try
+ {
+ await sessionPool.InsertRecordAsync(deviceId, record);
+ }
+ finally
+ {
+ _rateLimiter.Release();
+ }
+}
+```
+
+2. **Add Timeout Configuration:**
+
+```csharp
+var sessionPool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .Timeout(120) // Increased timeout for slow server
+ .Build();
+```
+
+## Monitoring and Alerting Recommendations
+
+### Health Check Implementation
+
+```csharp
+public class SessionPoolHealthCheck
+{
+ private readonly SessionPool _pool;
+
+ public SessionPoolHealthCheck(SessionPool pool)
+ {
+ _pool = pool;
+ }
+
+ public HealthStatus CheckHealth()
+ {
+ var availableRatio = (double)_pool.AvailableClients / _pool.TotalPoolSize;
+
+ // FailedReconnections is cumulative since the pool was opened, so once this
+ // threshold is crossed the check stays Critical until the pool is reopened.
+ if (_pool.FailedReconnections > 10)
+ {
+ return new HealthStatus
+ {
+ Status = "Critical",
+ Message = $"High reconnection failures: {_pool.FailedReconnections}",
+ Recommendation = "Check IoTDB server availability"
+ };
+ }
+
+ if (availableRatio < 0.25)
+ {
+ return new HealthStatus
+ {
+ Status = "Warning",
+ Message = $"Low available clients: {_pool.AvailableClients}/{_pool.TotalPoolSize}",
+ Recommendation = "Consider increasing pool size"
+ };
+ }
+
+ return new HealthStatus
+ {
+ Status = "Healthy",
+ Message = $"Pool healthy: {_pool.AvailableClients}/{_pool.TotalPoolSize} available"
+ };
+ }
+}
+
+public class HealthStatus
+{
+ public string Status { get; set; }
+ public string Message { get; set; }
+ public string Recommendation { get; set; }
+}
+```
+
+### Metrics Collection for Monitoring Systems
+
+```csharp
+// Example: Export metrics to Prometheus, StatsD, or similar.
+// `MetricsCollector` stands in for your metrics library's gauge API.
+public class SessionPoolMetricsCollector
+{
+ private readonly SessionPool _pool;
+
+ public SessionPoolMetricsCollector(SessionPool pool)
+ {
+ _pool = pool;
+ }
+
+ public void CollectMetrics()
+ {
+ // Gauge: Current available clients
+ MetricsCollector.Set("iotdb_pool_available_clients", _pool.AvailableClients);
+
+ // Gauge: Total pool size
+ MetricsCollector.Set("iotdb_pool_total_size", _pool.TotalPoolSize);
+
+ // Counter: Failed reconnections (only grows until the pool is reopened)
+ MetricsCollector.Set("iotdb_pool_failed_reconnections", _pool.FailedReconnections);
+
+ // Calculated: Pool utilization percentage
+ var utilization = (1.0 - (double)_pool.AvailableClients / _pool.TotalPoolSize) * 100;
+ MetricsCollector.Set("iotdb_pool_utilization_percent", utilization);
+ }
+}
+```
+
+### Recommended Alert Rules
+
+1. **Critical Alerts:**
+ - `FailedReconnections > 10`: Server connectivity issues
+ - `AvailableClients == 0` for > 30 seconds: Complete pool exhaustion
+
+2. **Warning Alerts:**
+ - `AvailableClients < TotalPoolSize * 0.25`: Pool under pressure
+ - `FailedReconnections > 0` and increasing: Network instability
+
+3. **Info Alerts:**
+ - Pool utilization > 75% for extended periods: Consider scaling
+
+## Best Practices
+
+1. **Pool Sizing:**
+ - Start with poolSize = 2 × expected concurrent operations
+ - Monitor and adjust based on actual usage patterns
+ - Larger pools use more server resources but provide better throughput
+
+2. **Error Handling:**
+ - Always catch `SessionPoolDepletedException` specifically
+ - Log exception properties for debugging
+ - Implement appropriate retry logic based on depletion reason
+
+3. **Monitoring:**
+ - Continuously monitor `AvailableClients` metric
+ - Track `FailedReconnections` as a leading indicator of problems
+ - Set up alerts before pool is completely depleted
+
+4. **Resource Management:**
+ - Always call `sessionPool.Close()` when done
+ - Use `using` statements or try-finally blocks for proper cleanup
+ - Don't create multiple SessionPool instances unnecessarily
+
+## Example: Complete Production-Ready Implementation
+
+```csharp
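+using Apache.IoTDB;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+public class ProductionSessionPoolManager
+{
+ private SessionPool _pool;
+ private readonly object _lock = new object();
+ private int _monitorStarted;
+
+ public async Task Initialize()
+ {
+ _pool = new SessionPool.Builder()
+ .Host("127.0.0.1")
+ .Port(6667)
+ .PoolSize(8)
+ .Timeout(60)
+ .Build();
+
+ await _pool.Open();
+
+ // Start health monitoring only once: ReinitializePool() calls Initialize()
+ // again, and each call would otherwise spawn another endless monitor loop.
+ if (Interlocked.Exchange(ref _monitorStarted, 1) == 0)
+ {
+ _ = Task.Run(MonitorHealth);
+ }
+ }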
+
+ public async Task<T> ExecuteWithRetry<T>(Func<SessionPool, Task<T>> operation)
+ {
+ const int maxRetries = 3;
+ const int baseDelayMs = 1000;
+
+ for (int attempt = 0; attempt < maxRetries; attempt++)
+ {
+ try
+ {
+ return await operation(_pool);
+ }
+ catch (SessionPoolDepletedException ex)
+ {
+ Console.WriteLine($"Attempt {attempt + 1} failed: {ex.Message}");
+ Console.WriteLine($"Pool state - Available: {ex.AvailableClients}/{ex.TotalPoolSize}, Failed reconnections: {ex.FailedReconnections}");
+
+ if (attempt == maxRetries - 1)
+ {
+ // Last attempt failed
+ if (ex.FailedReconnections > 5)
+ {
+ // Reinitialize pool
+ await ReinitializePool();
+ }
+ throw;
+ }
+
+ // Exponential backoff
+ await Task.Delay(baseDelayMs * (int)Math.Pow(2, attempt));
+ }
+ }
+
+ throw new InvalidOperationException("Should not reach here");
+ }
+
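+ private async Task ReinitializePool()
+ {
+ try
+ {
+ if (_pool != null)
+ {
+ // Await Close() instead of blocking on it with .Wait() inside a lock,
+ // which risks deadlocks and thread-pool starvation.
+ await _pool.Close();
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Failed to close existing pool: {ex.Message}");
+ }
+
+ await Task.Delay(5000); // Wait for server recovery
+ await Initialize();
+ }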
+
+ private async Task MonitorHealth()
+ {
+ while (true)
+ {
+ await Task.Delay(10000); // Check every 10 seconds
+
+ try
+ {
+ var availableRatio = (double)_pool.AvailableClients / _pool.TotalPoolSize;
+
+ if (_pool.FailedReconnections > 10)
+ {
+ Console.WriteLine($"CRITICAL: {_pool.FailedReconnections} failed reconnections");
+ }
+ else if (availableRatio < 0.25)
+ {
+ Console.WriteLine($"WARNING: Low available clients - {_pool.AvailableClients}/{_pool.TotalPoolSize}");
+ }
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Health check failed: {ex.Message}");
+ }
+ }
+ }
+
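+ public async Task Cleanup()
+ {
+ // `await _pool?.Close()` would await a null Task (NullReferenceException) when _pool is null.
+ if (_pool != null)
+ {
+ await _pool.Close();
+ }
+ }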
+}
+```
+
+## Summary
+
+The SessionPool exception handling and health monitoring features provide
comprehensive tools for building robust IoTDB applications:
+
+- Use `SessionPoolDepletedException` to understand and react to pool issues
+- Monitor `AvailableClients`, `TotalPoolSize`, and `FailedReconnections`
metrics
+- Implement appropriate recovery strategies based on failure scenarios
+- Set up proactive monitoring and alerting to prevent issues
+- Follow best practices for pool sizing and resource management
diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs
b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
index 8f96841..fe946b9 100644
--- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs
+++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs
@@ -56,8 +56,8 @@ namespace Apache.IoTDB.Data
_command = IoTDBCommand;
_closeConnection = closeConnection;
_fieldCount = dataSet.GetColumnNames().Count;
- _hasRows = dataSet.RowCount() > 0;
- _recordsAffected = dataSet.RowCount();
+ _hasRows = dataSet.CurrentBatchRowCount() > 0;
+ _recordsAffected = -1; // Total row count is unknown; use -1 per
ADO.NET convention
_closed = _closeConnection;
_metas = dataSet.GetColumnNames();
diff --git a/src/Apache.IoTDB/ConcurrentClientQueue.cs
b/src/Apache.IoTDB/ConcurrentClientQueue.cs
index e53d3f3..6090c19 100644
--- a/src/Apache.IoTDB/ConcurrentClientQueue.cs
+++ b/src/Apache.IoTDB/ConcurrentClientQueue.cs
@@ -28,6 +28,7 @@ namespace Apache.IoTDB
public class ConcurrentClientQueue
{
public ConcurrentQueue<Client> ClientQueue { get; }
+ internal IPoolDiagnosticReporter DiagnosticReporter { get; set; }
public ConcurrentClientQueue(List<Client> clients)
{
@@ -42,55 +43,62 @@ namespace Apache.IoTDB
public void Return(Client client)
{
Monitor.Enter(ClientQueue);
- ClientQueue.Enqueue(client);
- Monitor.PulseAll(ClientQueue); // wake up all threads waiting on
the queue, refresh the waiting time
- Monitor.Exit(ClientQueue);
- Thread.Sleep(0);
- }
- int _ref = 0;
- public void AddRef()
- {
- lock (this)
+ try
{
- _ref++;
+ ClientQueue.Enqueue(client);
+ Monitor.PulseAll(ClientQueue); // wake up all threads waiting
on the queue, refresh the waiting time
}
- }
- public int GetRef()
- {
- return _ref;
- }
- public void RemoveRef()
- {
- lock (this)
+ finally
{
- _ref--;
+ Monitor.Exit(ClientQueue);
}
+ Thread.Sleep(0);
}
+ private int _ref = 0;
+ public void AddRef() => Interlocked.Increment(ref _ref);
+ public int GetRef() => Volatile.Read(ref _ref);
+ public void RemoveRef() => Interlocked.Decrement(ref _ref);
public int Timeout { get; set; } = 10;
public Client Take()
{
Client client = null;
Monitor.Enter(ClientQueue);
- while (true)
+ try
{
- bool timeout = false;
- if (ClientQueue.IsEmpty)
+ while (true)
{
- timeout = !Monitor.Wait(ClientQueue,
TimeSpan.FromSeconds(Timeout));
- }
- ClientQueue.TryDequeue(out client);
+ bool timeout = false;
+ if (ClientQueue.IsEmpty)
+ {
+ timeout = !Monitor.Wait(ClientQueue,
TimeSpan.FromSeconds(Timeout));
+ }
+ ClientQueue.TryDequeue(out client);
- if (client != null || timeout)
- {
- break;
+ if (client != null || timeout)
+ {
+ break;
+ }
}
}
- Monitor.Exit(ClientQueue);
+ finally
+ {
+ Monitor.Exit(ClientQueue);
+ }
if (client == null)
{
- throw new TimeoutException($"Connection pool is empty and wait
time out({Timeout}s)!");
+ var reasonPhrase = $"Connection pool is empty and wait time
out({Timeout}s)";
+ if (DiagnosticReporter != null)
+ {
+ throw
DiagnosticReporter.BuildDepletionException(reasonPhrase);
+ }
+ throw new TimeoutException(reasonPhrase);
}
return client;
}
}
+
+ internal interface IPoolDiagnosticReporter
+ {
+ SessionPoolDepletedException BuildDepletionException(string
reasonPhrase);
+ }
}
diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
index 3d08843..bc7cde4 100644
--- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
+++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs
@@ -108,7 +108,20 @@ namespace Apache.IoTDB.DataStructure
public IReadOnlyList<string> GetColumnNames() =>
_rpcDataSet._columnNameList;
public IReadOnlyList<string> GetColumnTypes() =>
_rpcDataSet._columnTypeList;
+ /// <summary>
+ /// Gets the number of rows in the current fetched batch (tsBlock).
+ /// Note: This is NOT the total row count of the query result. Use
HasNext() to check for more data.
+ /// </summary>
+ /// <returns>The number of rows in the current batch.</returns>
+ public int CurrentBatchRowCount() => _rpcDataSet._tsBlockSize;
+
+ /// <summary>
+ /// Gets the number of rows in the current fetched batch.
+ /// </summary>
+ /// <returns>The number of rows in the current batch.</returns>
+ [Obsolete("Use CurrentBatchRowCount() instead. This method returns
batch size, not total row count.")]
public int RowCount() => _rpcDataSet._tsBlockSize;
+
public void ShowTableNames()
{
IReadOnlyList<string> columns = GetColumnNames();
diff --git a/src/Apache.IoTDB/PoolHealthMetrics.cs
b/src/Apache.IoTDB/PoolHealthMetrics.cs
new file mode 100644
index 0000000..52bcc5f
--- /dev/null
+++ b/src/Apache.IoTDB/PoolHealthMetrics.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Threading;
+
+namespace Apache.IoTDB
+{
+ /// <summary>
+ /// Encapsulates real-time health statistics for connection pool
monitoring.
+ /// Thread-safe implementation for concurrent access patterns.
+ /// </summary>
+ internal class PoolHealthMetrics
+ {
+ private int _reconnectionFailureTally;
+ private readonly int _configuredMaxSize;
+
+ public PoolHealthMetrics(int configuredMaxSize)
+ {
+ _configuredMaxSize = configuredMaxSize;
+ }
+
+ public void IncrementReconnectionFailures()
+ {
+ Interlocked.Increment(ref _reconnectionFailureTally);
+ }
+
+ public void ResetAllCounters()
+ {
+ Interlocked.Exchange(ref _reconnectionFailureTally, 0);
+ }
+
+ public int GetReconnectionFailureTally() => Volatile.Read(ref
_reconnectionFailureTally);
+
+ public int GetConfiguredMaxSize() => _configuredMaxSize;
+ }
+}
diff --git a/src/Apache.IoTDB/ReconnectionFailedException.cs
b/src/Apache.IoTDB/ReconnectionFailedException.cs
new file mode 100644
index 0000000..db80d6b
--- /dev/null
+++ b/src/Apache.IoTDB/ReconnectionFailedException.cs
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift;
+
+namespace Apache.IoTDB
+{
+ /// <summary>
+ /// Exception thrown when all reconnection attempts to the server have
failed.
+ /// This exception is used internally to distinguish reconnection failures
from other errors.
+ /// </summary>
+ internal class ReconnectionFailedException : TException
+ {
+ internal ReconnectionFailedException(string message)
+ : base(message, null)
+ {
+ }
+
+ internal ReconnectionFailedException(string message, Exception
innerException)
+ : base(message, innerException)
+ {
+ }
+ }
+}
diff --git a/src/Apache.IoTDB/Rpc/TSStatusCode.cs
b/src/Apache.IoTDB/Rpc/TSStatusCode.cs
index e2eaca8..3f6cee9 100644
--- a/src/Apache.IoTDB/Rpc/TSStatusCode.cs
+++ b/src/Apache.IoTDB/Rpc/TSStatusCode.cs
@@ -285,4 +285,4 @@ namespace Apache.IoTDB
}
}
-}
\ No newline at end of file
+}
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index fc2eca0..1d2aecf 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -21,9 +21,9 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
+using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
-using System.Security.Cryptography.X509Certificates;
using Apache.IoTDB.DataStructure;
using Microsoft.Extensions.Logging;
using Thrift;
@@ -34,9 +34,10 @@ using Thrift.Transport.Client;
namespace Apache.IoTDB
{
- public partial class SessionPool : IDisposable
+ public partial class SessionPool : IDisposable, IPoolDiagnosticReporter
{
private static readonly TSProtocolVersion ProtocolVersion =
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+ private const string DepletionReasonReconnectFailed = "Reconnection
failed";
private readonly string _username;
private readonly string _password;
@@ -62,8 +63,25 @@ namespace Apache.IoTDB
private bool _isClose = true;
private ConcurrentClientQueue _clients;
private ILogger _logger;
+ private PoolHealthMetrics _healthMetrics;
+
public delegate Task<TResult> AsyncOperation<TResult>(Client client);
+ /// <summary>
+ /// Retrieves current count of idle clients ready for operations.
+ /// </summary>
+ public int AvailableClients => _clients?.ClientQueue.Count ?? 0;
+
+ /// <summary>
+ /// Retrieves the configured maximum capacity of the session pool.
+ /// </summary>
+ public int TotalPoolSize => _healthMetrics?.GetConfiguredMaxSize() ??
_poolSize;
+
+ /// <summary>
+ /// Retrieves cumulative tally of reconnection failures since pool was
opened.
+ /// </summary>
+ public int FailedReconnections =>
_healthMetrics?.GetReconnectionFailureTally() ?? 0;
+
[Obsolete("This method is deprecated, please use new
SessionPool.Builder().")]
public SessionPool(string host, int port, int poolSize)
@@ -157,52 +175,62 @@ namespace Apache.IoTDB
public async Task<TResult>
ExecuteClientOperationAsync<TResult>(AsyncOperation<TResult> operation, string
errMsg, bool retryOnFailure = true, bool putClientBack = true)
{
Client client = _clients.Take();
+ bool shouldReturnClient = true;
+ bool operationSucceeded = false;
try
{
var resp = await operation(client);
+ operationSucceeded = true;
return resp;
}
- catch (TException ex)
+ catch (Exception ex)
{
if (retryOnFailure)
{
+ // Try to reconnect
try
{
client = await Reconnect(client);
- return await operation(client);
+ // Reconnect succeeded, client is now a new healthy
connection
}
- catch (TException retryEx)
+ catch (ReconnectionFailedException reconnectEx)
{
- throw new TException(errMsg, retryEx);
+ // Reconnection failed - original client was closed by
Reconnect
+ shouldReturnClient = false;
+ throw new
SessionPoolDepletedException(DepletionReasonReconnectFailed, AvailableClients,
TotalPoolSize, FailedReconnections, reconnectEx);
}
- }
- else
- {
- throw new TException(errMsg, ex);
- }
- }
- catch (Exception ex)
- {
- if (retryOnFailure)
- {
+
+ // Reconnect succeeded, try the operation again
try
{
- client = await Reconnect(client);
- return await operation(client);
+ var resp = await operation(client);
+ operationSucceeded = true;
+ return resp;
}
- catch (TException retryEx)
+ catch (Exception retryEx)
{
- throw new TException(errMsg, retryEx);
+ // Retry operation failed, but client is healthy and
should be returned to pool
+ // shouldReturnClient remains true
+ string detailedMsg = $"{errMsg}. {retryEx.Message}";
+ throw new TException(detailedMsg, retryEx);
}
}
else
{
- throw new TException(errMsg, ex);
+ // Preserve original error message from server
+ string detailedMsg = $"{errMsg}. {ex.Message}";
+ throw new TException(detailedMsg, ex);
}
}
finally
{
- if (putClientBack)
+ // Return client to pool if:
+ // 1. putClientBack is true (normal operations - client should
always be returned), OR
+ // 2. putClientBack is false (query operations) BUT operation
failed, meaning SessionDataSet
+ // wasn't created and won't manage the client
+ // Do NOT return if reconnection failed (shouldReturnClient is
false) because client was closed by Reconnect
+ bool shouldReturnForQueryFailure = !putClientBack &&
!operationSucceeded;
+ if (shouldReturnClient && (putClientBack ||
shouldReturnForQueryFailure))
{
_clients.Add(client);
}
@@ -237,8 +265,10 @@ namespace Apache.IoTDB
public async Task Open(CancellationToken cancellationToken = default)
{
+ _healthMetrics = new PoolHealthMetrics(_poolSize);
_clients = new ConcurrentClientQueue();
_clients.Timeout = _timeout * 5;
+ _clients.DiagnosticReporter = this;
if (_nodeUrls.Count == 0)
{
@@ -250,10 +280,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Currently connecting to
{0}:{1} failed", _host, _port);
- }
+ _logger?.LogWarning(e, "Failed to create connection
{0}/{1} to {2}:{3}", index + 1, _poolSize, _host, _port);
}
}
}
@@ -277,10 +304,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Currently connecting to
{0}:{1} failed", endPoint.Ip, endPoint.Port);
- }
+ _logger?.LogWarning(e, "Failed to create
connection to {0}:{1}", endPoint.Ip, endPoint.Port);
}
}
if (!isConnected) // current client could not connect to
any endpoint
@@ -313,10 +337,7 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt reconnecting to
{0}:{1} failed", _host, _port);
- }
+ _logger?.LogWarning(e, "Reconnection attempt {0}/{1}
to {2}:{3} failed", attempt, RetryNum, _host, _port);
}
}
}
@@ -340,16 +361,14 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- if (_debugMode)
- {
- _logger.LogWarning(e, "Attempt connecting to
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
- }
+ _logger?.LogWarning(e, "Reconnection attempt
{0}/{1} to {2}:{3} failed", attempt, RetryNum, _endPoints[j].Ip,
_endPoints[j].Port);
}
}
}
}
- throw new TException("Error occurs when reconnecting session pool.
Could not connect to any server", null);
+ _healthMetrics?.IncrementReconnectionFailures();
+ throw new ReconnectionFailedException("Error occurs when
reconnecting session pool. Could not connect to any server");
}
public bool IsOpen() => !_isClose;
@@ -1408,7 +1427,8 @@ namespace Apache.IoTDB
if (_database != previousDB)
{
// all client should switch to the same database
- foreach (var c in _clients.ClientQueue)
+ var failedClients = new List<(long SessionId,
Exception Error)>();
+ foreach (var c in _clients.ClientQueue.AsEnumerable())
{
try
{
@@ -1420,10 +1440,17 @@ namespace Apache.IoTDB
}
catch (Exception e)
{
- _logger.LogError("switch database from {0} to
{1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message);
+ failedClients.Add((c.SessionId, e));
+ _logger?.LogError("switch database from {0} to
{1} failed for {2}, error: {3}", previousDB, _database, c.SessionId, e.Message);
}
}
- _logger.LogInformation("switch database from {0} to
{1}", previousDB, _database);
+
+ if (failedClients.Count > 0)
+ {
+ throw new TException($"Database switch partially
failed: {failedClients.Count} client(s) could not switch from {previousDB} to
{_database}", failedClients[0].Error);
+ }
+
+ _logger?.LogInformation("switch database from {0} to
{1}", previousDB, _database);
}
if (_debugMode)
@@ -1802,5 +1829,8 @@ namespace Apache.IoTDB
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
+
+ SessionPoolDepletedException
IPoolDiagnosticReporter.BuildDepletionException(string reasonPhrase)
+ => new SessionPoolDepletedException(reasonPhrase,
AvailableClients, TotalPoolSize, FailedReconnections);
}
}
diff --git a/src/Apache.IoTDB/SessionPoolDepletedException.cs b/src/Apache.IoTDB/SessionPoolDepletedException.cs
new file mode 100644
index 0000000..e489ba0
--- /dev/null
+++ b/src/Apache.IoTDB/SessionPoolDepletedException.cs
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift;
+
+namespace Apache.IoTDB
+{
+    /// <summary>
+    /// Specialized exception raised when SessionPool cannot allocate a client connection.
+    /// Includes diagnostic data for troubleshooting capacity and connectivity problems.
+    /// </summary>
+    public class SessionPoolDepletedException : TException
+    {
+        /// <summary>
+        /// Descriptive explanation of what caused the pool depletion event.
+        /// </summary>
+        public string DepletionReason { get; }
+
+        /// <summary>
+        /// Number of clients available for use at the moment of exception.
+        /// </summary>
+        public int AvailableClients { get; }
+
+        /// <summary>
+        /// Maximum configured pool size limit.
+        /// </summary>
+        public int TotalPoolSize { get; }
+
+        /// <summary>
+        /// Accumulated number of unsuccessful reconnection attempts.
+        /// </summary>
+        public int FailedReconnections { get; }
+
+        /// <summary>
+        /// Creates a depletion exception that carries diagnostic counters but no inner exception.
+        /// </summary>
+        /// <param name="depletionReason">Explanation of the failure; also used to build the exception message.</param>
+        /// <param name="availableClients">Clients available for use when the failure was detected.</param>
+        /// <param name="totalPoolSize">Maximum configured pool size limit.</param>
+        /// <param name="failedReconnections">Accumulated number of unsuccessful reconnection attempts.</param>
+        internal SessionPoolDepletedException(
+            string depletionReason,
+            int availableClients,
+            int totalPoolSize,
+            int failedReconnections)
+            : this(depletionReason, availableClients, totalPoolSize, failedReconnections, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a depletion exception that carries diagnostic counters and an optional underlying cause.
+        /// </summary>
+        /// <param name="depletionReason">Explanation of the failure; also used to build the exception message.</param>
+        /// <param name="availableClients">Clients available for use when the failure was detected.</param>
+        /// <param name="totalPoolSize">Maximum configured pool size limit.</param>
+        /// <param name="failedReconnections">Accumulated number of unsuccessful reconnection attempts.</param>
+        /// <param name="causedBy">Underlying exception that led to the depletion, or <c>null</c> when there is none.</param>
+        internal SessionPoolDepletedException(
+            string depletionReason,
+            int availableClients,
+            int totalPoolSize,
+            int failedReconnections,
+            Exception causedBy)
+            : base($"SessionPool depletion detected: {depletionReason}", causedBy)
+        {
+            DepletionReason = depletionReason;
+            AvailableClients = availableClients;
+            TotalPoolSize = totalPoolSize;
+            FailedReconnections = failedReconnections;
+        }
+    }
+}