This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
     new 7ed319e99 AMQ-9187 - Queue Advisory message not sent - a test case 
which demonstrates the bug.   - the first test case 
sendDelayedMessage_usingNormalProducer works fine because it is using a normal 
named JMS MessageProducer. Included just for comparison purposes.   - the 2nd 
test case sendDelayedMessage_usingAnonymousProducer shows the bug. - the bug 
fix: swap the order of the AdvisoryBroker and SchedulerBroker BrokerFilters.   
- make AdvisoryBroker come after SchedulerBroker but [...]
7ed319e99 is described below

commit 7ed319e99d39b820a7ddfa6f5548f2ccec72c06d
Author: Martin Devlin <[email protected]>
AuthorDate: Mon Jan 2 21:20:58 2023 -0800

    AMQ-9187 - Queue Advisory message not sent
    - a test case which demonstrates the bug.
      - the first test case sendDelayedMessage_usingNormalProducer works fine 
because it is using a normal named JMS MessageProducer. Included just for 
comparison purposes.
      - the 2nd test case sendDelayedMessage_usingAnonymousProducer shows the 
bug.
    - the bug fix: swap the order of the AdvisoryBroker and SchedulerBroker 
BrokerFilters.
      - make AdvisoryBroker come after SchedulerBroker but before RegionBroker
      - this ensures that when a delayed message gets eventually forwarded to 
the RegionBroker, the RegionBroker will "see" the AdvisoryBroker when it 
invokes 'addDestination'. Thus, the AdvisoryBroker gets to send out the 
advisory message as expected.
    
    (cherry picked from commit 573ae71ec9197ae7392941239912bd62e7db2c87)
---
 .../org/apache/activemq/broker/BrokerService.java  |   7 +-
 .../JobSchedulerWithAdvisoryMessageTest.java       | 160 +++++++++++++++++++++
 2 files changed, 164 insertions(+), 3 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 8c969571c..405e88902 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2427,6 +2427,10 @@ public class BrokerService implements Service {
      * @throws IOException
      */
     protected Broker addInterceptors(Broker broker) throws Exception {
+        if (isAdvisorySupport()) {
+            // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
+            broker = new AdvisoryBroker(broker);
+        }
         if (isSchedulerSupport()) {
             SchedulerBroker sb = new SchedulerBroker(this, broker, 
getJobSchedulerStore());
             sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
@@ -2453,9 +2457,6 @@ public class BrokerService implements Service {
                         + e.getMessage(), e);
             }
         }
-        if (isAdvisorySupport()) {
-            broker = new AdvisoryBroker(broker);
-        }
         broker = new CompositeDestinationBroker(broker);
         broker = new TransactionBroker(broker, 
getPersistenceAdapter().createTransactionStore());
         if (isPopulateJMSXUserID()) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
new file mode 100644
index 000000000..86b3caa21
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DestinationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Ensure that when a queue gets created for the first time, by writing a 
message to the queue which has the
+ * {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} header, that the Queue 
Advisory message gets published to the
+ * {@link AdvisorySupport#QUEUE_ADVISORY_TOPIC} topic.
+ *
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+public class JobSchedulerWithAdvisoryMessageTest extends 
JobSchedulerTestSupport {
+
+    final AtomicLong uniqueQueueId = new 
AtomicLong(System.currentTimeMillis());
+
+    private Connection connection;
+    private Session session;
+
+    /**
+     * The queues that got created according to the Queue Advisory Topic. See 
{@link AdvisorySupport#QUEUE_ADVISORY_TOPIC}
+     */
+    private List<String> queuesCreated;
+
+    @Before
+    public void setupQueueCreationObserver() throws Exception {
+        assertTrue(broker.isAdvisorySupport()); // ensure Advisory Support is 
turned on
+
+        connection = createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        queuesCreated = new ArrayList<>();
+
+        // register to listen to a Queue Advisory
+        MessageConsumer consumer = 
session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+        consumer.setMessageListener((Message msg)->{
+            ActiveMQMessage activeMessage = (ActiveMQMessage) msg;
+            Object command = activeMessage.getDataStructure();
+            if (command instanceof DestinationInfo) {
+                DestinationInfo destinationInfo = (DestinationInfo) command;
+                String physicalName = 
destinationInfo.getDestination().getPhysicalName();
+                if (destinationInfo.isAddOperation()) {
+                    queuesCreated.add(physicalName);
+                }
+            }
+        });
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        session.close();
+        connection.close();
+        super.tearDown(); // clean up the broker
+    }
+
+    @Test
+    public void sendDelayedMessage_usingNormalProducer() throws Exception {
+        // send delayed message using a named (i.e. not anonymous) jms producer
+        final String queueName = getNewQueueName();
+        final Queue destination = session.createQueue(queueName);
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName)); // we do not expect 
the queue to be created yet.
+
+        MessageProducer producer = session.createProducer(destination);
+        // The act of creating the jms producer actually creates the empty 
queue inside the broker
+        // - so the queue already exists before we even send the first message 
to it. The Advisory message will get
+        // sent immediately because a new queue was just created.
+        delay(200);
+        assertTrue(queuesCreated.contains(queueName));
+
+        // send delayed message
+        producer.send( createDelayedMessage() );
+
+        // obviously this is still true as the queue was created before we 
even sent the delayed message
+        assertTrue(queuesCreated.contains(queueName));
+    }
+
+    /**
+     * See https://issues.apache.org/jira/browse/AMQ-9187
+     */
+    @Test
+    public void sendDelayedMessage_usingAnonymousProducer() throws Exception {
+        final String queueName = getNewQueueName();
+        Queue destination = session.createQueue(queueName);
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName)); // we do not expect 
the queue to be created yet.
+
+        // an "Anonymous Producer" isn't bound to a single queue. It can be 
used for sending messages to any queue.
+        MessageProducer anonymousProducer = session.createProducer(null);
+        // creation of an anonymous producer does *not* cause any advisory 
message to be sent. This is expected.
+        delay(200);
+        assertFalse(queuesCreated.contains(queueName));
+
+        // send delayed message. The queue will get created on-the-fly as we 
write the first message to it.
+        // - but the queue doesn't get created immediately because the delayed 
message is first stored in
+        //   the JobSchedulerStore. After the delay timeout is reached, then 
the message gets moved into the real
+        //   queue. This is when the queue is actually created.
+        anonymousProducer.send(destination, createDelayedMessage() );
+        delay(500);  // the message was delayed for only 5ms so 500ms should 
be long enough
+
+        // The Advisory message should be sent because the queue was created
+        assertTrue(queuesCreated.contains(queueName));
+    }
+
+    private Message createDelayedMessage() throws JMSException {
+        TextMessage message = session.createTextMessage("delayed message");
+        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5); // 
use short delay for unit test
+        return message;
+    }
+
+    private String getNewQueueName() {
+        return "queue-" + uniqueQueueId.getAndIncrement();
+    }
+
+    public static void delay(long delayMs) {
+        try {
+            Thread.sleep(delayMs);
+        } catch (InterruptedException ex) {
+        }
+    }
+}

Reply via email to