This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new 7ed319e99 AMQ-9187 - Queue Advisory message not sent - a test case
which demonstrates the bug. - the first test case
sendDelayedMessage_usingNormalProducer works fine because it is using a normal
named JMS MessageProducer. Included just for comparison purposes. - the 2nd
test case sendDelayedMessage_usingAnonymousProducer shows the bug. - the bug
fix: swap the order of the AdvisoryBroker and SchedulerBroker BrokerFilters.
- make AdvisoryBroker come after SchedulerBroker but [...]
7ed319e99 is described below
commit 7ed319e99d39b820a7ddfa6f5548f2ccec72c06d
Author: Martin Devlin <[email protected]>
AuthorDate: Mon Jan 2 21:20:58 2023 -0800
AMQ-9187 - Queue Advisory message not sent
- a test case which demonstrates the bug.
- the first test case sendDelayedMessage_usingNormalProducer works fine
because it is using a normal named JMS MessageProducer. Included just for
comparison purposes.
- the 2nd test case sendDelayedMessage_usingAnonymousProducer shows the
bug.
- the bug fix: swap the order of the AdvisoryBroker and SchedulerBroker
BrokerFilters.
- make AdvisoryBroker come after SchedulerBroker but before RegionBroker
- this ensures that when a delayed message gets eventually forwarded to
the RegionBroker, the RegionBroker will "see" the AdvisoryBroker when it
invokes 'addDestination'. Thus, the AdvisoryBroker gets to send out the
advisory message as expected.
(cherry picked from commit 573ae71ec9197ae7392941239912bd62e7db2c87)
---
.../org/apache/activemq/broker/BrokerService.java | 7 +-
.../JobSchedulerWithAdvisoryMessageTest.java | 160 +++++++++++++++++++++
2 files changed, 164 insertions(+), 3 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 8c969571c..405e88902 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2427,6 +2427,10 @@ public class BrokerService implements Service {
* @throws IOException
*/
protected Broker addInterceptors(Broker broker) throws Exception {
+ if (isAdvisorySupport()) {
+ // AMQ-9187 - the AdvisoryBroker must be after the SchedulerBroker
+ broker = new AdvisoryBroker(broker);
+ }
if (isSchedulerSupport()) {
SchedulerBroker sb = new SchedulerBroker(this, broker,
getJobSchedulerStore());
sb.setMaxRepeatAllowed(maxSchedulerRepeatAllowed);
@@ -2453,9 +2457,6 @@ public class BrokerService implements Service {
+ e.getMessage(), e);
}
}
- if (isAdvisorySupport()) {
- broker = new AdvisoryBroker(broker);
- }
broker = new CompositeDestinationBroker(broker);
broker = new TransactionBroker(broker,
getPersistenceAdapter().createTransactionStore());
if (isPopulateJMSXUserID()) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
new file mode 100644
index 000000000..86b3caa21
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerWithAdvisoryMessageTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DestinationInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Ensure that when a queue gets created for the first time, by writing a
message to the queue which has the
+ * {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} header, that the Queue
Advisory message gets published to the
+ * {@link AdvisorySupport#QUEUE_ADVISORY_TOPIC} topic.
+ *
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+public class JobSchedulerWithAdvisoryMessageTest extends
JobSchedulerTestSupport {
+
+ final AtomicLong uniqueQueueId = new
AtomicLong(System.currentTimeMillis());
+
+ private Connection connection;
+ private Session session;
+
+ /**
+ * The queues that got created according to the Queue Advisory Topic. See
{@link AdvisorySupport#QUEUE_ADVISORY_TOPIC}
+ */
+ private List<String> queuesCreated;
+
+ @Before
+ public void setupQueueCreationObserver() throws Exception {
+ assertTrue(broker.isAdvisorySupport()); // ensure Advisory Support is
turned on
+
+ connection = createConnection();
+ connection.start();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ queuesCreated = new ArrayList<>();
+
+ // register to listen to a Queue Advisory
+ MessageConsumer consumer =
session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
+ consumer.setMessageListener((Message msg)->{
+ ActiveMQMessage activeMessage = (ActiveMQMessage) msg;
+ Object command = activeMessage.getDataStructure();
+ if (command instanceof DestinationInfo) {
+ DestinationInfo destinationInfo = (DestinationInfo) command;
+ String physicalName =
destinationInfo.getDestination().getPhysicalName();
+ if (destinationInfo.isAddOperation()) {
+ queuesCreated.add(physicalName);
+ }
+ }
+ });
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ super.tearDown(); // clean up the broker
+ }
+
+ @Test
+ public void sendDelayedMessage_usingNormalProducer() throws Exception {
+ // send delayed message using a named (i.e. not anonymous) jms producer
+ final String queueName = getNewQueueName();
+ final Queue destination = session.createQueue(queueName);
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName)); // we do not expect
the queue to be created yet.
+
+ MessageProducer producer = session.createProducer(destination);
+ // The act of creating the jms producer actually creates the empty
queue inside the broker
+ // - so the queue already exists before we even send the first message
to it. The Advisory message will get
+ // sent immediately because a new queue was just created.
+ delay(200);
+ assertTrue(queuesCreated.contains(queueName));
+
+ // send delayed message
+ producer.send( createDelayedMessage() );
+
+ // obviously this is still true as the queue was created before we
even sent the delayed message
+ assertTrue(queuesCreated.contains(queueName));
+ }
+
+ /**
+ * See https://issues.apache.org/jira/browse/AMQ-9187
+ */
+ @Test
+ public void sendDelayedMessage_usingAnonymousProducer() throws Exception {
+ final String queueName = getNewQueueName();
+ Queue destination = session.createQueue(queueName);
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName)); // we do not expect
the queue to be created yet.
+
+ // an "Anonymous Producer" isn't bound to a single queue. It can be
used for sending messages to any queue.
+ MessageProducer anonymousProducer = session.createProducer(null);
+ // creation of an anonymous producer does *not* cause any advisory
message to be sent. This is expected.
+ delay(200);
+ assertFalse(queuesCreated.contains(queueName));
+
+ // send delayed message. The queue will get created on-the-fly as we
write the first message to it.
+ // - but the queue doesn't get created immediately because the delayed
message is first stored in
+ // the JobSchedulerStore. After the delay timeout is reached, then
the message gets moved into the real
+ // queue. This is when the queue is actually created.
+ anonymousProducer.send(destination, createDelayedMessage() );
+ delay(500); // the message was delayed for only 5ms so 500ms should
be long enough
+
+ // The Advisory message should be sent because the queue was created
+ assertTrue(queuesCreated.contains(queueName));
+ }
+
+ private Message createDelayedMessage() throws JMSException {
+ TextMessage message = session.createTextMessage("delayed message");
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5); //
use short delay for unit test
+ return message;
+ }
+
+ private String getNewQueueName() {
+ return "queue-" + uniqueQueueId.getAndIncrement();
+ }
+
+ public static void delay(long delayMs) {
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException ex) {
+ }
+ }
+}