[ https://issues.apache.org/jira/browse/ARTEMIS-3110?focusedWorklogId=970975&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-970975 ]
ASF GitHub Bot logged work on ARTEMIS-3110: ------------------------------------------- Author: ASF GitHub Bot Created on: 28/May/25 18:03 Start Date: 28/May/25 18:03 Worklog Time Spent: 10m Work Description: tabish121 opened a new pull request, #5713: URL: https://github.com/apache/activemq-artemis/pull/5713 Ensure that outstanding credit is reduced when an openwire pull consumer issues a pull command and no message is dispatched. Pull consumers should only have an open credit window while the pull command window is open. Issue Time Tracking ------------------- Worklog Id: (was: 970975) Remaining Estimate: 0h Time Spent: 10m > PullMessage timeout is ignored > ------------------------------ > > Key: ARTEMIS-3110 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3110 > Project: ActiveMQ Artemis > Issue Type: Bug > Components: OpenWire > Affects Versions: 2.16.0 > Reporter: Svetlana Undalova > Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > I ran into an issue trying to use Apache Artemis (2.16). > I have slow consumers and went with prefetch size = 0 which according to docs > results in pulling messages by the client. All works well if consumer starts > after messages are already in the queue, but if it becomes idle after a while > and manages to call consumer.Receive() N times then next time when messages > appear in the queue I can see that all N are sent to the client (via checking > UnconsumedMessageCount) > I checked the same set up with ActiveMQ (5.16) and there both scenarios work > correctly. I think that issue may happen because Artemis broker ignores > PullMessage timeout. > Here is my code > > {code:java} > static async Task Main(string[] args) > { > var connectionfactory = new > Apache.NMS.ActiveMQ.ConnectionFactory("tcp://localhost:61616?connection.PrefetchPolicy.queuePrefetch=0"); > > var slowConsumer = ConsumerProcess("slow consumer", > TimeSpan.FromSeconds(120), connectionfactory); > var fastConsumer = ConsumerProcess("fast consumer", > TimeSpan.FromSeconds(10), connectionfactory); > > // To make sure several pull requests are sent > await Task.Delay(TimeSpan.FromSeconds(5)); > > // Adding messages > using (var connection = connectionfactory.CreateConnection("user", > "password")) { > connection.Start(); > using (var session = connection.CreateSession(acknowledgementMode: > Apache.NMS.AcknowledgementMode.AutoAcknowledge)) > { > var destination = > Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies"); > using (var producer = session.CreateProducer(destination)) > { > for (int i = 0; i < 10; i++) > { > var message = producer.CreateTextMessage(); > message.Text = > $"{i}_{DateTimeOffset.Now}_{Guid.NewGuid()}"; > Console.WriteLine($"{DateTimeOffset.Now}\tSending > message {message.Text}"); > producer.Send(message); > } > } > } > } > Console.WriteLine("Press enter to exit"); > Console.ReadLine(); > } > private static async Task ConsumerProcess(string name, TimeSpan > processingTime, Apache.NMS.IConnectionFactory connectionfactory) > { > using (var connection = connectionfactory.CreateConnection("user", > "password")) { > connection.Start(); > using (var session = connection.CreateSession(acknowledgementMode: > Apache.NMS.AcknowledgementMode.IndividualAcknowledge)) > { > var destination = > Apache.NMS.Util.SessionUtil.GetDestination(session, "queue://cookies"); > using (var consumer = session.CreateConsumer(destination)) > { > while (true) > { > var message = > consumer.Receive(TimeSpan.FromSeconds(1)); > if (message is Apache.NMS.ITextMessage textMessage) > > { > Console.WriteLine($"{DateTimeOffset.Now}\t{name} > received message - {textMessage.Text}"); > Console.WriteLine($"{DateTimeOffset.Now}\t{name} > unconsumed messages after receive - > {((Apache.NMS.ActiveMQ.MessageConsumer)consumer).UnconsumedMessageCount}"); > await Task.Delay(processingTime); > > message.Acknowledge(); > } > else > { > await Task.Delay(TimeSpan.FromSeconds(1)); > } > } > } > } > } > } > {code} > > > Here is output > {code:java} > 05.02.2021 16:04:55 +01:00 Sending message 5_05.02.2021 16:04:55 > +01:00_97a6857b-d003-4c63-ba11-16a6825e6595 > 05.02.2021 16:04:55 +01:00 Sending message 6_05.02.2021 16:04:55 > +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1 > 05.02.2021 16:04:55 +01:00 Sending message 7_05.02.2021 16:04:55 > +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c > 05.02.2021 16:04:55 +01:00 Sending message 8_05.02.2021 16:04:55 > +01:00_866bca6b-9237-491b-981f-d73ae7c025fb > 05.02.2021 16:04:55 +01:00 Sending message 9_05.02.2021 16:04:55 > +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071 > Press enter to exit > 05.02.2021 16:04:56 +01:00 slow consumer received message - 0_05.02.2021 > 16:04:55 +01:00_ba96ec80-3008-4c95-987c-5f82a1aaa239 > 05.02.2021 16:04:56 +01:00 slow consumer unconsumed messages after > receive - 3 > 05.02.2021 16:05:06 +01:00 fast consumer received message - 3_05.02.2021 > 16:04:55 +01:00_34df251c-da75-4145-8da4-efeecea3318d > 05.02.2021 16:05:06 +01:00 fast consumer unconsumed messages after > receive - 2 > 05.02.2021 16:05:17 +01:00 fast consumer received message - 5_05.02.2021 > 16:04:55 +01:00_97a6857b-d003-4c63-ba11-16a6825e6595 > 05.02.2021 16:05:17 +01:00 fast consumer unconsumed messages after > receive - 1 > 05.02.2021 16:05:28 +01:00 fast consumer received message - 7_05.02.2021 > 16:04:55 +01:00_2ad2c9d4-aab6-47be-b51c-a1faacd6a03c > 05.02.2021 16:05:28 +01:00 fast consumer unconsumed messages after > receive - 0 > 05.02.2021 16:05:39 +01:00 fast consumer received message - 8_05.02.2021 > 16:04:55 +01:00_866bca6b-9237-491b-981f-d73ae7c025fb > 05.02.2021 16:05:39 +01:00 fast consumer unconsumed messages after > receive - 0 > 05.02.2021 16:05:50 +01:00 fast consumer received message - 9_05.02.2021 > 16:04:55 +01:00_a60b926c-96b2-4989-8f5f-ec85e5cfb071 > 05.02.2021 16:05:50 +01:00 fast consumer unconsumed messages after > receive - 0 > 05.02.2021 16:06:57 +01:00 slow consumer received message - 2_05.02.2021 > 16:04:55 +01:00_86962504-1b4b-4f2e-b359-585e535042e9 > 05.02.2021 16:06:57 +01:00 slow consumer unconsumed messages after > receive - 2 > 05.02.2021 16:08:58 +01:00 slow consumer received message - 4_05.02.2021 > 16:04:55 +01:00_2d7ce61f-0240-4278-a779-936fd4f3a3e7 > 05.02.2021 16:08:58 +01:00 slow consumer unconsumed messages after > receive - 1 > 05.02.2021 16:10:59 +01:00 slow consumer received message - 6_05.02.2021 > 16:04:55 +01:00_4581b335-2483-42ac-a9c5-fe530a870ee1 > 05.02.2021 16:10:59 +01:00 slow consumer unconsumed messages after > receive - 0 > > {code} > As you may see consumers received more than 1 message (according to > unconsumed message count which is amount of messages that were sent to the > client). I was playing with it and usually amount of messages received are > equals to amount of Receive() calls I made. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@activemq.apache.org For additional commands, e-mail: issues-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact