This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 573ae71ec AMQ-9187 - Queue Advisory message not sent - a test case 
which demonstrates the bug.   - the first test case 
sendDelayedMessage_usingNormalProducer works fine because it is using a normal 
named JMS MessageProducer. Included just for comparison purposes.   - the 2nd 
test case sendDelayedMessage_usingAnonymousProducer shows the bug. - the bug 
fix: swap the order of the AdvisoryBroker and SchedulerBroker BrokerFilters.   
- make AdvisoryBroker come after SchedulerBroker but [...]
     new 0146e338f This closes #947
573ae71ec is described below

commit 573ae71ec9197ae7392941239912bd62e7db2c87
Author: Martin Devlin <[email protected]>
AuthorDate: Mon Jan 2 21:20:58 2023 -0800

    AMQ-9187 - Queue Advisory message not sent
    - a test case which demonstrates the bug.
      - the first test case sendDelayedMessage_usingNormalProducer works fine
        because it is using a normal named JMS MessageProducer. Included just for
        comparison purposes.
      - the 2nd test case sendDelayedMessage_usingAnonymousProducer shows the
        bug.
    - the bug fix: swap the order of the AdvisoryBroker and SchedulerBroker
      BrokerFilters.
      - make AdvisoryBroker come after SchedulerBroker but before RegionBroker
      - this ensures that when a delayed message eventually gets forwarded to
        the RegionBroker, the RegionBroker will "see" the AdvisoryBroker when it
        invokes 'addDestination'. Thus, the AdvisoryBroker gets to send out the
        advisory message as expected.
---
 .../org/apache/activemq/broker/BrokerService.java  |   7 +-
 .../JobSchedulerWithAdvisoryMessageTest.java       | 160 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 3 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 9a395b82b..8311379f6 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2427,6 +2427,10 @@ public class BrokerService implements Service {
      * @throws IOException
      */
     protected Broker addInterceptors(Broker broker) throws Exception {
+        if (isAdvisorySupport()) {
+            // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
+            broker = new AdvisoryBroker(broker);
+        }
         if (isSchedulerSupport()) {
             SchedulerBroker sb = new SchedulerBroker(this, broker, 
getJobSchedulerStore());
             sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
@@ -2453,9 +2457,6 @@ public class BrokerService implements Service {
                         + e.getMessage(), e);
             }
         }
-        if (isAdvisorySupport()) {
-            broker = new AdvisoryBroker(broker);
-        }
         broker = new CompositeDestinationBroker(broker);
         broker = new TransactionBroker(broker, 
getPersistenceAdapter().createTransactionStore());
         if (isPopulateJMSXUserID()) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
new file mode 100644
index 000000000..695f554f8
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DestinationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Ensures that when a queue gets created for the first time, by writing a message to the queue which has the
+ * {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} header, the Queue Advisory message gets published to the
+ * {@link AdvisorySupport#QUEUE_ADVISORY_TOPIC} topic.
+ *
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+public class JobSchedulerWithAdvisoryMessageTest extends JobSchedulerTestSupport {
+
+    /** Upper bound on how long a test waits for an expected advisory before giving up. */
+    private static final long ADVISORY_TIMEOUT_MS = 5000;
+
+    /** Pause between two checks while waiting for an expected advisory. */
+    private static final long POLL_INTERVAL_MS = 10;
+
+    final AtomicLong uniqueQueueId = new AtomicLong(System.currentTimeMillis());
+
+    private Connection connection;
+    private Session session;
+
+    /**
+     * The queues that got created according to the Queue Advisory Topic. See
+     * {@link AdvisorySupport#QUEUE_ADVISORY_TOPIC}.
+     * <p>
+     * The list is filled from the advisory message listener callback and read by the test method, so a
+     * thread-safe implementation is used.
+     */
+    private List<String> queuesCreated;
+
+    /**
+     * Opens a connection and session and registers a listener on the Queue Advisory topic which records
+     * the physical name of every destination reported by an "add" advisory.
+     *
+     * @throws Exception if the connection, session or consumer cannot be created
+     */
+    @Before
+    public void setupQueueCreationObserver() throws Exception {
+        assertTrue(broker.isAdvisorySupport()); // ensure Advisory Support is turned on
+
+        // Create the list before the listener is registered so the callback never observes null.
+        queuesCreated = new CopyOnWriteArrayList<>();
+
+        connection = createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // register to listen to a Queue Advisory
+        MessageConsumer consumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+        consumer.setMessageListener((Message msg) -> {
+            ActiveMQMessage activeMessage = (ActiveMQMessage) msg;
+            Object command = activeMessage.getDataStructure();
+            if (command instanceof DestinationInfo) {
+                DestinationInfo destinationInfo = (DestinationInfo) command;
+                String physicalName = destinationInfo.getDestination().getPhysicalName();
+                if (destinationInfo.isAddOperation()) {
+                    queuesCreated.add(physicalName);
+                }
+            }
+        });
+    }
+
+    /**
+     * Closes the session and connection (when they were created) and then always delegates to the
+     * parent clean-up, even if closing a JMS resource fails.
+     *
+     * @throws Exception if closing a JMS resource or the parent clean-up fails
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // session/connection are still null when setup failed before assigning them
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            } finally {
+                super.tearDown(); // clean up the broker
+            }
+        }
+    }
+
+    /**
+     * Baseline for comparison: a producer bound to the queue triggers the advisory on producer creation,
+     * before any (delayed) message is sent.
+     */
+    @Test
+    public void sendDelayedMessage_usingNormalProducer() throws Exception {
+        // send delayed message using a named (i.e. not anonymous) jms producer
+        final String queueName = getNewQueueName();
+        final Queue destination = session.createQueue(queueName);
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName)); // we do not expect the queue to be created yet.
+
+        MessageProducer producer = session.createProducer(destination);
+        // The act of creating the jms producer actually creates the empty queue inside the broker
+        // - so the queue already exists before we even send the first message to it. The Advisory message
+        //   will get sent immediately because a new queue was just created.
+        assertTrue(awaitQueueCreated(queueName));
+
+        // send delayed message
+        producer.send(createDelayedMessage());
+
+        // obviously this is still true as the queue was created before we even sent the delayed message
+        assertTrue(queuesCreated.contains(queueName));
+    }
+
+    /**
+     * See https://issues.apache.org/jira/browse/AMQ-9187
+     */
+    @Test
+    public void sendDelayedMessage_usingAnonymousProducer() throws Exception {
+        final String queueName = getNewQueueName();
+        Queue destination = session.createQueue(queueName);
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName)); // we do not expect the queue to be created yet.
+
+        // an "Anonymous Producer" isn't bound to a single queue. It can be used for sending messages to any queue.
+        MessageProducer anonymousProducer = session.createProducer(null);
+        // creation of an anonymous producer does *not* cause any advisory message to be sent. This is expected.
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName));
+
+        // send delayed message. The queue will get created on-the-fly as we write the first message to it.
+        // - but the queue doesn't get created immediately because the delayed message is first stored in
+        //   the JobSchedulerStore. After the delay timeout is reached, then the message gets moved into the
+        //   real queue. This is when the queue is actually created.
+        anonymousProducer.send(destination, createDelayedMessage());
+
+        // The Advisory message should be sent because the queue was created. The message was delayed for
+        // only 5ms, so the advisory is expected well within the polling timeout.
+        assertTrue(awaitQueueCreated(queueName));
+    }
+
+    /**
+     * Builds a text message scheduled for delivery after a short delay.
+     *
+     * @return a message carrying the {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} property
+     * @throws JMSException if the message cannot be created or the property cannot be set
+     */
+    private Message createDelayedMessage() throws JMSException {
+        TextMessage message = session.createTextMessage("delayed message");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5); // use short delay for unit test
+        return message;
+    }
+
+    /**
+     * @return a queue name that has not been handed out by this test instance before
+     */
+    private String getNewQueueName() {
+        return "queue-" + uniqueQueueId.getAndIncrement();
+    }
+
+    /**
+     * Polls until an "add" advisory for the given queue has been recorded, the timeout elapses, or the
+     * current thread is interrupted.
+     *
+     * @param queueName physical name of the queue to wait for
+     * @return {@code true} if the advisory was recorded in time, {@code false} otherwise
+     */
+    private boolean awaitQueueCreated(String queueName) {
+        final long deadline = System.nanoTime() + ADVISORY_TIMEOUT_MS * 1_000_000L;
+        while (!queuesCreated.contains(queueName)) {
+            // stop on timeout, or on interrupt so that delay() returning early cannot cause a busy spin
+            if (System.nanoTime() - deadline >= 0 || Thread.currentThread().isInterrupted()) {
+                return false;
+            }
+            delay(POLL_INTERVAL_MS);
+        }
+        return true;
+    }
+
+    /**
+     * Sleeps for the given time. If the thread is interrupted while sleeping, the method returns early
+     * and restores the interrupt flag so callers can still observe the interruption.
+     *
+     * @param delayMs time to sleep, in milliseconds
+     */
+    public static void delay(long delayMs) {
+        try {
+            Thread.sleep(delayMs);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}

Reply via email to