This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6f9a9 AMQ-7352 - Add support for anonymous producer advisories
0c6f9a9 is described below
commit 0c6f9a9a1e253d50dac6b265f04c0a458ac535ee
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Fri Nov 22 13:35:32 2019 -0500
AMQ-7352 - Add support for anonymous producer advisories
By default this behavior is turned off but can be enabled by setting
anonymousProducerAdvisorySupport on the BrokerService to true
---
.../apache/activemq/advisory/AdvisoryBroker.java | 13 +--
.../org/apache/activemq/broker/BrokerService.java | 9 ++
.../apache/activemq/advisory/AdvisorySupport.java | 8 +-
.../broker/advisory/AdvisoryBrokerTest.java | 100 ++++++++++++++++++++-
4 files changed, 122 insertions(+), 8 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1508c61..f2d41e9 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -229,8 +229,10 @@ public class AdvisoryBroker extends BrokerFilter {
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
super.addProducer(context, info);
- // Don't advise advisory topics.
- if (info.getDestination() != null &&
!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+ //Verify destination is either non-null or that we want to advise
anonymous producers on null destination
+ //Don't advise advisory topics.
+ if ((info.getDestination() != null ||
getBrokerService().isAnonymousProducerAdvisorySupport())
+ && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic =
AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
fireProducerAdvisory(context, info.getDestination(), topic, info);
producers.put(info.getProducerId(), info);
@@ -412,12 +414,13 @@ public class AdvisoryBroker extends BrokerFilter {
public void removeProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
super.removeProducer(context, info);
- // Don't advise advisory topics.
+ //Verify destination is either non-null or that we want to advise
anonymous producers on null destination
+ //Don't advise advisory topics.
ActiveMQDestination dest = info.getDestination();
- if (info.getDestination() != null &&
!AdvisorySupport.isAdvisoryTopic(dest)) {
+ if ((dest != null ||
getBrokerService().isAnonymousProducerAdvisorySupport()) &&
!AdvisorySupport.isAdvisoryTopic(dest)) {
ActiveMQTopic topic =
AdvisorySupport.getProducerAdvisoryTopic(dest);
producers.remove(info.getProducerId());
- if (!dest.isTemporary() || destinations.containsKey(dest)) {
+ if (dest == null || !dest.isTemporary() ||
destinations.containsKey(dest)) {
fireProducerAdvisory(context, dest, topic,
info.createRemoveCommand());
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0b05d31..df27da1 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -192,6 +192,7 @@ public class BrokerService implements Service {
// to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
+ private boolean anonymousProducerAdvisorySupport = false;
private URI vmConnectorURI;
private String defaultSocketURIString;
private PolicyMap destinationPolicy;
@@ -1522,6 +1523,14 @@ public class BrokerService implements Service {
this.advisorySupport = advisorySupport;
}
+ public boolean isAnonymousProducerAdvisorySupport() {
+ return anonymousProducerAdvisorySupport;
+ }
+
+ public void setAnonymousProducerAdvisorySupport(boolean
anonymousProducerAdvisorySupport) {
+ this.anonymousProducerAdvisorySupport =
anonymousProducerAdvisorySupport;
+ }
+
public List<TransportConnector> getTransportConnectors() {
return new ArrayList<>(transportConnectors);
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index 402cad4..c030acd 100644
---
a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++
b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -36,6 +36,7 @@ public final class AdvisorySupport {
public static final String PRODUCER_ADVISORY_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "Producer.";
public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX =
PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX =
PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+ public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX =
PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous";
public static final String CONSUMER_ADVISORY_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "Consumer.";
public static final String
VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX +
"VirtualDestination.Consumer.";
public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX =
CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
@@ -137,7 +138,9 @@ public final class AdvisorySupport {
public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination
destination) {
String prefix;
- if (destination.isQueue()) {
+ if (destination == null) {
+ prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX;
+ } else if (destination.isQueue()) {
prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
} else {
prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
@@ -146,7 +149,8 @@ public final class AdvisorySupport {
}
private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination
destination, String prefix, boolean consumerTopics) {
- return new ActiveMQTopic(prefix +
destination.getPhysicalName().replaceAll(",", "‚"));
+ return destination != null ? new ActiveMQTopic(prefix +
destination.getPhysicalName().replaceAll(",", "‚")):
+ new ActiveMQTopic(prefix);
}
public static ActiveMQTopic getExpiredMessageTopic(Destination
destination) throws JMSException {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index c65dc53..438d649 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -175,7 +175,6 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
assertNoMessagesLeft(connection2);
}
-
public void testProducerAdvisories() throws Exception {
ActiveMQDestination queue = new ActiveMQQueue("test");
@@ -319,6 +318,105 @@ public class AdvisoryBrokerTest extends BrokerTestSupport
{
assertNoMessagesLeft(connection1);
}
+ public void testAnonymousProducerAdvisoriesTrue() throws Exception {
+ //turn on support for anonymous producers
+ broker.setAnonymousProducerAdvisorySupport(true);
+
+ ActiveMQDestination destination =
AdvisorySupport.getProducerAdvisoryTopic(null);
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX,
destination.getPhysicalName());
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ consumerInfo1.setPrefetchSize(100);
+
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(consumerInfo1);
+
+ assertNoMessagesLeft(connection1);
+
+ // Setup a producer.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ //don't set a destination
+
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ // We should get an advisory of the new produver.
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ assertNotNull(m1.getDataStructure());
+ assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(),
producerInfo2.getProducerId());
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX,
m1.getDestination().getPhysicalName());
+
+ // Close the second connection.
+ connection2.request(closeConnectionInfo(connectionInfo2));
+ connection2.stop();
+
+ // We should get an advisory of the producer closing
+ m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ assertNotNull(m1.getDataStructure());
+ RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+ assertEquals(r.getObjectId(), producerInfo2.getProducerId());
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX,
m1.getDestination().getPhysicalName());
+
+ assertNoMessagesLeft(connection2);
+ }
+
+ public void testAnonymousProducerAdvisoriesFalse() throws Exception {
+ broker.setAnonymousProducerAdvisorySupport(false);
+
+ assertAnonymousProducerAdvisoriesOff();
+ }
+
+ public void testAnonymousProducerAdvisoriesDefault() throws Exception {
+ //Default for now is to have anonymous producer advisories turned off
+ assertAnonymousProducerAdvisoriesOff();
+ }
+
+ private void assertAnonymousProducerAdvisoriesOff() throws Exception {
+ ActiveMQDestination destination =
AdvisorySupport.getProducerAdvisoryTopic(null);
+ assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX,
destination.getPhysicalName());
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
+ consumerInfo1.setPrefetchSize(100);
+
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(consumerInfo1);
+
+ assertNoMessagesLeft(connection1);
+
+ // Setup a producer.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ //don't set a destination
+
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ // We should get an advisory of the new produver.
+ Message m1 = receiveMessage(connection1, 1000);
+ assertNull(m1);
+
+ assertNoMessagesLeft(connection2);
+ }
+
public static Test suite() {
return suite(AdvisoryBrokerTest.class);
}