This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch 2.0.x
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/2.0.x by this push:
     new 34d9df0  AMQNET-722 Connection timing new options
     new 02b9171  Merge pull request #69 from lukeabsent/AMQNET-722
34d9df0 is described below

commit 34d9df006373262e77f6648051a54b92b47969d7
Author: lukeabsent <[email protected]>
AuthorDate: Fri Jul 16 12:39:43 2021 +0200

    AMQNET-722 Connection timing new options
---
 docs/configuration.md                              |  1 +
 src/NMS.AMQP/Meta/NmsConnectionInfo.cs             |  1 +
 src/NMS.AMQP/NmsConnectionFactory.cs               | 61 +++++++++++++--
 src/NMS.AMQP/Provider/Failover/FailoverProvider.cs |  8 +-
 test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs | 66 +++++++++++++++-
 .../Provider/FailoverProviderFactoryTest.cs        |  5 +-
 .../Provider/FailoverProviderTest.cs               | 87 +++++++++++++++++++++-
 .../Provider/Mock/MockProviderStats.cs             |  3 +
 8 files changed, 220 insertions(+), 12 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 2b3046e..3375d45 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -23,6 +23,7 @@ The options apply to the behavior of the NMS objects such as 
Connection, Session
 - **nms.requestTimeout** Timeout value that controls how long the client waits 
on completion of various synchronous interactions, such as opening a producer 
or consumer, before returning an error. Does not affect synchronous message 
sends. By default the client will wait indefinitely for a request to complete.
 - **nms.clientIdPrefix** Optional prefix value that is used for generated 
Client ID values when a new Connection is created for the JMS 
ConnectionFactory. The default prefix is 'ID:'.
 - **nms.connectionIdPrefix** Optional prefix value that is used for generated 
Connection ID values when a new Connection is created for the JMS 
ConnectionFactory. This connection ID is used when logging some information 
from the JMS Connection object so a configurable prefix can make breadcrumbing 
the logs easier. The default prefix is 'ID:'.
+- **nms.maxNewConnectionRatePerSec** Allowed approximated rate for how fast 
connection factory is allowed to create new connection. If there is more 
request, they will have to wait. Default value is -1 which means unlimited.
 
 ### TCP Transport Configuration options
 When connected to a remote using plain TCP these options configure the 
behaviour of the underlying socket. These options are appended to the 
connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs 
b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index f57c5d4..b656037 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,7 @@ namespace Apache.NMS.AMQP.Meta
         public static readonly int DEFAULT_IDLE_TIMEOUT;
         public static readonly ushort DEFAULT_CHANNEL_MAX;
         public static readonly int DEFAULT_MAX_FRAME_SIZE;
+        public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
 
         static NmsConnectionInfo()
         {
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs 
b/src/NMS.AMQP/NmsConnectionFactory.cs
index 66543df..6729bee 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Specialized;
+using System.Threading;
 using System.Threading.Tasks;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider;
@@ -36,6 +37,9 @@ namespace Apache.NMS.AMQP
         private IdGenerator clientIdGenerator;
         private IdGenerator connectionIdGenerator;
         private readonly object syncRoot = new object();
+        
+        DateTime nextAllowedConnectionCreationTime = DateTime.MinValue;
+
 
         public NmsConnectionFactory(string userName, string password)
         {
@@ -110,7 +114,7 @@ namespace Apache.NMS.AMQP
         /// User name value used to authenticate the connection
         /// </summary>
         public string UserName { get; set; }
-        
+
         /// <summary>
         /// The password value used to authenticate the connection
         /// </summary>
@@ -121,7 +125,7 @@ namespace Apache.NMS.AMQP
         /// before returning an error. Does not affect synchronous message 
sends. By default the client will wait indefinitely for a request to complete.
         /// </summary>
         public long RequestTimeout { get; set; } = 
NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
-        
+
         /// <summary>
         /// Timeout value that controls how long the client waits on 
completion of a synchronous message send before returning an error.
         /// By default the client will wait indefinitely for a send to 
complete.
@@ -146,7 +150,7 @@ namespace Apache.NMS.AMQP
         /// the logs easier. The default prefix is 'ID:'.
         /// </summary>
         public string ConnectionIdPrefix { get; set; }
-        
+
         /// <summary>
         /// Optional prefix value that is used for generated Client ID values 
when a new Connection is created for the JMS ConnectionFactory.
         /// The default prefix is 'ID:'.
@@ -164,7 +168,15 @@ namespace Apache.NMS.AMQP
         /// </summary>
         public string ClientId { get; set; }
 
-        public IConnection CreateConnection()
+        /// <summary>
+        /// Sets the desired max rate of creating new connections by this 
factory.
+        ///
+        /// NOTE: During creating new connection if the rate is too high 
system will
+        /// try to suspend creation execution to force the desired max rate of 
new connection creation
+        /// </summary>
+        public double MaxNewConnectionRatePerSec { get; set; } = 
NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC;
+        
+    public IConnection CreateConnection()
         {
             return CreateConnection(UserName, Password);
         }
@@ -174,15 +186,17 @@ namespace Apache.NMS.AMQP
             return CreateConnectionAsync(UserName, Password);
         }
 
-        public Task<IConnection> CreateConnectionAsync(string userName, string 
password)
+        public IConnection CreateConnection(string userName, string password)
         {
-            return Task.FromResult(CreateConnection(userName, password));
+            return CreateConnectionAsync(userName, password).GetAsyncResult();
         }
-       
-        public IConnection CreateConnection(string userName, string password)
+
+        public async Task<IConnection> CreateConnectionAsync(string userName, 
string password)
         {
             try
             {
+                await CheckMaxNewConnectionRate();
+
                 NmsConnectionInfo connectionInfo = 
ConfigureConnectionInfo(userName, password);
                 IProvider provider = ProviderFactory.Create(BrokerUri);
                 return new NmsConnection(connectionInfo, provider);
@@ -192,7 +206,38 @@ namespace Apache.NMS.AMQP
                 throw NMSExceptionSupport.Create(e);
             }
         }
+        
+        private async Task CheckMaxNewConnectionRate()
+        {
+            if (MaxNewConnectionRatePerSec != 
NmsConnectionInfo.DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC)
+            {
+                TimeSpan waitTime = TimeSpan.Zero;
+                lock (syncRoot)
+                {
+                    TimeSpan waitTimeForNewConnection = 
TimeSpan.FromMilliseconds(1_000.0 / MaxNewConnectionRatePerSec);
+                    
+                    DateTime now = DateTime.Now;
+
+                    if (nextAllowedConnectionCreationTime > now)
+                    {
+                        waitTime = (nextAllowedConnectionCreationTime - now) + 
waitTimeForNewConnection;
+                    }
+                    else
+                    {
+                        waitTime = TimeSpan.Zero;
+                        nextAllowedConnectionCreationTime = now;
+                    }
+
+                    nextAllowedConnectionCreationTime += 
waitTimeForNewConnection;
+                }
 
+                if (waitTime > TimeSpan.Zero)
+                {
+                    await Task.Delay(waitTime);
+                }
+            }
+        }
+        
         public INMSContext CreateContext()
         {
             return new NmsContext((NmsConnection)CreateConnection(), 
AcknowledgementMode.AutoAcknowledge);
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs 
b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 80d4b89..25f6529 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -35,6 +35,7 @@ namespace Apache.NMS.AMQP.Provider.Failover
 
         public static int DEFAULT_INITIAL_RECONNECT_DELAY = 0;
         public static long DEFAULT_RECONNECT_DELAY = 10;
+        public static double DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR = 0.0d;
         public static double DEFAULT_RECONNECT_BACKOFF_MULTIPLIER = 2.0d;
         public static long DEFAULT_MAX_RECONNECT_DELAY = (long) 
Math.Round(TimeSpan.FromSeconds(30).TotalMilliseconds);
         public static int DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS = UNDEFINED;
@@ -69,6 +70,7 @@ namespace Apache.NMS.AMQP.Provider.Failover
 
         public long InitialReconnectDelay { get; set; } = 
DEFAULT_INITIAL_RECONNECT_DELAY;
         public long ReconnectDelay { get; set; } = DEFAULT_RECONNECT_DELAY;
+        public double ReconnectDelayRandomFactor { get; set; } = 
DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR;
         public bool UseReconnectBackOff { get; set; } = 
DEFAULT_USE_RECONNECT_BACKOFF;
         public double ReconnectBackOffMultiplier { get; set; } = 
DEFAULT_RECONNECT_BACKOFF_MULTIPLIER;
         public long MaxReconnectDelay { get; set; } = 
DEFAULT_MAX_RECONNECT_DELAY;
@@ -597,6 +599,7 @@ namespace Apache.NMS.AMQP.Provider.Failover
             private volatile bool recoveryRequired;
             private long reconnectAttempts;
             private long nextReconnectDelay = -1;
+            private Random random = new Random();
 
             public ReconnectControls(FailoverProvider failoverProvider)
             {
@@ -671,7 +674,10 @@ namespace Apache.NMS.AMQP.Provider.Failover
                     }
                 }
 
-                return nextReconnectDelay;
+                long randomFactor = (long) ((1 - 2 * random.NextDouble()) *
+                                            
failoverProvider.ReconnectDelayRandomFactor * nextReconnectDelay);
+
+                return Math.Min(failoverProvider.MaxReconnectDelay, 
nextReconnectDelay + randomFactor);
             }
 
             public long RecordNextAttempt()
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs 
b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 49a241a..3962e36 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -16,8 +16,12 @@
  */
 
 using System;
+using System.Diagnostics;
+using System.Threading.Tasks;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Provider;
+using NMS.AMQP.Test.Provider.Mock;
 using NUnit.Framework;
 
 namespace NMS.AMQP.Test
@@ -28,6 +32,16 @@ namespace NMS.AMQP.Test
         private static readonly string USER = "USER";
         private static readonly string PASSWORD = "PASSWORD";
 
+        private MockRemotePeer mockPeer;
+
+        [SetUp]
+        public void SetUp()
+        {
+            mockPeer = new MockRemotePeer();
+            mockPeer.Start();
+            ProviderFactory.RegisterProviderFactory("mock", new 
MockProviderFactory());
+        }
+        
         [Test]
         public void TestConnectionFactoryCreate()
         {
@@ -77,7 +91,8 @@ namespace NMS.AMQP.Test
                                 "&nms.clientIDPrefix=clientId" +
                                 "&nms.requestTimeout=1000" +
                                 "&nms.sendTimeout=1000" +
-                                "&nms.localMessageExpiry=false";
+                                "&nms.localMessageExpiry=false" +
+                                "&nms.maxNewConnectionRatePerSec=4";
 
             NmsConnectionFactory factory = new 
NmsConnectionFactory(configuredUri);
 
@@ -88,6 +103,7 @@ namespace NMS.AMQP.Test
             Assert.AreEqual("clientId", factory.ClientIdPrefix);
             Assert.AreEqual(1000, factory.RequestTimeout);
             Assert.AreEqual(1000, factory.SendTimeout);
+            Assert.AreEqual(4, factory.MaxNewConnectionRatePerSec);
             Assert.IsFalse(factory.LocalMessageExpiry);
         }
         
@@ -136,5 +152,53 @@ namespace NMS.AMQP.Test
             NmsConnectionFactory factory = new 
NmsConnectionFactory("bad://127.0.0.1:5763");
             Assert.Throws<NMSException>(() => factory.CreateConnection());
         }
+        
+        [Test, Timeout(12000)]
+        public void TestMaxNewConnectionRatePerSec()
+        {
+            double desiredRatePerSec = 5;
+            
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=0" +
+                "&nms.maxNewConnectionRatePerSec="+desiredRatePerSec);
+
+            int testTimeMs = 5000;
+
+            int mainCounter = 0;
+
+            Parallel.For(0, 4, (i) =>
+            {
+                Stopwatch st = Stopwatch.StartNew();
+                IConnection connection = null;
+                int counter = -1;
+                do
+                {
+                    try
+                    {
+                        counter++;
+                        connection = factory.CreateConnection();
+                        connection.Start();
+                        Assert.Fail("Should have stopped after predefined 
number of retries.");
+                    }
+                    catch (NMSException)
+                    {
+                    }
+                    finally
+                    {
+                        connection?.Close();
+                    }
+                } while (st.ElapsedMilliseconds < testTimeMs);
+
+                lock (factory)
+                {
+                    mainCounter += counter;
+                }
+            });
+
+            double ratePerSec = 1000.0 * mainCounter / testTimeMs;
+            
+            Assert.AreEqual(desiredRatePerSec, ratePerSec, 1);
+        }
     }
 }
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs 
b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
index 52a9f56..ffc17df 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderFactoryTest.cs
@@ -69,7 +69,9 @@ namespace NMS.AMQP.Test.Provider
                                     "&failover.maxReconnectAttempts=" + 
(FailoverProvider.DEFAULT_MAX_RECONNECT_ATTEMPTS + 5) +
                                     "&failover.warnAfterReconnectAttempts=" + 
(FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6) +
                                     "&failover.useReconnectBackOff=" + 
(!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF) +
-                                    "&failover.reconnectBackOffMultiplier=" + 
(FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d));
+                                    "&failover.reconnectBackOffMultiplier=" + 
(FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d) +
+                                    "&failover.reconnectDelayRandomFactor=" 
+(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d)
+            );
 
             FailoverProvider failover = ProviderFactory.Create(configured) as 
FailoverProvider;
             Assert.IsNotNull(failover);
@@ -82,6 +84,7 @@ namespace NMS.AMQP.Test.Provider
             
Assert.AreEqual(FailoverProvider.DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS + 6, 
failover.WarnAfterReconnectAttempts);
             Assert.AreEqual(!FailoverProvider.DEFAULT_USE_RECONNECT_BACKOFF, 
failover.UseReconnectBackOff);
             
Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_BACKOFF_MULTIPLIER + 1.0d, 
failover.ReconnectBackOffMultiplier, 0.0);
+            
Assert.AreEqual(FailoverProvider.DEFAULT_RECONNECT_DELAY_RANDOM_FACTOR + 1.0d, 
failover.ReconnectDelayRandomFactor, 0.0);
         }
 
     }
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs 
b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
index 1d7410e..ce7d28a 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
@@ -17,13 +17,13 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading.Tasks;
 using Apache.NMS;
 using Apache.NMS.AMQP;
 using Apache.NMS.AMQP.Meta;
 using Apache.NMS.AMQP.Provider;
 using Apache.NMS.AMQP.Provider.Failover;
-using Apache.NMS.AMQP.Util;
 using Moq;
 using NMS.AMQP.Test.Provider.Mock;
 using NUnit.Framework;
@@ -205,7 +205,92 @@ namespace NMS.AMQP.Test.Provider
             Assert.AreEqual(5, mockPeer.ContextStats.ConnectionAttempts);
             Assert.AreEqual(5, mockPeer.ContextStats.CloseAttempts);
         }
+        
+        [Test, Timeout(5000)]
+        public void TestMaxReconnectAttemptsWithBackOffAndMaxReconnectDelay()
+        {
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=6" +
+                "&failover.maxReconnectDelay=800" +
+                "&failover.reconnectDelay=100" +
+                "&failover.useReconnectBackOff=true");
+
+            IConnection connection = null;
+            try
+            {
+                connection = factory.CreateConnection();
+                connection.Start();
+                Assert.Fail("Should have stopped after five retries.");
+            }
+            catch (NMSException)
+            {
+            }
+            finally
+            {
+                connection?.Close();
+            }
 
+            Assert.AreEqual(6, mockPeer.ContextStats.ProvidersCreated);
+            Assert.AreEqual(6, mockPeer.ContextStats.ConnectionAttempts);
+            Assert.AreEqual(6, mockPeer.ContextStats.CloseAttempts);
+            
+            // Verify if reconnect backoff was performed in expected growing 
delays
+            IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 
800, 800}; // At the end it should actually stop growing cause 
MaxReconnectDelay should kick in
+            var actualDelays = GetActualReconnectDelays();
+
+            Enumerable.Zip(expectedDelays, actualDelays, (expected, actual) => 
new { expected, actual })
+            .ToList()
+            .ForEach(p => Assert.AreEqual(p.expected, p.actual, 100));
+        }
+
+        [Test, Timeout(10000)]
+        public void TestMaxReconnectAttemptsWithBackOffAndRandomDelay()
+        {
+            NmsConnectionFactory factory = new NmsConnectionFactory(
+                "failover:(mock://localhost?mock.failOnConnect=true)" +
+                "?failover.maxReconnectAttempts=7" +
+                "&failover.maxReconnectDelay=3200" +
+                "&failover.reconnectDelay=100" +
+                "&failover.useReconnectBackOff=true"+
+                "&failover.reconnectDelayRandomFactor=0.9");
+
+            IConnection connection = null;
+            try
+            {
+                connection = factory.CreateConnection();
+                connection.Start();
+                Assert.Fail("Should have stopped after predefined number of 
retries.");
+            }
+            catch (NMSException)
+            {
+            }
+            finally
+            {
+                connection?.Close();
+            }
+
+            Assert.AreEqual(7, mockPeer.ContextStats.ProvidersCreated);
+            Assert.AreEqual(7, mockPeer.ContextStats.ConnectionAttempts);
+            Assert.AreEqual(7, mockPeer.ContextStats.CloseAttempts);
+            
+            // Verify if reconnect backoff was performed in expected growing 
delays
+            IEnumerable<double> expectedDelays = new double[] {100, 200, 400, 
800, 1600, 3200};
+            var actualDelays = GetActualReconnectDelays();
+
+            double difference = Enumerable.Zip(expectedDelays, actualDelays, 
(expected, actual) => Math.Abs(expected - actual)).Max();
+            Assert.GreaterOrEqual(difference,80);
+        }
+        
+        private IEnumerable<double> GetActualReconnectDelays()
+        {
+            IEnumerable<double> actualDelays = Enumerable
+                .Range(1, 
mockPeer.ContextStats.ConnectionAttemptsTimestamps.Count - 1)
+                .Select(i => 
mockPeer.ContextStats.ConnectionAttemptsTimestamps[i] - 
mockPeer.ContextStats.ConnectionAttemptsTimestamps[i - 1])
+                .Select(a => a.TotalMilliseconds);
+            return actualDelays;
+        }
+       
         [Test]
         public void TestFailureOnCloseIsSwallowed()
         {
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs 
b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
index 7090468..6a15ba5 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
@@ -39,6 +39,8 @@ namespace NMS.AMQP.Test.Provider.Mock
         public int CloseAttempts { get; private set; }
         public int RecoverCalls { get; set; }
 
+        public List<DateTime> ConnectionAttemptsTimestamps { get; set; } = new 
List<DateTime>(); 
+
         public int GetCreateResourceCalls<T>() where T : INmsResource => 
createResourceCalls[typeof(T)];
 
         public int GetDestroyResourceCalls<T>() where T : INmsResource => 
destroyResourceCalls[typeof(T)];
@@ -55,6 +57,7 @@ namespace NMS.AMQP.Test.Provider.Mock
         {
             parent?.RecordConnectAttempt();
             ConnectionAttempts++;
+            ConnectionAttemptsTimestamps.Add(DateTime.Now);
         }
 
         public void RecordCloseAttempt()

Reply via email to