[ 
https://issues.apache.org/jira/browse/ARTEMIS-2375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861241#comment-16861241
 ] 

Justin Bertram edited comment on ARTEMIS-2375 at 6/11/19 6:14 PM:
------------------------------------------------------------------

bq. I can understand how the unit test would lead you to believe that since the 
API requires a queue in order to create a consumer.

It's not just that the API requires a queue in order to create a consumer. It's 
the way the wildcard address feature was designed. 

Remember, it's a wildcard *address*, not a wildcard *consumer*. Messages sent 
to any address matching the wildcard address are routed to the wildcard 
address' queues as well as to the queues on the particular address where they 
were originally sent. The messages in the wildcard address' queues are 
acknowledged independently of the messages in the particular address' queues.
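
To make the routing concrete, here is a toy model in plain Java. It is only a 
sketch and not Artemis code: the class and method names are made up for 
illustration, it assumes one queue per address, and it only understands a 
trailing ".#" wildcard. It shows that a consumer acknowledging from the 
wildcard address' queue leaves the message in the specific address' queue 
untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of wildcard-address routing. Hypothetical names; NOT Artemis code. */
final class WildcardRoutingSketch {

    /** Counts what was routed to the queue and what a consumer acknowledged. */
    static final class ToyQueue {
        final String name;
        int added;
        int acknowledged;

        ToyQueue(String name) { this.name = name; }

        void acknowledge() {
            if (acknowledged < added) acknowledged++;
        }

        int pending() { return added - acknowledged; }
    }

    // Address (possibly a wildcard such as "a.#") -> the single queue bound to it.
    // Assumption: one queue per address, so anycast/multicast distribution
    // among several queues on the same address is not modelled.
    private final Map<String, ToyQueue> bindings = new LinkedHashMap<>();

    ToyQueue createQueue(String address, String queueName) {
        ToyQueue q = new ToyQueue(queueName);
        bindings.put(address, q);
        return q;
    }

    /** Simplified matching: only a trailing ".#" (zero or more words) is handled. */
    static boolean matches(String bindingAddress, String sentTo) {
        if (bindingAddress.endsWith(".#")) {
            String prefix = bindingAddress.substring(0, bindingAddress.length() - 2);
            return sentTo.equals(prefix) || sentTo.startsWith(prefix + ".");
        }
        return bindingAddress.equals(sentTo);
    }

    /** Every queue whose address matches gets its own entry for the message. */
    void send(String address) {
        for (Map.Entry<String, ToyQueue> e : bindings.entrySet()) {
            if (matches(e.getKey(), address)) {
                e.getValue().added++;
            }
        }
    }

    public static void main(String[] args) {
        WildcardRoutingSketch broker = new WildcardRoutingSketch();
        ToyQueue specific = broker.createQueue("myqueue.2", "myqueue.2");
        ToyQueue wildcard = broker.createQueue("myqueue.#", "myqueue.#");

        broker.send("myqueue.2");
        wildcard.acknowledge(); // the consumer is attached to the wildcard queue only

        System.out.println(specific.name + " pending=" + specific.pending());
        System.out.println(wildcard.name + " pending=" + wildcard.pending());
    }
}
```

Running it prints "myqueue.2 pending=1" and "myqueue.# pending=0", which is 
the build-up described in the issue: nothing ever consumes from the specific 
queue.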

bq. I think that creating a brand new queue in order to use a wildcard and 
receive messages from other queues is the bug.

That's not a bug. That's how it was designed to work.

bq. It has essentially created a memory leak in which the messages from the 
originating queues are never cleaned up. Memory usage on the artemis box slowly 
creeps up until it's consumed all memory and the server crashes.

If you don't want messages to accumulate in the individual queues on the 
addresses that match the wildcard then simply remove those queues. An address 
with no queue is a valid configuration. Here's an example:

{code:java}
   @Test
   public void testAsyncConsumerWildcardAck() throws Exception {
      ActiveMQServer server = createServer(false);
      server.start();
      ServerLocator locator =
         createInVMNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
      ClientSessionFactory cf = createSessionFactory(locator);
      ClientSession sendSession = cf.createSession(false, true, true);
      final ClientSession session = cf.createSession(false, true, true);

      SimpleString addressAB = new SimpleString("a.b");
      SimpleString addressAC = new SimpleString("a.c");
      SimpleString address = new SimpleString("a.#");
      SimpleString queueName = new SimpleString("Q");
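      // The specific addresses are created without any queues of their own.
      session.createAddress(addressAB, RoutingType.ANYCAST, false);
      session.createAddress(addressAC, RoutingType.ANYCAST, false);
      // The only queue is the one bound to the wildcard address.
      session.createQueue(address, RoutingType.ANYCAST, queueName, null, false);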
      ClientProducer producer = session.createProducer(addressAB);
      ClientProducer producer2 = session.createProducer(addressAC);
      ClientConsumer clientConsumer = session.createConsumer(queueName);

      producer.send(sendSession.createMessage(false));
      producer2.send(sendSession.createMessage(false));
      final CountDownLatch latch = new CountDownLatch(2);
      session.start();
      clientConsumer.setMessageHandler(new MessageHandler() {
         @Override
         public void onMessage(final ClientMessage message) {
            try {
               message.acknowledge();
            } catch (ActiveMQException e) {
               try {
                  session.close();
               } catch (ActiveMQException e1) {
                  e1.printStackTrace();
               }
            }
            latch.countDown();
         }
      });
      Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
      Queue q = (Queue) server.getPostOffice().getBinding(queueName).getBindable();
      Assert.assertEquals(2, q.getMessagesAdded());
      Assert.assertEquals(2, q.getMessagesAcknowledged());
      sendSession.close();
      session.close();
      Assert.assertEquals(0, server.getTotalMessageCount());
   }
{code}

bq. At least one other person has run into this before: 
http://activemq.2283324.n4.nabble.com/Artemis-Wild-card-messages-received-but-not-acknowledged-td4728617.html

It looks like he had the same misunderstanding that you did. I followed up with 
the user on the mailing list.


> JMS, Wildcard destination consumer, and Acknowledgements going to wrong queue
> -----------------------------------------------------------------------------
>
>                 Key: ARTEMIS-2375
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2375
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.9.0
>            Reporter: Craig Schmidt
>            Assignee: Justin Bertram
>            Priority: Major
>
> I have an ActiveMQ server set up where I have multiple (and unbounded) queues 
> which differ only in the last component, e.g. myqueue.1, myqueue.2, 
> myqueue.4, etc. The last component is from a database that will have a 
> varying set of customers defined, and I want to set up one queue for each 
> customer. (I want a separate queue because the third party we're talking to 
> needs throttling at our 'customer' level.) 
> The address setting looks like this in broker.xml: 
> {code:xml}
> <address-setting match="myqueue.#">
>   <dead-letter-address>myqueue.DLQ</dead-letter-address>
>   <expiry-address>myqueue.ExpiryQueue</expiry-address>
>   <redelivery-delay>500</redelivery-delay>
>   <max-size-bytes>-1</max-size-bytes>
>   <message-counter-history-day-limit>10</message-counter-history-day-limit>
>   <address-full-policy>PAGE</address-full-policy>
>   <auto-create-queues>true</auto-create-queues>
>   <auto-create-addresses>true</auto-create-addresses>
>   <auto-create-jms-queues>true</auto-create-jms-queues>
>   <auto-create-jms-topics>true</auto-create-jms-topics>
>   <max-delivery-attempts>3</max-delivery-attempts>
> </address-setting>
> {code}
> I have a producer that creates the queue name based on the customer key, and 
> uses JmsMessagingTemplate.convertAndSend(queueName, message). I have a 
> consumer annotated like this: 
> {code:java}
> @JmsListener(destination = "myqueue.#", containerFactory = "throttledLongCodeFactory")
> public void processLongCodeMessage1(Session session, Message<MessageRequest> message)
>     throws JMSException {
>   //... do the message handling - no ActiveMQ accesses in here... 
>   session.commit();
> }
> {code}
> FWIW, here's the code for the throttledLongCodeFactory: 
> {code:java}
> @Bean
> public DefaultJmsListenerContainerFactory throttledLongCodeFactory(
>     DefaultJmsListenerContainerFactoryConfigurer configurer) {
>   ActiveMQConnectionFactory connectionFactory = createActiveMQConnectionFactory();
>   // For throttling. Used to limit the number of messages a consumer will
>   // handle per second. Default is -1.
>   Integer maxConsumerRate =
>       appProperties.getArtemis().getLongCode().getMaxConsumerRate();
>   if (maxConsumerRate != null) {
>     connectionFactory.setConsumerMaxRate(maxConsumerRate);
>   }
>   // This provides all boot's default to this factory, including the message
>   // converter
>   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
>   configurer.configure(factory, connectionFactory);
>   return factory;
> }
> {code}
> What I'm finding in looking at the ActiveMQ Management Console is that the 
> consumer ACKs are going to a (new) queue "myqueue.#" (i.e. literally has the 
> '#' in the name), rather than the actual source queue for each message. In 
> the consumer, I can see the actual source queue name (e.g. "myqueue.2") by 
> inspecting the ClientMessageImpl field 'address'. What I'd like is for the 
> ACKs to go to the source queue. The way it is, my specific queues are just 
> building up the number of messages they contain, which isn't doing the 
> Artemis server memory any good.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
