[
https://issues.apache.org/jira/browse/ARTEMIS-4047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Justin Bertram resolved ARTEMIS-4047.
-------------------------------------
Resolution: Cannot Reproduce
> Artemis does not send message to consumer AMQP
> ----------------------------------------------
>
> Key: ARTEMIS-4047
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4047
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: AMQP, Broker
> Affects Versions: 2.25.0, 2.26.0
> Reporter: daves
> Priority: Major
> Attachments: 1.PNG, 2.PNG, 3.PNG, 4.PNG, 5.PNG, All.zip
>
>
> The broker does not send messages from one of many existing queues to the
> connected consumer.
> According to the UI the queue does contain ~15k messages.
> I’m not able to consume any of these messages. I also tried to read a message
> using the browse function of the UI/console, but that does not work either.
> The message was created by an AMQP client and should be consumed by another
> AMQP client.
> I tried to capture the situation in a few screenshots…
> I don’t know which data can help you to understand the situation, so I’ve
> collected everything:
> * Logs
> * Broker
> * Data
> Please let me know if there are any other data I should add to the ticket.
> I don’t think that the code of my client is relevant since the problem only
> exists for a single queue…but here it is anyway:
> {code:java}
> using Amqp;
> using Amqp.Framing;
> using Amqp.Types;
>
> namespace Test;
>
> public sealed class MessageConsumer
> {
>     private readonly String _address;
>     private readonly CancellationToken _cancellationToken;
>     private readonly String _consumerName;
>     private readonly String[] _destinations;
>
>     public MessageConsumer( String address, String consumerName, String[] destinations, CancellationToken cancellationToken )
>     {
>         _address = address;
>         _consumerName = consumerName;
>         _destinations = destinations;
>         _cancellationToken = cancellationToken;
>     }
>
>     public async Task StartReceivingMessages()
>     {
>         await Task.Yield();
>         // Outer loop: reconnect after any connection error until cancelled.
>         while ( !_cancellationToken.IsCancellationRequested )
>         {
>             var connectionFactory = new ConnectionFactory();
>             var address = new Address( _address );
>             try
>             {
>                 var connection = await connectionFactory.CreateAsync( address );
>                 var session = ( (IConnection) connection ).CreateSession();
>                 // One receiver link per destination queue.
>                 var receivers = new List<IReceiverLink>();
>                 foreach ( var destination in _destinations )
>                 {
>                     var receiver = session.CreateReceiver(
>                         $"{_consumerName}_{destination}",
>                         new Source
>                         {
>                             Address = destination,
>                             Capabilities = new[] { new Symbol( "queue" ) }
>                         } );
>                     receivers.Add( receiver );
>                 }
>                 // Inner loop: poll each receiver in turn.
>                 while ( !_cancellationToken.IsCancellationRequested )
>                     foreach ( var receiver in receivers )
>                     {
>                         // ReceiveAsync( TimeSpan.Zero ); blocks forever and no messages will be received
>                         var message = await receiver.ReceiveAsync( TimeSpan.FromMilliseconds( 1 ) );
>                         if ( message == null )
>                             continue;
>                         receiver.Accept( message );
>                         Console.WriteLine( $"{_consumerName} - Received message with id: '{message.Properties.MessageId}'" );
>                     }
>             }
>             catch ( Exception ex )
>             {
>                 Console.WriteLine( $"{_consumerName} - Connection error in consumer '{_consumerName}' {ex.Message} => create new connection." );
>                 await Task.Delay( 1000, CancellationToken.None );
>             }
>         }
>     }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)