This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch 2.0.x
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/2.0.x by this push:
new cd89a79 AMQNET-724 Allow to specify consumer credit
new 52671fd Merge pull request #75 from lukeabsent/AMQNET-724
cd89a79 is described below
commit cd89a797760b87691e098b14cb1b4cb497d20724
Author: lukeabsent <[email protected]>
AuthorDate: Thu Jul 29 19:47:56 2021 +0200
AMQNET-724 Allow to specify consumer credit
---
docs/configuration.md | 5 +
src/NMS.AMQP/Meta/NmsConnectionInfo.cs | 44 ++++++++
src/NMS.AMQP/Meta/NmsConsumerInfo.cs | 4 +-
src/NMS.AMQP/NmsConnectionFactory.cs | 8 +-
src/NMS.AMQP/NmsMessageConsumer.cs | 6 +-
src/NMS.AMQP/Util/PropertyUtil.cs | 39 +++++--
.../AmqpTestSupport.cs | 8 +-
.../NmsMessageConsumerTest.cs | 40 +++++++
test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs | 43 ++++++-
.../Async/ConsumerIntegrationTestAsync.cs | 124 ++++++++++++++++++++
.../Async/FailoverIntegrationTestAsync.cs | 3 +-
.../Async/NMSConsumerIntegrationTestAsync.cs | 125 +++++++++++++++++++++
.../Integration/ConsumerIntegrationTest.cs | 125 +++++++++++++++++++++
.../Integration/FailoverIntegrationTest.cs | 3 +-
.../Integration/NMSConsumerIntegrationTest.cs | 125 +++++++++++++++++++++
15 files changed, 679 insertions(+), 23 deletions(-)
diff --git a/docs/configuration.md b/docs/configuration.md
index 3375d45..2c81986 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -24,6 +24,11 @@ The options apply to the behavior of the NMS objects such as
Connection, Session
- **nms.clientIdPrefix** Optional prefix value that is used for generated
Client ID values when a new Connection is created for the JMS
ConnectionFactory. The default prefix is 'ID:'.
- **nms.connectionIdPrefix** Optional prefix value that is used for generated
Connection ID values when a new Connection is created for the JMS
ConnectionFactory. This connection ID is used when logging some information
from the JMS Connection object so a configurable prefix can make breadcrumbing
the logs easier. The default prefix is 'ID:'.
- **nms.maxNewConnectionRatePerSec** Approximate maximum rate at which the
connection factory is allowed to create new connections. Requests that exceed
this rate will have to wait. Default value is -1 which means unlimited.
+- **nms.prefetchPolicy.all** Link credit value that will be assigned to new
consumers of every category (queue, topic, queue browser and durable topic). It
is effectively the number of messages a consumer can receive without
acknowledging them. The default value is 1000.
+- **nms.prefetchPolicy.queuePrefetch** Link credit value that will be assigned
to new consumers of queue. The default value is 1000.
+- **nms.prefetchPolicy.topicPrefetch** Link credit value that will be assigned
to new consumers of topic. The default value is 1000.
+- **nms.prefetchPolicy.queueBrowserPrefetch** Link credit value that will be
assigned to new queue browsers. The default value is 1000.
+- **nms.prefetchPolicy.durableTopicPrefetch** Link credit value that will be
assigned to new consumers of durable topic. The default value is 1000.
### TCP Transport Configuration options
When connected to a remote using plain TCP these options configure the
behaviour of the underlying socket. These options are appended to the
connection URI along with the other configuration options, for example:
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
index b656037..103a4cb 100644
--- a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -34,6 +34,13 @@ namespace Apache.NMS.AMQP.Meta
public static readonly int DEFAULT_IDLE_TIMEOUT;
public static readonly ushort DEFAULT_CHANNEL_MAX;
public static readonly int DEFAULT_MAX_FRAME_SIZE;
+ public static readonly PrefetchPolicyInfo DEFAULT_PREFETCH_POLICY =
new PrefetchPolicyInfo()
+ {
+ QueuePrefetch = 1000,
+ TopicPrefetch = 1000,
+ DurableTopicPrefetch = 1000,
+ QueueBrowserPrefetch = 1000
+ };
public static double DEFAULT_MAX_NEW_CONNECTION_RATE_PER_SEC = -1;
static NmsConnectionInfo()
@@ -70,6 +77,11 @@ namespace Apache.NMS.AMQP.Meta
public bool DelayedDeliverySupported { get; set; }
public bool SharedSubsSupported { get; set; }
+
+ public PrefetchPolicyInfo PrefetchPolicy { get; set; } =
DEFAULT_PREFETCH_POLICY;
+
+
+
public void SetClientId(string clientId, bool explicitClientId)
@@ -101,4 +113,36 @@ namespace Apache.NMS.AMQP.Meta
return $"[{nameof(NmsConnectionInfo)}] {nameof(Id)}: {Id},
{nameof(ConfiguredUri)}: {ConfiguredUri}";
}
}
+
+ public class PrefetchPolicyInfo
+ {
+ public int QueuePrefetch { get; set; }
+ public int TopicPrefetch { get; set; }
+ public int QueueBrowserPrefetch { get; set; }
+ public int DurableTopicPrefetch { get; set; }
+
+ public int All
+ {
+ set => QueuePrefetch = TopicPrefetch = QueueBrowserPrefetch =
DurableTopicPrefetch = value;
+ }
+
+ internal PrefetchPolicyInfo Clone()
+ {
+ return (PrefetchPolicyInfo) this.MemberwiseClone();
+ }
+
+ public int GetLinkCredit(IDestination destination, bool browser, bool
durable)
+ {
+ if (destination.IsTopic)
+ {
+ if (durable) return DurableTopicPrefetch;
+ else return TopicPrefetch;
+ }
+ else
+ {
+ if (browser) return QueueBrowserPrefetch;
+ else return QueuePrefetch;
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
index 70cf529..b5506c3 100644
--- a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -21,8 +21,6 @@ namespace Apache.NMS.AMQP.Meta
{
public class NmsConsumerInfo : INmsResource<NmsConsumerId>
{
- public static readonly int DEFAULT_CREDIT = 200;
-
public NmsConsumerInfo(NmsConsumerId consumerId)
{
Id = consumerId ?? throw new
ArgumentNullException(nameof(consumerId), "Consumer ID cannot be null");
@@ -39,7 +37,7 @@ namespace Apache.NMS.AMQP.Meta
public bool IsShared { get; set; }
public bool LocalMessageExpiry { get; set; }
public bool IsBrowser { get; set; }
- public int LinkCredit { get; set; } = DEFAULT_CREDIT;
+ public int LinkCredit { get; set; }
public bool HasSelector() => !string.IsNullOrWhiteSpace(Selector);
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs
b/src/NMS.AMQP/NmsConnectionFactory.cs
index 6729bee..6212ddc 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -169,6 +169,11 @@ namespace Apache.NMS.AMQP
public string ClientId { get; set; }
/// <summary>
+ /// Sets and gets the prefetch values for consumers
+ /// </summary>
+ public PrefetchPolicyInfo PrefetchPolicy { get; set; } =
NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.Clone();
+
+ /// <summary>
/// Sets the desired max rate of creating new connections by this
factory.
///
/// NOTE: During creating new connection if the rate is too high
system will
@@ -333,7 +338,8 @@ namespace Apache.NMS.AMQP
RequestTimeout = RequestTimeout,
SendTimeout = SendTimeout,
CloseTimeout = CloseTimeout,
- LocalMessageExpiry = LocalMessageExpiry
+ LocalMessageExpiry = LocalMessageExpiry,
+ PrefetchPolicy = PrefetchPolicy.Clone()
};
bool userSpecifiedClientId = ClientId != null;
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs
b/src/NMS.AMQP/NmsMessageConsumer.cs
index 63f6896..c214ee0 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -50,7 +50,7 @@ namespace Apache.NMS.AMQP
{
session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination)
destination);
}
-
+
Info = new NmsConsumerInfo(consumerId)
{
Destination = destination,
@@ -61,8 +61,8 @@ namespace Apache.NMS.AMQP
IsShared = IsSharedSubscription,
IsDurable = IsDurableSubscription,
IsBrowser = IsBrowser,
- LocalMessageExpiry =
Session.Connection.ConnectionInfo.LocalMessageExpiry
-
+ LocalMessageExpiry =
Session.Connection.ConnectionInfo.LocalMessageExpiry,
+ LinkCredit =
Session.Connection.ConnectionInfo.PrefetchPolicy.GetLinkCredit(destination,
IsBrowser, IsDurableSubscription)
};
deliveryTask = new MessageDeliveryTask(this);
}
diff --git a/src/NMS.AMQP/Util/PropertyUtil.cs
b/src/NMS.AMQP/Util/PropertyUtil.cs
index db7cd65..ac26ec7 100644
--- a/src/NMS.AMQP/Util/PropertyUtil.cs
+++ b/src/NMS.AMQP/Util/PropertyUtil.cs
@@ -52,26 +52,43 @@ namespace Apache.NMS.AMQP.Util
public static void SetProperties(object obj, StringDictionary
properties, string propertyPrefix = PROPERTY_PREFIX)
{
- Dictionary<string, PropertyInfo> props =
GetPropertiesForClass(obj);
foreach (string rawkey in properties.Keys)
{
- string key = RemovePrefix(propertyPrefix, rawkey);
- Tracer.DebugFormat("Searching for Property: \"{0}\"", key);
- if (props.ContainsKey(key))
+ Tracer.DebugFormat("Searching for Property: \"{0}\"", rawkey);
+ var (currentObject, propertyInfo) = GetPropertyInfo(obj,
propertyPrefix, rawkey);
+
+ if (propertyInfo != null)
{
- Tracer.DebugFormat(
- "Assigning Property {0} to {1}.{2} with value {3}",
- key, obj.GetType().Namespace, obj.GetType().Name,
properties[rawkey]
- );
#if NET40
- if (props[key].GetSetMethod() != null)
+ if (propertyInfo.GetSetMethod() != null)
#else
- if(props[key].SetMethod!=null)
+ if (propertyInfo.SetMethod != null)
#endif
- props[key].SetValue(obj,
ConvertType(props[key].PropertyType, properties[rawkey]), null);
+ {
+ Tracer.DebugFormat(
+ "Assigning Property {0} to {1}.{2} with value {3}",
+ rawkey, obj.GetType().Namespace,
obj.GetType().Name, properties[rawkey]
+ );
+ propertyInfo.SetValue(currentObject,
ConvertType(propertyInfo.PropertyType, properties[rawkey]), null);
+ }
+
}
+ }
+ }
+ private static (object,PropertyInfo) GetPropertyInfo(object obj,
string propertyPrefix, string rawkey)
+ {
+ Object currentObject = null;
+ PropertyInfo propertyInfo = null;
+
+ foreach (var propertyName in rawkey.Split(new[] {'.'},
StringSplitOptions.RemoveEmptyEntries))
+ {
+ currentObject = currentObject == null ? obj :
propertyInfo.GetValue(currentObject);
+ string key = RemovePrefix(propertyPrefix, propertyName);
+ propertyInfo = GetPropertiesForClass(currentObject)[key];
}
+
+ return (currentObject, propertyInfo);
}
public static StringDictionary GetProperties(object obj, string
propertyPrefix = PROPERTY_PREFIX)
diff --git a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
index bf309c4..c742457 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/AmqpTestSupport.cs
@@ -48,10 +48,14 @@ namespace NMS.AMQP.Test
connection.Start();
return connection;
}
-
- protected IConnection CreateAmqpConnection(string clientId = null)
+
+ protected IConnection CreateAmqpConnection(string clientId = null,
string options = null)
{
string brokerUri =
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_URI") ??
"amqp://127.0.0.1:5672";
+ if (options != null)
+ {
+ brokerUri += "?" + options;
+ }
string userName =
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CU") ?? "admin";
string password =
Environment.GetEnvironmentVariable("NMS_AMQP_TEST_CPWD") ?? "admin";
diff --git a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
index dc8c260..7b2de68 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/NmsMessageConsumerTest.cs
@@ -60,7 +60,47 @@ namespace NMS.AMQP.Test
messageConsumer.Close();
}
+
+ [Test, Timeout(60_000)]
+ public void TestConsumerCredit()
+ {
+ PurgeQueue(TimeSpan.FromMilliseconds(500));
+
+ Connection = CreateAmqpConnection(options:
"nms.prefetchPolicy.all=3");
+ Connection.Start();
+
+ ISession session =
Connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+ IQueue queue = session.GetQueue(TestName);
+ IMessageConsumer messageConsumer = session.CreateConsumer(queue);
+ ConcurrentBag<IMessage> messages = new ConcurrentBag<IMessage>();
+ CountdownEvent countdownReceived = new CountdownEvent(4);
+ messageConsumer.Listener += message =>
+ {
+ messages.Add(message);
+ try
+ {
+ countdownReceived.Signal();
+ }
+ catch (Exception)
+ {
+ // if it gets below zero, we dont care
+ }
+ };
+
+ IMessageProducer producer = session.CreateProducer(queue);
+ Enumerable.Range(0, 100).ToList().ForEach(nr =>
producer.Send(session.CreateTextMessage("hello")));
+
+ // Wait for at least four messages are read, which should never
happen
+ Assert.IsFalse(countdownReceived.Wait(500));
+ Assert.AreEqual(3, messages.Count);
+
+ // Once we ack messages we should start receiving another 3
+ messages.ToList().ForEach(m => m.Acknowledge());
+ // We just wait to see if 4th message arrived
+ Assert.IsTrue(countdownReceived.Wait(500));
+ }
+
[Test, Timeout(60_000)]
public void TestSelectorsWithJMSType()
{
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index 3962e36..096e53f 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -120,7 +120,44 @@ namespace NMS.AMQP.Test
"&nms.requestTimeout=1000" +
"&nms.sendTimeout=1000" +
"&nms.closeTimeout=2000" +
- "&nms.localMessageExpiry=false";
+ "&nms.localMessageExpiry=false" +
+ "&nms.prefetchPolicy.all=55";
+
+ NmsConnectionFactory factory = new NmsConnectionFactory(new
Uri(configuredUri));
+
+ Assert.AreEqual("user", factory.UserName);
+ Assert.AreEqual("password", factory.Password);
+ Assert.AreEqual("client", factory.ClientId);
+ Assert.AreEqual("ID:TEST", factory.ConnectionIdPrefix);
+ Assert.AreEqual("clientId", factory.ClientIdPrefix);
+ Assert.AreEqual(1000, factory.RequestTimeout);
+ Assert.AreEqual(1000, factory.SendTimeout);
+ Assert.AreEqual(2000, factory.CloseTimeout);
+ Assert.AreEqual(55, factory.PrefetchPolicy.QueuePrefetch);
+ Assert.AreEqual(55, factory.PrefetchPolicy.TopicPrefetch);
+ Assert.AreEqual(55, factory.PrefetchPolicy.DurableTopicPrefetch);
+ Assert.AreEqual(55, factory.PrefetchPolicy.QueueBrowserPrefetch);
+ Assert.IsFalse(factory.LocalMessageExpiry);
+ }
+
+ [Test]
+ public void TestSetPrefetchPolicyPropertiesFromUri()
+ {
+ string baseUri = "amqp://localhost:1234";
+ string configuredUri = baseUri +
+ "?nms.username=user" +
+ "&nms.password=password" +
+ "&nms.clientId=client" +
+ "&nms.connectionIdPrefix=ID:TEST" +
+ "&nms.clientIDPrefix=clientId" +
+ "&nms.requestTimeout=1000" +
+ "&nms.sendTimeout=1000" +
+ "&nms.closeTimeout=2000" +
+ "&nms.localMessageExpiry=false" +
+ "&nms.prefetchPolicy.queuePrefetch=11" +
+ "&nms.prefetchPolicy.topicPrefetch=22" +
+
"&nms.prefetchPolicy.durableTopicPrefetch=33" +
+
"&nms.prefetchPolicy.queueBrowserPrefetch=44";
NmsConnectionFactory factory = new NmsConnectionFactory(new
Uri(configuredUri));
@@ -132,6 +169,10 @@ namespace NMS.AMQP.Test
Assert.AreEqual(1000, factory.RequestTimeout);
Assert.AreEqual(1000, factory.SendTimeout);
Assert.AreEqual(2000, factory.CloseTimeout);
+ Assert.AreEqual(11, factory.PrefetchPolicy.QueuePrefetch);
+ Assert.AreEqual(22, factory.PrefetchPolicy.TopicPrefetch);
+ Assert.AreEqual(33, factory.PrefetchPolicy.DurableTopicPrefetch);
+ Assert.AreEqual(44, factory.PrefetchPolicy.QueueBrowserPrefetch);
Assert.IsFalse(factory.LocalMessageExpiry);
}
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
index b4f9885..5445d5e 100644
---
a/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
+++
b/test/Apache-NMS-AMQP-Test/Integration/Async/ConsumerIntegrationTestAsync.cs
@@ -59,6 +59,130 @@ namespace NMS.AMQP.Test.Integration.Async
}
[Test, Timeout(20_000)]
+ public async Task TestConsumerCreditAll()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = await
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.all=5");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(5, credit));
+
+ ISession session = await
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = await session.GetQueueAsync("myQueue");
+ IMessageConsumer consumer = await
session.CreateConsumerAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditQueuePrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = await
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queuePrefetch=6");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(6, credit));
+
+ ISession session = await
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = await session.GetQueueAsync("myQueue");
+ IMessageConsumer consumer = await
session.CreateConsumerAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = await
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.topicPrefetch=7");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(7, credit));
+
+ ISession session = await
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+ ITopic topic = await session.GetTopicAsync("myTopic");
+ IMessageConsumer consumer = await
session.CreateConsumerAsync(topic);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditDurableTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = await
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.durableTopicPrefetch=8");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(8, credit));
+
+ ISession session = await
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+ ITopic topic = await session.GetTopicAsync("myTopic");
+ IMessageConsumer consumer = await
session.CreateDurableConsumerAsync(topic, "durableName");
+
+ testPeer.ExpectDetach(expectClosed: false, sendResponse: true,
replyClosed: false);
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditQueueBrowserPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = await
EstablishConnectionAsync(testPeer, "nms.prefetchPolicy.queueBrowserPrefetch=9");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach(Assert.IsNotNull,
Assert.IsNotNull, Assert.IsNotNull, true);
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(9, credit));
+
+ ISession session = await
connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = await session.GetQueueAsync("myQueue");
+ IQueueBrowser consumer = await
session.CreateBrowserAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+
+ // To cause actual creation of consumer, after iteration
consumer would be closed
+ foreach (var o in consumer)
+ {
+ }
+
+ testPeer.ExpectClose();
+ await connection.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
public async Task TestRemotelyCloseConsumer()
{
Mock<INmsConnectionListener> mockConnectionListener = new
Mock<INmsConnectionListener>();
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
index 9cae2d9..0f5397d 100644
---
a/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
+++
b/test/Apache-NMS-AMQP-Test/Integration/Async/FailoverIntegrationTestAsync.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
using Amqp.Types;
using Apache.NMS;
using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
using Moq;
using NLog;
using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@ namespace NMS.AMQP.Test.Integration.Async
finalPeer.ExpectBegin();
finalPeer.ExpectBegin();
finalPeer.ExpectReceiverAttach();
- finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse:
false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+ finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse:
false, creditMatcher: credit =>
Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch,
credit));
finalPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
finalPeer.ExpectClose();
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
index a27b268..87659da 100644
---
a/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
+++
b/test/Apache-NMS-AMQP-Test/Integration/Async/NMSConsumerIntegrationTestAsync.cs
@@ -57,6 +57,131 @@ namespace NMS.AMQP.Test.Integration.Async
testPeer.WaitForAllMatchersToComplete(1000);
}
}
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditAll()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = await EstablishNMSContextAsync(testPeer,
"nms.prefetchPolicy.all=5");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(5, credit));
+
+ IQueue queue = await context.GetQueueAsync("myQueue");
+ INMSConsumer consumer = await
context.CreateConsumerAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await context.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditQueuePrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = await EstablishNMSContextAsync(testPeer,
"nms.prefetchPolicy.queuePrefetch=6");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(6, credit));
+
+ IQueue queue = await context.GetQueueAsync("myQueue");
+ INMSConsumer consumer = await
context.CreateConsumerAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await context.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = await EstablishNMSContextAsync(testPeer,
"nms.prefetchPolicy.topicPrefetch=7");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(7, credit));
+
+ ITopic topic = await context.GetTopicAsync("myTopic");
+ INMSConsumer consumer = await
context.CreateConsumerAsync(topic);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await context.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditDurableTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = await EstablishNMSContextAsync(testPeer,
"nms.prefetchPolicy.durableTopicPrefetch=8");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(8, credit));
+
+ ITopic topic = await context.GetTopicAsync("myTopic");
+ INMSConsumer consumer = await
context.CreateDurableConsumerAsync(topic, "durableName");
+
+ testPeer.ExpectDetach(expectClosed: false, sendResponse: true,
replyClosed: false);
+ testPeer.ExpectEnd();
+ await consumer.CloseAsync();
+
+ testPeer.ExpectClose();
+ await context.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public async Task TestConsumerCreditQueueBrowserPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = await EstablishNMSContextAsync(testPeer,
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach(Assert.IsNotNull,
Assert.IsNotNull, Assert.IsNotNull, true);
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(9, credit));
+
+ IQueue queue = await context.GetQueueAsync("myQueue");
+ IQueueBrowser consumer = await
context.CreateBrowserAsync(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+
+ // To cause actual creation of consumer, after iteration
consumer would be closed
+ foreach (var o in consumer)
+ {
+ }
+
+ testPeer.ExpectClose();
+ await context.CloseAsync();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
// TODO No connection Listener in NMSContext
// [Test, Timeout(20_000)]
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
index f42ba51..6bfb1f5 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/ConsumerIntegrationTest.cs
@@ -59,6 +59,131 @@ namespace NMS.AMQP.Test.Integration
}
[Test, Timeout(20_000)]
+ public void TestConsumerCreditAll()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer,
"nms.prefetchPolicy.all=5");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(5, credit));
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditQueuePrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer,
"nms.prefetchPolicy.queuePrefetch=6");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(6, credit));
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageConsumer consumer = session.CreateConsumer(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer,
"nms.prefetchPolicy.topicPrefetch=7");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(7, credit));
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ ITopic topic = session.GetTopic("myTopic");
+ IMessageConsumer consumer = session.CreateConsumer(topic);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditDurableTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer,
"nms.prefetchPolicy.durableTopicPrefetch=8");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(8, credit));
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ ITopic topic = session.GetTopic("myTopic");
+ IMessageConsumer consumer =
session.CreateDurableConsumer(topic, "durableName");
+
+ testPeer.ExpectDetach(expectClosed: false, sendResponse: true,
replyClosed: false);
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditQueueBrowserPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = EstablishConnection(testPeer,
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach(Assert.IsNotNull,
Assert.IsNotNull, Assert.IsNotNull, true);
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(9, credit));
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IQueueBrowser consumer = session.CreateBrowser(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+
+ // To cause actual creation of consumer, after iteration
consumer would be closed
+ foreach (var o in consumer)
+ {
+ }
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+
+ [Test, Timeout(20_000)]
public void TestRemotelyCloseConsumer()
{
Mock<INmsConnectionListener> mockConnectionListener = new
Mock<INmsConnectionListener>();
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 42919cc..6d53c45 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -23,6 +23,7 @@ using Amqp.Framing;
using Amqp.Types;
using Apache.NMS;
using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
using Moq;
using NLog;
using NMS.AMQP.Test.TestAmqp;
@@ -617,7 +618,7 @@ namespace NMS.AMQP.Test.Integration
finalPeer.ExpectBegin();
finalPeer.ExpectBegin();
finalPeer.ExpectReceiverAttach();
- finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse:
false, creditMatcher: credit => Assert.AreEqual(credit, 200));
+ finalPeer.ExpectLinkFlow(drain: false, sendDrainFlowResponse:
false, creditMatcher: credit =>
Assert.AreEqual(NmsConnectionInfo.DEFAULT_PREFETCH_POLICY.QueuePrefetch,
credit));
finalPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
finalPeer.ExpectClose();
diff --git
a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
index 542f720..0230f44 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/NMSConsumerIntegrationTest.cs
@@ -58,6 +58,131 @@ namespace NMS.AMQP.Test.Integration
}
}
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditAll()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer,
"nms.prefetchPolicy.all=5");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(5, credit));
+
+ IQueue queue = context.GetQueue("myQueue");
+ INMSConsumer consumer = context.CreateConsumer(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditQueuePrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer,
"nms.prefetchPolicy.queuePrefetch=6");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(6, credit));
+
+ IQueue queue = context.GetQueue("myQueue");
+ INMSConsumer consumer = context.CreateConsumer(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer,
"nms.prefetchPolicy.topicPrefetch=7");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(7, credit));
+
+ ITopic topic = context.GetTopic("myTopic");
+ INMSConsumer consumer = context.CreateConsumer(topic);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditDurableTopicPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer,
"nms.prefetchPolicy.durableTopicPrefetch=8");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(8, credit));
+
+ ITopic topic = context.GetTopic("myTopic");
+ INMSConsumer consumer = context.CreateDurableConsumer(topic,
"durableName");
+
+ testPeer.ExpectDetach(expectClosed: false, sendResponse: true,
replyClosed: false);
+ testPeer.ExpectEnd();
+ consumer.Close();
+
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestConsumerCreditQueueBrowserPrefetch()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ INMSContext context = EstablishNMSContext(testPeer,
"nms.prefetchPolicy.queueBrowserPrefetch=9");
+ testPeer.ExpectBegin();
+ testPeer.ExpectReceiverAttach(Assert.IsNotNull,
Assert.IsNotNull, Assert.IsNotNull, true);
+ testPeer.ExpectLinkFlow(false, false, credit =>
Assert.AreEqual(9, credit));
+
+ IQueue queue = context.GetQueue("myQueue");
+ IQueueBrowser consumer = context.CreateBrowser(queue);
+
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: true,
replyClosed: true);
+ testPeer.ExpectEnd();
+
+ // To cause actual creation of consumer, after iteration
consumer would be closed
+ foreach (var o in consumer)
+ {
+ }
+
+ testPeer.ExpectClose();
+ context.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
+
// TODO No connection Listener in context
// [Test, Timeout(20_000)]
// public void TestRemotelyCloseConsumer()