This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-csharp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5211c9c  Handle IOException and SocketException (#14)
5211c9c is described below

commit 5211c9c7f1c9f0d39452d0ac3e6f0b82331e6d9d
Author: Zhan Lu <[email protected]>
AuthorDate: Wed Aug 28 11:10:37 2024 +0800

    Handle IOException and SocketException (#14)
---
 src/Apache.IoTDB/SessionPool.cs | 90 +++++++++++++++++++++++++++++------------
 1 file changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs
index 7b9a09b..5e5ddb5 100644
--- a/src/Apache.IoTDB/SessionPool.cs
+++ b/src/Apache.IoTDB/SessionPool.cs
@@ -122,13 +122,27 @@ namespace Apache.IoTDB
             {
                 if (retryOnFailure)
                 {
-                    client = await Reconnect(client);
-                    try
+                    try{
+                        client = await Reconnect(client);
+                        return await operation(client);
+                    } catch (TException retryEx)
                     {
-                        var resp = await operation(client);
-                        return resp;
+                        throw new TException(errMsg, retryEx);
                     }
-                    catch (TException retryEx)
+                }
+                else
+                {
+                    throw new TException(errMsg, ex);
+                }
+            }
+            catch (Exception ex)
+            {
+                if (retryOnFailure)
+                {
+                    try{
+                        client = await Reconnect(client);
+                        return await operation(client);
+                    } catch (TException retryEx)
                     {
                         throw new TException(errMsg, retryEx);
                     }
@@ -179,7 +193,17 @@ namespace Apache.IoTDB
             {
                 for (var index = 0; index < _poolSize; index++)
                 {
-                    _clients.Add(await CreateAndOpen(_host, _port, 
_enableRpcCompression, _timeout, cancellationToken));
+                    try
+                    {
+                        _clients.Add(await CreateAndOpen(_host, _port, 
_enableRpcCompression, _timeout, cancellationToken));
+                    }
+                    catch (Exception e)
+                    {
+                        if (_debugMode)
+                        {
+                            _logger.LogWarning(e, "Currently connecting to 
{0}:{1} failed", _host, _port);
+                        }
+                    }
                 }
             }
             else
@@ -217,7 +241,7 @@ namespace Apache.IoTDB
 
             if (_clients.ClientQueue.Count != _poolSize)
             {
-                throw new TException(string.Format("Error occurs when opening 
session pool. Client pool size is not equal to the expected size. Client pool 
size: {0}, expected size: {1}", _clients.ClientQueue.Count, _poolSize), null);
+                throw new TException(string.Format("Error occurs when opening 
session pool. Client pool size is not equal to the expected size. Client pool 
size: {0}, expected size: {1}, Please check the server status", 
_clients.ClientQueue.Count, _poolSize), null);
             }
             _isClose = false;
         }
@@ -225,39 +249,55 @@ namespace Apache.IoTDB
 
         public async Task<Client> Reconnect(Client originalClient = null, 
CancellationToken cancellationToken = default)
         {
-            if (_nodeUrls.Count == 0)
-            {
-                await Open(_enableRpcCompression);
-                return _clients.Take();
-            }
-
-            originalClient.Transport.Close();
+            originalClient?.Transport.Close();
 
-            int startIndex = _endPoints.FindIndex(x => x.Ip == 
originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
-            if (startIndex == -1)
-            {
-                throw new ArgumentException($"The original client is not in 
the list of endpoints. Original client: 
{originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
-            }
-
-            for (int attempt = 1; attempt <= RetryNum; attempt++)
+            if (_nodeUrls.Count == 0)
             {
-                for (int i = 0; i < _endPoints.Count; i++)
+                for (int attempt = 1; attempt <= RetryNum; attempt++)
                 {
-                    int j = (startIndex + i) % _endPoints.Count;
                     try
                     {
-                        var client = await CreateAndOpen(_endPoints[j].Ip, 
_endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
+                        var client = await CreateAndOpen(_host, _port, 
_enableRpcCompression, _timeout, cancellationToken);
                         return client;
                     }
                     catch (Exception e)
                     {
                         if (_debugMode)
                         {
-                            _logger.LogWarning(e, "Attempt connecting to 
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
+                            _logger.LogWarning(e, "Attempt reconnecting to 
{0}:{1} failed", _host, _port);
+                        }
+                    }
+                }
+            }
+            else
+            {
+                int startIndex = _endPoints.FindIndex(x => x.Ip == 
originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
+                if (startIndex == -1)
+                {
+                    throw new ArgumentException($"The original client is not 
in the list of endpoints. Original client: 
{originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
+                }
+
+                for (int attempt = 1; attempt <= RetryNum; attempt++)
+                {
+                    for (int i = 0; i < _endPoints.Count; i++)
+                    {
+                        int j = (startIndex + i) % _endPoints.Count;
+                        try
+                        {
+                            var client = await CreateAndOpen(_endPoints[j].Ip, 
_endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
+                            return client;
+                        }
+                        catch (Exception e)
+                        {
+                            if (_debugMode)
+                            {
+                                _logger.LogWarning(e, "Attempt connecting to 
{0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
+                            }
                         }
                     }
                 }
             }
+
             throw new TException("Error occurs when reconnecting session pool. 
Could not connect to any server", null);
         }
 

Reply via email to