This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-csharp.git
The following commit(s) were added to refs/heads/main by this push:
new 5211c9c Handle IOException and SocketException (#14)
5211c9c is described below
commit 5211c9c7f1c9f0d39452d0ac3e6f0b82331e6d9d
Author: Zhan Lu <[email protected]>
AuthorDate: Wed Aug 28 11:10:37 2024 +0800
Handle IOException and SocketException (#14)
---
src/Apache.IoTDB/SessionPool.cs | 90 +++++++++++++++++++++++++++++------------
1 file changed, 65 insertions(+), 25 deletions(-)
diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index 7b9a09b..5e5ddb5 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -122,13 +122,27 @@ namespace Apache.IoTDB
{
if (retryOnFailure)
{
- client = await Reconnect(client);
- try
+ try{
+ client = await Reconnect(client);
+ return await operation(client);
+ } catch (TException retryEx)
{
- var resp = await operation(client);
- return resp;
+ throw new TException(errMsg, retryEx);
}
- catch (TException retryEx)
+ }
+ else
+ {
+ throw new TException(errMsg, ex);
+ }
+ }
+ catch (Exception ex)
+ {
+ if (retryOnFailure)
+ {
+ try{
+ client = await Reconnect(client);
+ return await operation(client);
+ } catch (TException retryEx)
{
throw new TException(errMsg, retryEx);
}
@@ -179,7 +193,17 @@ namespace Apache.IoTDB
{
for (var index = 0; index < _poolSize; index++)
{
- _clients.Add(await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, cancellationToken));
+ try
+ {
+ _clients.Add(await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, cancellationToken));
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Currently connecting to
{0}:{1} failed", _host, _port);
+ }
+ }
}
}
else
@@ -217,7 +241,7 @@ namespace Apache.IoTDB
if (_clients.ClientQueue.Count != _poolSize)
{
- throw new TException(string.Format("Error occurs when opening
session pool. Client pool size is not equal to the expected size. Client pool
size: {0}, expected size: {1}", _clients.ClientQueue.Count, _poolSize), null);
+ throw new TException(string.Format("Error occurs when opening
session pool. Client pool size is not equal to the expected size. Client pool
size: {0}, expected size: {1}, Please check the server status",
_clients.ClientQueue.Count, _poolSize), null);
}
_isClose = false;
}
@@ -225,39 +249,55 @@ namespace Apache.IoTDB
public async Task<Client> Reconnect(Client originalClient = null,
CancellationToken cancellationToken = default)
{
- if (_nodeUrls.Count == 0)
- {
- await Open(_enableRpcCompression);
- return _clients.Take();
- }
-
- originalClient.Transport.Close();
+ originalClient?.Transport.Close();
- int startIndex = _endPoints.FindIndex(x => x.Ip ==
originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
- if (startIndex == -1)
- {
- throw new ArgumentException($"The original client is not in
the list of endpoints. Original client:
{originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
- }
-
- for (int attempt = 1; attempt <= RetryNum; attempt++)
+ if (_nodeUrls.Count == 0)
{
- for (int i = 0; i < _endPoints.Count; i++)
+ for (int attempt = 1; attempt <= RetryNum; attempt++)
{
- int j = (startIndex + i) % _endPoints.Count;
try
{
- var client = await CreateAndOpen(_endPoints[j].Ip,
_endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
+ var client = await CreateAndOpen(_host, _port,
_enableRpcCompression, _timeout, cancellationToken);
return client;
}
catch (Exception e)
{
if (_debugMode)
{
- _logger.LogWarning(e, "Attempt connecting to
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
+ _logger.LogWarning(e, "Attempt reconnecting to
{0}:{1} failed", _host, _port);
+ }
+ }
+ }
+ }
+ else
+ {
+ int startIndex = _endPoints.FindIndex(x => x.Ip ==
originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
+ if (startIndex == -1)
+ {
+ throw new ArgumentException($"The original client is not
in the list of endpoints. Original client:
{originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
+ }
+
+ for (int attempt = 1; attempt <= RetryNum; attempt++)
+ {
+ for (int i = 0; i < _endPoints.Count; i++)
+ {
+ int j = (startIndex + i) % _endPoints.Count;
+ try
+ {
+ var client = await CreateAndOpen(_endPoints[j].Ip,
_endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
+ return client;
+ }
+ catch (Exception e)
+ {
+ if (_debugMode)
+ {
+ _logger.LogWarning(e, "Attempt connecting to
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
+ }
}
}
}
}
+
throw new TException("Error occurs when reconnecting session pool.
Could not connect to any server", null);
}