This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 573ae71ec AMQ-9187 - Queue Advisory message not sent - a test case
which demonstrates the bug. - the first test case
sendDelayedMessage_usingNormalProducer works fine because it is using a normal
named JMS MessageProducer. Included just for comparison purposes. - the 2nd
test case sendDelayedMessage_usingAnonymousProducer shows the bug. - the bug
fix: swap the order of the AdvisoryBroker and SchedulerBroker BrokerFilters.
- make AdvisoryBroker come after SchedulerBroker but [...]
new 0146e338f This closes #947
573ae71ec is described below
commit 573ae71ec9197ae7392941239912bd62e7db2c87
Author: Martin Devlin <[email protected]>
AuthorDate: Mon Jan 2 21:20:58 2023 -0800
AMQ-9187 - Queue Advisory message not sent
- a test case which demonstrates the bug.
- the first test case sendDelayedMessage_usingNormalProducer works fine
because it is using a normal named JMS MessageProducer. Included just for
comparison purposes.
- the 2nd test case sendDelayedMessage_usingAnonymousProducer shows the
bug.
- the bug fix: swap the order of the AdvisoryBroker and SchedulerBroker
BrokerFilters.
- make AdvisoryBroker come after SchedulerBroker but before RegionBroker
- this ensures that when a delayed message gets eventually forwarded to
the RegionBroker, the RegionBroker will "see" the AdvisoryBroker when it
invokes 'addDestination'. Thus, the AdvisoryBroker gets to send out the
advisory message as expected.
---
.../org/apache/activemq/broker/BrokerService.java | 7 +-
.../JobSchedulerWithAdvisoryMessageTest.java | 160 +++++++++++++++++++++
2 files changed, 164 insertions(+), 3 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 9a395b82b..8311379f6 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2427,6 +2427,10 @@ public class BrokerService implements Service {
* @throws IOException
*/
protected Broker addInterceptors(Broker broker) throws Exception {
+ if (isAdvisorySupport()) {
+ // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
+ broker = new AdvisoryBroker(broker);
+ }
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker,
getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
@@ -2453,9 +2457,6 @@ public class BrokerService implements Service {
+ e.getMessage(), e);
}
}
- if (isAdvisorySupport()) {
- broker = new AdvisoryBroker(broker);
- }
broker = new CompositeDestinationBroker(broker);
broker = new TransactionBroker(broker,
getPersistenceAdapter().createTransactionStore());
if (isPopulateJMSXUserID()) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
new file mode 100644
index 000000000..695f554f8
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DestinationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import jakarta.jms.Connection;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Ensure that when a queue gets created for the first time, by writing a
message to the queue which has the
+ * {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} header, that the Queue
Advisory message gets published to the
+ * {@link AdvisorySupport#QUEUE_ADVISORY_TOPIC} topic.
+ *
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+public class JobSchedulerWithAdvisoryMessageTest extends
JobSchedulerTestSupport {
+
+ final AtomicLong uniqueQueueId = new
AtomicLong(System.currentTimeMillis());
+
+ private Connection connection;
+ private Session session;
+
+ /**
+ * The queues that got created according to the Queue Advisory Topic. See
{@link AdvisorySupport#QUEUE_ADVISORY_TOPIC}
+ */
+ private List<String> queuesCreated;
+
+ @Before
+ public void setupQueueCreationObserver() throws Exception {
+ assertTrue(broker.isAdvisorySupport()); // ensure Advisory Support is
turned on
+
+ connection = createConnection();
+ connection.start();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ queuesCreated = new ArrayList<>();
+
+ // register to listen to a Queue Advisory
+ MessageConsumer consumer =
session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+ consumer.setMessageListener((Message msg)->{
+ ActiveMQMessage activeMessage = (ActiveMQMessage) msg;
+ Object command = activeMessage.getDataStructure();
+ if (command instanceof DestinationInfo) {
+ DestinationInfo destinationInfo = (DestinationInfo) command;
+ String physicalName =
destinationInfo.getDestination().getPhysicalName();
+ if (destinationInfo.isAddOperation()) {
+ queuesCreated.add(physicalName);
+ }
+ }
+ });
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ super.tearDown(); // clean up the broker
+ }
+
+ @Test
+ public void sendDelayedMessage_usingNormalProducer() throws Exception {
+ // send delayed message using a named (i.e. not anonymous) jms producer
+ final String queueName = getNewQueueName();
+ final Queue destination = session.createQueue(queueName);
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName)); // we do not expect
the queue to be created yet.
+
+ MessageProducer producer = session.createProducer(destination);
+ // The act of creating the jms producer actually creates the empty
queue inside the broker
+ // - so the queue already exists before we even send the first message
to it. The Advisory message will get
+ // sent immediately because a new queue was just created.
+ delay(200);
+ assertTrue(queuesCreated.contains(queueName));
+
+ // send delayed message
+ producer.send( createDelayedMessage() );
+
+ // obviously this is still true as the queue was created before we
even sent the delayed message
+ assertTrue(queuesCreated.contains(queueName));
+ }
+
+ /**
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+ @Test
+ public void sendDelayedMessage_usingAnonymousProducer() throws Exception {
+ final String queueName = getNewQueueName();
+ Queue destination = session.createQueue(queueName);
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName)); // we do not expect
the queue to be created yet.
+
+ // an "Anonymous Producer" isn't bound to a single queue. It can be
used for sending messages to any queue.
+ MessageProducer anonymousProducer = session.createProducer(null);
+ // creation of an anonymous producer does *not* cause any advisory
message to be sent. This is expected.
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName));
+
+ // send delayed message. The queue will get created on-the-fly as we
write the first message to it.
+ // - but the queue doesn't get created immediately because the delayed
message is first stored in
+ // the JobSchedulerStore. After the delay timeout is reached, then
the message gets moved into the real
+ // queue. This is when the queue is actually created.
+ anonymousProducer.send(destination, createDelayedMessage() );
+ delay(500); // the message was delayed for only 5ms so 500ms should
be long enough
+
+ // The Advisory message should be sent because the queue was created
+ assertTrue(queuesCreated.contains(queueName));
+ }
+
+ private Message createDelayedMessage() throws JMSException {
+ TextMessage message = session.createTextMessage("delayed message");
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5); //
use short delay for unit test
+ return message;
+ }
+
+ private String getNewQueueName() {
+ return "queue-" + uniqueQueueId.getAndIncrement();
+ }
+
+ public static void delay(long delayMs) {
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException ex) {
+ }
+ }
+}