Author: tabish
Date: Thu Aug 13 18:01:06 2015
New Revision: 1695745
URL: http://svn.apache.org/r1695745
Log:
Merge fixes for
AMQNET-505
AMQNET-506
AMQNET-507
AMQNET-508
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/ (props changed)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 13 18:01:06 2015
@@ -1,4 +1,4 @@
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395,1186568,1187123,1238881,1293360,1294890,1295257,1311395,1312026,1374469,1375295,1376782
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517,1695609-1695737
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
Thu Aug 13 18:01:06 2015
@@ -507,7 +507,7 @@ namespace Apache.NMS.ActiveMQ
this.session.Scheduler.Cancel(this.optimizedAckTask);
}
- if (this.session.IsClientAcknowledge)
+ if (this.session.IsClientAcknowledge ||
this.session.IsIndividualAcknowledge)
{
if (!this.info.Browser)
{
@@ -1035,7 +1035,7 @@ namespace Apache.NMS.ActiveMQ
{
return null;
}
- else if(!IgnoreExpiration && dispatch.Message.IsExpired())
+ else if(ConsumeExpiredMessage(dispatch))
{
Tracer.DebugFormat("Consumer[{0}] received expired
message: {1}",
ConsumerId, dispatch.Message.MessageId);
@@ -1058,6 +1058,8 @@ namespace Apache.NMS.ActiveMQ
timeout = deadline - dispatchTime;
}
}
+
+ SendPullRequest((long) timeout.TotalMilliseconds);
}
else if (RedeliveryExceeded(dispatch))
{
@@ -1065,6 +1067,25 @@ namespace Apache.NMS.ActiveMQ
ConsumerId, dispatch);
PosionAck(dispatch, "dispatch to " + ConsumerId + "
exceeds redelivery " +
"policy limit:" +
redeliveryPolicy.MaximumRedeliveries);
+
+ // Refresh the dispatch time
+ dispatchTime = DateTime.Now;
+
+ if(timeout > TimeSpan.Zero &&
!this.unconsumedMessages.Closed)
+ {
+ if(dispatchTime > deadline)
+ {
+ // Out of time.
+ timeout = TimeSpan.Zero;
+ }
+ else
+ {
+ // Adjust the timeout to the remaining time.
+ timeout = deadline - dispatchTime;
+ }
+ }
+
+ SendPullRequest((long) timeout.TotalMilliseconds);
}
else
{
@@ -1073,6 +1094,16 @@ namespace Apache.NMS.ActiveMQ
}
}
+ private bool ConsumeExpiredMessage(MessageDispatch dispatch)
+ {
+ if (dispatch.Message.IsExpired())
+ {
+ return !info.Browser && !IgnoreExpiration;
+ }
+
+ return false;
+ }
+
public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
{
dispatch.DeliverySequenceId = session.NextDeliveryId;
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
Thu Aug 13 18:01:06 2015
@@ -260,5 +260,62 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.IsNull(msg);
session.Close();
}
+
+ [Test]
+ public void TestIndividualAcksWithClosedConsumerAndAuditSync()
+ {
+ const int MSG_COUNT = 20;
+ const string QUEUE_NAME =
"TEST.TestIndividualAcksWithClosedConsumerAndAuditSync";
+
+ ProduceSomeMessages(MSG_COUNT, QUEUE_NAME);
+
+ string uri = "failover:(tcp://${activemqhost}:61616)";
+ IConnectionFactory factory = new
ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+
+ using (IConnection connection = factory.CreateConnection() as
Connection)
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+ using (IQueue queue = session.GetQueue(QUEUE_NAME))
+ {
+ connection.Start();
+
+ // Consume all messages with no ACK
+ using (IMessageConsumer consumer =
session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < MSG_COUNT; ++i)
+ {
+ IMessage message =
consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(message);
+ Tracer.DebugFormat("Received message: {0}",
message.NMSMessageId);
+ }
+ }
+
+ // Consume the same batch again.
+ using (IMessageConsumer consumer =
session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < MSG_COUNT; ++i)
+ {
+ IMessage message =
consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.NotNull(message);
+ Tracer.DebugFormat("Received message: {0}",
message.NMSMessageId);
+ }
+ }
+
+ session.DeleteDestination(queue);
+ }
+ }
+
+ private void ProduceSomeMessages(int count, string queueName)
+ {
+ using (IConnection connection = CreateConnection())
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IMessageProducer producer = session.CreateProducer(queue))
+ {
+ for (int i = 0; i < count; ++i)
+ {
+ producer.Send(session.CreateMessage());
+ }
+ }
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
Thu Aug 13 18:01:06 2015
@@ -205,6 +205,80 @@ namespace Apache.NMS.ActiveMQ.Test
IQueueBrowser browser = session.CreateBrowser(queue);
browser.Close();
}
- }
+ }
+
+ [Test]
+ public void TestBrowsingExpiration()
+ {
+ const int MESSAGES_TO_SEND = 50;
+ const string QUEUE_NAME = "TEST.TestBrowsingExpiration";
+
+ // Browse the queue.
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(QUEUE_NAME))
+ {
+ session.DeleteDestination(queue);
+
+ SendTestMessages(MESSAGES_TO_SEND, QUEUE_NAME);
+
+ connection.Start();
+ int browsed = Browse(QUEUE_NAME, connection);
+
+ // The number of messages browsed should be equal to the number of
+ // messages sent.
+ Assert.AreEqual(MESSAGES_TO_SEND, browsed);
+
+ // Broker expired message period is 30 seconds by default
+ for (int i = 0; i < 12; ++i)
+ {
+ Thread.Sleep(5000);
+ browsed = Browse(QUEUE_NAME, connection);
+ }
+
+ session.DeleteDestination(session.GetQueue(QUEUE_NAME));
+
+ Assert.AreEqual(0, browsed);
+ }
+ }
+
+ private int Browse(String queueName, Connection connection)
+ {
+ int browsed = 0;
+
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IQueueBrowser browser = session.CreateBrowser(queue))
+ {
+ IEnumerator enumeration = browser.GetEnumerator();
+ while (enumeration.MoveNext())
+ {
+ ITextMessage message = enumeration.Current as ITextMessage;
+ Tracer.DebugFormat("Browsed message: {0}",
message.NMSMessageId);
+ browsed++;
+ }
+ }
+
+ return browsed;
+ }
+
+ protected void SendTestMessages(int count, String queueName)
+ {
+ // Send the messages to the Queue.
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(queueName))
+ using (IMessageProducer producer = session.CreateProducer(queue))
+ {
+ for (int i = 1; i <= count; i++)
+ {
+ String msgStr = "Message: " + i;
+ producer.Send(session.CreateTextMessage(msgStr),
+ MsgDeliveryMode.NonPersistent,
+ MsgPriority.Normal,
+ TimeSpan.FromMilliseconds(1500));
+ }
+ }
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs
Thu Aug 13 18:01:06 2015
@@ -154,6 +154,54 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.IsNull(answer, "Should have not received a message!");
}
+ [Test]
+ public void TestConsumerReceivePrefetchZeroRedeliveryZero()
+ {
+ const string QUEUE_NAME =
"TEST.TestConsumerReceivePrefetchZeroRedeliveryZero";
+
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using (IQueue queue = session.GetQueue(QUEUE_NAME))
+ {
+ session.DeleteDestination(queue);
+
+ using (IMessageProducer producer =
session.CreateProducer(queue))
+ {
+ ITextMessage textMessage = session.CreateTextMessage("test
Message");
+ producer.Send(textMessage);
+ }
+ }
+
+ // consume and rollback - increase redelivery counter on message
+ using (Connection connection = CreateConnection() as Connection)
+ using (ISession session =
connection.CreateSession(AcknowledgementMode.Transactional))
+ using (IQueue queue = session.GetQueue(QUEUE_NAME))
+ using (IMessageConsumer consumer = session.CreateConsumer(queue))
+ {
+ connection.Start();
+ IMessage message =
consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.IsNotNull(message);
+ session.Rollback();
+ }
+
+ // try consume with timeout - expect it to timeout and return NULL message
+ using (Connection connection = CreateConnection() as Connection)
+ {
+ connection.PrefetchPolicy.All = 0;
+ connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+ connection.Start();
+
+ ISession session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IQueue queue = session.GetQueue(QUEUE_NAME);
+
+ using (IMessageConsumer consumer =
session.CreateConsumer(queue))
+ {
+ IMessage message =
consumer.Receive(TimeSpan.FromMilliseconds(3000));
+ Assert.IsNull(message);
+ }
+ }
+ }
+
[SetUp]
public override void SetUp()
{