This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch 2.0.x
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/2.0.x by this push:
     new cd89a79  AMQNET-724 Allow to specify consumer credit
     new 52671fd  Merge pull request #75 from lukeabsent/AMQNET-724
cd89a79 is described below

commit cd89a797760b87691e098b14cb1b4cb497d20724
Author: lukeabsent <[email protected]>
AuthorDate: Thu Jul 29 19:47:56 2021 +0200

    AMQNET-724 Allow to specify consumer credit
---
 docs/configuration.md                              |   5 +
 src/NMS.AMQP/Meta/NmsConnectionInfo.cs             |  44 ++++++++
 src/NMS.AMQP/Meta/NmsConsumerInfo.cs               |   4 +-
 src/NMS.AMQP/NmsConnectionFactory.cs               |   8 +-
 src/NMS.AMQP/NmsMessageConsumer.cs                 |   6 +-
 src/NMS.AMQP/Util/PropertyUtil.cs                  |  39 +++++--
 .../AmqpTestSupport.cs                             |   8 +-
 .../NmsMessageConsumerTest.cs                      |  40 +++++++
 test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs |  43 ++++++-
 .../Async/ConsumerIntegrationTestAsync.cs          | 124 ++++++++++++++++++++
 .../Async/FailoverIntegrationTestAsync.cs          |   3 +-
 .../Async/NMSConsumerIntegrationTestAsync.cs       | 125 +++++++++++++++++++++
 .../Integration/ConsumerIntegrationTest.cs         | 125 +++++++++++++++++++++
 .../Integration/FailoverIntegrationTest.cs         |   3 +-
 .../Integration/NMSConsumerIntegrationTest.cs      | 125 +++++++++++++++++++++
 15 files changed, 679 insertions(+), 23 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 3375d45..2c81986 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -24,6 +24,11 @@ The options apply to the behavior of the NMS objects such as 
Connection, Session
 - **nms.clientIdPrefix** Optional prefix value that is used for generated 
Client ID values when a new Connection is created for the JMS 
ConnectionFactory. The default prefix is 'ID:'.
 - **nms.connectionIdPrefix** Optional prefix value that is used for generated 
Connection ID values when a new Connection is created for the JMS 
ConnectionFactory. This connection ID is used when logging some information 
from the JMS Connection object so a configurable prefix can make breadcrumbing 
the logs easier. The default prefix is 'ID:'.
 - **nms.maxNewConnectionRatePerSec** Allowed approximated rate for how fast 
connection factory is allowed to create new connection. If there is more 
request, they will have to wait. Default value is -1 which means unlimited.
+- **nms.prefetchPolicy.all** Link credit value that will be assigned to new 
consumers from each category. It is pretty much the number of messages consumer 
can read without acknowledging. The default value is 1000.
+- **nms.prefetchPolicy.queuePrefetch** Link credit value that will be assigned 
to new consumers of queue. The default value is 1000.
+- **nms.prefetchPolicy.topicPrefetch** Link credit value that will be assigned 
to new consumers of topic. The default value is 1000.
+- **nms.prefetchPolicy.queueBrowserPrefetch** Link credit value that will be 
assigned to new queue browser. The default value is 1000..
+- **nms.prefetchPolicy.durableTopicPrefetch** Link credit value that will be 
assigned to new consumers of durable topic. The default value is 1000..
 
 ### TCP Transport Configuration options
 When connected to a remote using plain TCP these options configure the 
behaviour of the underlying socket. These options are appended to the 
connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs 
b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index b656037..103a4cb 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,13 @@ namespace Apache.NMS.AMQP.Meta
         public static readonly int DEFAULT_IDLE_TIMEOUT;
         public static readonly ushort DEFAULT_CHANNEL_MAX;
         public static readonly int DEFAULT_MAX_FRAME_SIZE;
+        public static readonly PrefetchPolicyInfo DEFAULT_PREFETCH_POLICY = 
new PrefetchPolicyInfo()
+        {
+            QueuePrefetch = 1000,
+            TopicPrefetch = 1000,
+            DurableTopicPrefetch = 1000,
+            QueueBrowserPrefetch = 1000
+        };
         public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
 
         static NmsConnectionInfo()
@@ -70,6 +77,11 @@ namespace Apache.NMS.AMQP.Meta
         public bool DelayedDeliverySupported { get; set; }
         
         public bool SharedSubsSupported { get; set; }
+
+        public PrefetchPolicyInfo PrefetchPolicy { get; set; } = 
DEFAULT_PREFETCH_POLICY;
+        
+        
+        
         
 
         public void SetClientId(string clientId, bool explicitClientId)
@@ -101,4 +113,36 @@ namespace Apache.NMS.AMQP.Meta
             return $"[{nameof(NmsConnectionInfo)}] {nameof(Id)}: {Id}, 
{nameof(ConfiguredUri)}: {ConfiguredUri}";
         }
     }
+
+    public class PrefetchPolicyInfo
+    {
+        public int QueuePrefetch { get; set; }
+        public int TopicPrefetch { get; set; }
+        public int QueueBrowserPrefetch { get; set; }
+        public int DurableTopicPrefetch { get; set; }
+
+        public int All
+        {
+            set => QueuePrefetch = TopicPrefetch = QueueBrowserPrefetch = 
DurableTopicPrefetch = value;
+        }
+
+        internal PrefetchPolicyInfo Clone()
+        {
+            return (PrefetchPolicyInfo) this.MemberwiseClone();
+        }
+        
+        public int GetLinkCredit(IDestination destination, bool browser, bool 
durable)
+        {
+            if (destination.IsTopic)
+            {
+                if (durable) return DurableTopicPrefetch;
+                else return TopicPrefetch;
+            }
+            else
+            {
+                if (browser) return QueueBrowserPrefetch;
+                else return QueuePrefetch;
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs 
b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
index 70cf529..b5506c3 100644
--- a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -21,8 +21,6 @@ namespace Apache.NMS.AMQP.Meta
 {
     public class NmsConsumerInfo : INmsResource<NmsConsumerId>
     {
-        public static readonly int DEFAULT_CREDIT = 200;
-        
         public NmsConsumerInfo(NmsConsumerId consumerId)
         {
             Id = consumerId ?? throw new 
ArgumentNullException(nameof(consumerId), "Consumer ID cannot be null");
@@ -39,7 +37,7 @@ namespace Apache.NMS.AMQP.Meta
         public bool IsShared { get; set; }
         public bool LocalMessageExpiry { get; set; }
         public bool IsBrowser { get; set; }
-        public int LinkCredit { get; set; } = DEFAULT_CREDIT;
+        public int LinkCredit { get; set; }
         
         public bool HasSelector() => !string.IsNullOrWhiteSpace(Selector);
 
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs 
b/src/NMS.AMQP/NmsConnectionFactory.cs
index 6729bee..6212ddc 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -169,6 +169,11 @@ namespace Apache.NMS.AMQP
         public string ClientId { get; set; }
 
         /// <summary>
+        /// Sets and gets the prefetch values for consumers
+        /// </summary>
+        public PrefetchPolicyInfo PrefetchPolicy { get; set; } = 
NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.Clone();
+
+        /// <summary>
         /// Sets the desired max rate of creating new connections by this 
factory.
         ///
         /// NOTE: During creating new connection if the rate is too high 
system will
@@ -333,7 +338,8 @@ namespace Apache.NMS.AMQP
                 RequestTimeout = RequestTimeout,
                 SendTimeout = SendTimeout,
                 CloseTimeout = CloseTimeout,
-                LocalMessageExpiry = LocalMessageExpiry
+                LocalMessageExpiry = LocalMessageExpiry,
+                PrefetchPolicy = PrefetchPolicy.Clone()
             };
 
             bool userSpecifiedClientId = ClientId != null;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs 
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 63f6896..c214ee0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -50,7 +50,7 @@ namespace Apache.NMS.AMQP
             {
                 
session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination)
 destination);
             }
-
+         
             Info = new NmsConsumerInfo(consumerId)
             {
                 Destination = destination,
@@ -61,8 +61,8 @@ namespace Apache.NMS.AMQP
                 IsShared = IsSharedSubscription,
                 IsDurable = IsDurableSubscription,
                 IsBrowser =  IsBrowser,
-                LocalMessageExpiry = 
Session.Connection.ConnectionInfo.LocalMessageExpiry
-
+                LocalMessageExpiry = 
Session.Connection.ConnectionInfo.LocalMessageExpiry,
+                LinkCredit = 
Session.Connection.ConnectionInfo.PrefetchPolicy.GetLinkCredit(destination, 
IsBrowser, IsDurableSubscription)
             };
             deliveryTask = new MessageDeliveryTask(this);
         }
diff --git a/src/NMS.AMQP/Util/PropertyUtil.cs 
b/src/NMS.AMQP/Util/PropertyUtil.cs
index db7cd65..ac26ec7 100644
--- a/src/NMS.AMQP/Util/PropertyUtil.cs
+++ b/src/NMS.AMQP/Util/PropertyUtil.cs
@@ -52,26 +52,43 @@ namespace Apache.NMS.AMQP.Util
 
         public static void SetProperties(object obj, StringDictionary 
properties, string propertyPrefix = PROPERTY_PREFIX)
         {
-            Dictionary<string, PropertyInfo> props = 
GetPropertiesForClass(obj);
             foreach (string rawkey in properties.Keys)
             {
-                string key = RemovePrefix(propertyPrefix, rawkey);
-                Tracer.DebugFormat("Searching for Property: \"{0}\"", key);
-                if (props.ContainsKey(key))
+                Tracer.DebugFormat("Searching for Property: \"{0}\"", rawkey);
+                var (currentObject, propertyInfo) = GetPropertyInfo(obj, 
propertyPrefix, rawkey);
+
+                if (propertyInfo != null)
                 {
-                    Tracer.DebugFormat(
-                        "Assigning Property {0} to {1}.{2} with value {3}",
-                        key, obj.GetType().Namespace, obj.GetType().Name, 
properties[rawkey]
-                        );
 #if NET40
-                    if (props[key].GetSetMethod() != null)
+                    if (propertyInfo.GetSetMethod() != null)
 #else
-                    if(props[key].SetMethod!=null)
+                    if (propertyInfo.SetMethod != null)
 #endif
-                        props[key].SetValue(obj, 
ConvertType(props[key].PropertyType, properties[rawkey]), null);
+                    {
+                        Tracer.DebugFormat(
+                            "Assigning Property {0} to {1}.{2} with value {3}",
+                            rawkey, obj.GetType().Namespace, 
obj.GetType().Name, properties[rawkey]
+                        );
+                        propertyInfo.SetValue(currentObject, 
ConvertType(propertyInfo.PropertyType, properties[rawkey]), null);
+                    }
+
                 }
+            }
+        }
 
+        private static (object,PropertyInfo) GetPropertyInfo(object obj, 
string propertyPrefix, string rawkey)
+        {
+            Object currentObject = null;
+            PropertyInfo propertyInfo = null;
+            
+            foreach (var propertyName in rawkey.Split(new[] {'.'}, 
StringSplitOptions.RemoveEmptyEntries))
+            {
+                currentObject = currentObject == null ? obj : 
propertyInfo.GetValue(currentObject);
+                string key = RemovePrefix(propertyPrefix, propertyName);
+                propertyInfo = GetPropertiesForClass(currentObject)[key];
             }
+
+            return (currentObject, propertyInfo);
         }
 
         public static StringDictionary GetProperties(object obj, string 
propertyPrefix = PROPERTY_PREFIX)
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs 
b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index bf309c4..c742457 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -48,10 +48,14 @@ namespace NMS.AMQP.Test
             connection.Start();
             return connection;
         }
-        
-        protected IConnection CreateAmqpConnection(string clientId = null)
+
+        protected IConnection CreateAmqpConnection(string clientId = null, 
string options = null)
         {
             string brokerUri = 
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ?? 
"amqp://127.0.0.1:5672";
+            if (options != null)
+            {
+                brokerUri += "?" + options;
+            }
             string userName = 
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
             string password = 
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
 
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs 
b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index dc8c260..7b2de68 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -60,7 +60,47 @@ namespace NMS.AMQP.Test
             messageConsumer.Close();
             
         }
+        
+        [Test, Timeout(60_000)]
+        public void TestConsumerCredit()
+        {
+            PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+            Connection = CreateAmqpConnection(options: 
"nms.prefetchPolicy.all=3");
+            Connection.Start();
+
+            ISession session = 
Connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+            IQueue queue = session.GetQueue(TestName);
+            IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+            ConcurrentBag<IMessage> messages = new ConcurrentBag<IMessage>();
+            CountdownEvent countdownReceived = new CountdownEvent(4);
+            messageConsumer.Listener += message =>
+            {
+                messages.Add(message);
+                try
+                {
+                    countdownReceived.Signal();
+                }
+                catch (Exception)
+                {
+                    // if it gets below zero, we dont care
+                }
+            };
+            
+            IMessageProducer producer = session.CreateProducer(queue);
+            Enumerable.Range(0, 100).ToList().ForEach(nr => 
producer.Send(session.CreateTextMessage("hello")));
+            
+            // Wait for at least four messages are read, which should never 
happen
+            Assert.IsFalse(countdownReceived.Wait(500));
+            Assert.AreEqual(3, messages.Count);
+            
+            // Once we ack messages we should start receiving another 3
+            messages.ToList().ForEach(m => m.Acknowledge());
+            // We just wait to see if 4th message arrived
+            Assert.IsTrue(countdownReceived.Wait(500));
+        }
 
+        
         [Test, Timeout(60_000)]
         public void TestSelectorsWithJMSType()
         {
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs 
b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 3962e36..096e53f 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -120,7 +120,44 @@ namespace NMS.AMQP.Test
                                 "&nms.requestTimeout=1000" +
                                 "&nms.sendTimeout=1000" +
                                 "&nms.closeTimeout=2000" +
-                                "&nms.localMessageExpiry=false";
+                                "&nms.localMessageExpiry=false" +
+                                "&nms.prefetchPolicy.all=55";
+
+            NmsConnectionFactory factory = new NmsConnectionFactory(new 
Uri(configuredUri));
+
+            Assert.AreEqual("user", factory.UserName);
+            Assert.AreEqual("password", factory.Password);
+            Assert.AreEqual("client", factory.ClientId);
+            Assert.AreEqual("ID:TEST", factory.ConnectionIdPrefix);
+            Assert.AreEqual("clientId", factory.ClientIdPrefix);
+            Assert.AreEqual(1000, factory.RequestTimeout);
+            Assert.AreEqual(1000, factory.SendTimeout);
+            Assert.AreEqual(2000, factory.CloseTimeout);
+            Assert.AreEqual(55, factory.PrefetchPolicy.QueuePrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.TopicPrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.DurableTopicPrefetch);
+            Assert.AreEqual(55, factory.PrefetchPolicy.QueueBrowserPrefetch);
+            Assert.IsFalse(factory.LocalMessageExpiry);
+        }
+        
+        [Test]
+        public void TestSetPrefetchPolicyPropertiesFromUri()
+        {
+            string baseUri = "amqp://localhost:1234";
+            string configuredUri = baseUri +
+                                   "?nms.username=user" +
+                                   "&nms.password=password" +
+                                   "&nms.clientId=client" +
+                                   "&nms.connectionIdPrefix=ID:TEST" +
+                                   "&nms.clientIDPrefix=clientId" +
+                                   "&nms.requestTimeout=1000" +
+                                   "&nms.sendTimeout=1000" +
+                                   "&nms.closeTimeout=2000" +
+                                   "&nms.localMessageExpiry=false" +
+                                   "&nms.prefetchPolicy.queuePrefetch=11" +
+                                   "&nms.prefetchPolicy.topicPrefetch=22" +
+                                   
"&nms.prefetchPolicy.durableTopicPrefetch=33" +
+                                   
"&nms.prefetchPolicy.queueBrowserPrefetch=44";
 
             NmsConnectionFactory factory = new NmsConnectionFactory(new 
Uri(configuredUri));
 
@@ -132,6 +169,10 @@ namespace NMS.AMQP.Test
             Assert.AreEqual(1000, factory.RequestTimeout);
             Assert.AreEqual(1000, factory.SendTimeout);
             Assert.AreEqual(2000, factory.CloseTimeout);
+            Assert.AreEqual(11, factory.PrefetchPolicy.QueuePrefetch);
+            Assert.AreEqual(22, factory.PrefetchPolicy.TopicPrefetch);
+            Assert.AreEqual(33, factory.PrefetchPolicy.DurableTopicPrefetch);
+            Assert.AreEqual(44, factory.PrefetchPolicy.QueueBrowserPrefetch);
             Assert.IsFalse(factory.LocalMessageExpiry);
         }
 
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs 
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index b4f9885..5445d5e 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -59,6 +59,130 @@ namespace NMS.AMQP.Test.Integration.Async
         }
 
         [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await 
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(5, credit));
+
+                ISession session = await 
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IMessageConsumer consumer = await 
session.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await 
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(6, credit));
+
+                ISession session = await 
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IMessageConsumer consumer = await 
session.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await 
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(7, credit));
+
+                ISession session = await 
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = await session.GetTopicAsync("myTopic");
+                IMessageConsumer consumer = await 
session.CreateConsumerAsync(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await 
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(8, credit));
+
+                ISession session = await 
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = await session.GetTopicAsync("myTopic");
+                IMessageConsumer consumer = await 
session.CreateDurableConsumerAsync(topic, "durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, 
replyClosed: false);
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = await 
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, 
Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(9, credit));
+
+                ISession session = await 
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = await session.GetQueueAsync("myQueue");
+                IQueueBrowser consumer = await 
session.CreateBrowserAsync(queue);
+                
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                
+                // To cause actual creation of consumer, after iteration 
consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                await connection.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+        [Test, Timeout(20_000)]
         public async Task TestRemotelyCloseConsumer()
         {
             Mock<INmsConnectionListener> mockConnectionListener = new 
Mock<INmsConnectionListener>();
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs 
b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
index 9cae2d9..0f5397d 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
 using Amqp.Types;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
 using Moq;
 using NLog;
 using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@ namespace NMS.AMQP.Test.Integration.Async
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectReceiverAttach();
-                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: 
false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: 
false, creditMatcher: credit => 
Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch, 
credit));
                 finalPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
                 finalPeer.ExpectClose();
 
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
 
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
index a27b268..87659da 100644
--- 
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
+++ 
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
@@ -57,6 +57,131 @@ namespace NMS.AMQP.Test.Integration.Async
                 testPeer.WaitForAllMatchersToComplete(1000);
             }
         }
+        
+             [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, 
"nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(5, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                INMSConsumer consumer = await 
context.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, 
"nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(6, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                INMSConsumer consumer = await 
context.CreateConsumerAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, 
"nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(7, credit));
+
+                ITopic topic = await context.GetTopicAsync("myTopic");
+                INMSConsumer consumer = await 
context.CreateConsumerAsync(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, 
"nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(8, credit));
+
+                ITopic topic = await context.GetTopicAsync("myTopic");
+                INMSConsumer consumer = await 
context.CreateDurableConsumerAsync(topic, "durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, 
replyClosed: false);
+                testPeer.ExpectEnd();
+                await consumer.CloseAsync();
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public async Task TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = await EstablishNMSContextAsync(testPeer, 
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, 
Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(9, credit));
+
+                IQueue queue = await context.GetQueueAsync("myQueue");
+                IQueueBrowser consumer = await 
context.CreateBrowserAsync(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+
+                // To cause actual creation of consumer, after iteration 
consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                await context.CloseAsync();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
 
         // TODO No connection Listener in NMSContext
         // [Test, Timeout(20_000)]
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs 
b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index f42ba51..6bfb1f5 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -59,6 +59,131 @@ namespace NMS.AMQP.Test.Integration
         }
 
         [Test, Timeout(20_000)]
+        public void TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, 
"nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(5, credit));
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, 
"nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(6, credit));
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, 
"nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(7, credit));
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = session.GetTopic("myTopic");
+                IMessageConsumer consumer = session.CreateConsumer(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, 
"nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(8, credit));
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                ITopic topic = session.GetTopic("myTopic");
+                IMessageConsumer consumer = 
session.CreateDurableConsumer(topic, "durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, 
replyClosed: false);
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                IConnection connection = EstablishConnection(testPeer, 
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, 
Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(9, credit));
+
+                ISession session = 
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                IQueue queue = session.GetQueue("myQueue");
+                IQueueBrowser consumer = session.CreateBrowser(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+
+                // To cause actual creation of consumer, after iteration 
consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                connection.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+
+        [Test, Timeout(20_000)]
         public void TestRemotelyCloseConsumer()
         {
             Mock<INmsConnectionListener> mockConnectionListener = new 
Mock<INmsConnectionListener>();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs 
b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 42919cc..6d53c45 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
 using Amqp.Types;
 using Apache.NMS;
 using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
 using Moq;
 using NLog;
 using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@ namespace NMS.AMQP.Test.Integration
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectBegin();
                 finalPeer.ExpectReceiverAttach();
-                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: 
false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+                finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse: 
false, creditMatcher: credit => 
Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch, 
credit));
                 finalPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
                 finalPeer.ExpectClose();
 
diff --git 
a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs 
b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
index 542f720..0230f44 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -58,6 +58,131 @@ namespace NMS.AMQP.Test.Integration
             }
         }
 
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditAll()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer, 
"nms.prefetchPolicy.all=5");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(5, credit));
+
+                IQueue queue = context.GetQueue("myQueue");
+                INMSConsumer consumer = context.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueuePrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer, 
"nms.prefetchPolicy.queuePrefetch=6");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(6, credit));
+
+                IQueue queue = context.GetQueue("myQueue");
+                INMSConsumer consumer = context.CreateConsumer(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer, 
"nms.prefetchPolicy.topicPrefetch=7");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(7, credit));
+
+                ITopic topic = context.GetTopic("myTopic");
+                INMSConsumer consumer = context.CreateConsumer(topic);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditDurableTopicPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer, 
"nms.prefetchPolicy.durableTopicPrefetch=8");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach();
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(8, credit));
+
+                ITopic topic = context.GetTopic("myTopic");
+                INMSConsumer consumer = context.CreateDurableConsumer(topic, 
"durableName");
+
+                testPeer.ExpectDetach(expectClosed: false, sendResponse: true, 
replyClosed: false);
+                testPeer.ExpectEnd();
+                consumer.Close();
+
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+
+        [Test, Timeout(20_000)]
+        public void TestConsumerCreditQueueBrowserPrefetch()
+        {
+            using (TestAmqpPeer testPeer = new TestAmqpPeer())
+            {
+                INMSContext context = EstablishNMSContext(testPeer, 
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+                testPeer.ExpectBegin();
+                testPeer.ExpectReceiverAttach(Assert.IsNotNull, 
Assert.IsNotNull, Assert.IsNotNull, true);
+                testPeer.ExpectLinkFlow(false, false, credit => 
Assert.AreEqual(9, credit));
+
+                IQueue queue = context.GetQueue("myQueue");
+                IQueueBrowser consumer = context.CreateBrowser(queue);
+
+                testPeer.ExpectDetach(expectClosed: true, sendResponse: true, 
replyClosed: true);
+                testPeer.ExpectEnd();
+
+                // To cause actual creation of consumer, after iteration 
consumer would be closed
+                foreach (var o in consumer)
+                {
+                }
+
+                testPeer.ExpectClose();
+                context.Close();
+
+                testPeer.WaitForAllMatchersToComplete(1000);
+            }
+        }
+        
+       
         // TODO No connection Listener in context
         // [Test, Timeout(20_000)]
         // public void TestRemotelyCloseConsumer()

Reply via email to