This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6f9a9  AMQ-7352 - Add support for anonymous producer advisories
0c6f9a9 is described below

commit 0c6f9a9a1e253d50dac6b265f04c0a458ac535ee
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Fri Nov 22 13:35:32 2019 -0500

    AMQ-7352 - Add support for anonymous producer advisories
    
    By default this behavior is turned off but can be enabled by setting
    anonymousProducerAdvisorySupport on the BrokerService to true
---
 .../apache/activemq/advisory/AdvisoryBroker.java   |  13 +--
 .../org/apache/activemq/broker/BrokerService.java  |   9 ++
 .../apache/activemq/advisory/AdvisorySupport.java  |   8 +-
 .../broker/advisory/AdvisoryBrokerTest.java        | 100 ++++++++++++++++++++-
 4 files changed, 122 insertions(+), 8 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 1508c61..f2d41e9 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -229,8 +229,10 @@ public class AdvisoryBroker extends BrokerFilter {
     public void addProducer(ConnectionContext context, ProducerInfo info) 
throws Exception {
         super.addProducer(context, info);
 
-        // Don't advise advisory topics.
-        if (info.getDestination() != null && 
!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
+        //Verify destination is either non-null or that we want to advise 
anonymous producers on null destination
+        //Don't advise advisory topics.
+        if ((info.getDestination() != null || 
getBrokerService().isAnonymousProducerAdvisorySupport())
+                && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = 
AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
             fireProducerAdvisory(context, info.getDestination(), topic, info);
             producers.put(info.getProducerId(), info);
@@ -412,12 +414,13 @@ public class AdvisoryBroker extends BrokerFilter {
     public void removeProducer(ConnectionContext context, ProducerInfo info) 
throws Exception {
         super.removeProducer(context, info);
 
-        // Don't advise advisory topics.
+        //Verify destination is either non-null or that we want to advise 
anonymous producers on null destination
+        //Don't advise advisory topics.
         ActiveMQDestination dest = info.getDestination();
-        if (info.getDestination() != null && 
!AdvisorySupport.isAdvisoryTopic(dest)) {
+        if ((dest != null || 
getBrokerService().isAnonymousProducerAdvisorySupport()) && 
!AdvisorySupport.isAdvisoryTopic(dest)) {
             ActiveMQTopic topic = 
AdvisorySupport.getProducerAdvisoryTopic(dest);
             producers.remove(info.getProducerId());
-            if (!dest.isTemporary() || destinations.containsKey(dest)) {
+            if (dest == null || !dest.isTemporary() || 
destinations.containsKey(dest)) {
                 fireProducerAdvisory(context, dest, topic, 
info.createRemoveCommand());
             }
         }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0b05d31..df27da1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -192,6 +192,7 @@ public class BrokerService implements Service {
     // to other jms messaging systems
     private boolean deleteAllMessagesOnStartup;
     private boolean advisorySupport = true;
+    private boolean anonymousProducerAdvisorySupport = false;
     private URI vmConnectorURI;
     private String defaultSocketURIString;
     private PolicyMap destinationPolicy;
@@ -1522,6 +1523,14 @@ public class BrokerService implements Service {
         this.advisorySupport = advisorySupport;
     }
 
+    public boolean isAnonymousProducerAdvisorySupport() {
+        return anonymousProducerAdvisorySupport;
+    }
+
+    public void setAnonymousProducerAdvisorySupport(boolean 
anonymousProducerAdvisorySupport) {
+        this.anonymousProducerAdvisorySupport = 
anonymousProducerAdvisorySupport;
+    }
+
     public List<TransportConnector> getTransportConnectors() {
         return new ArrayList<>(transportConnectors);
     }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
 
b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
index 402cad4..c030acd 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
@@ -36,6 +36,7 @@ public final class AdvisorySupport {
     public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = 
ADVISORY_TOPIC_PREFIX + "Producer.";
     public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = 
PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
     public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = 
PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
+    public static final String ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX = 
PRODUCER_ADVISORY_TOPIC_PREFIX + "Anonymous";
     public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = 
ADVISORY_TOPIC_PREFIX + "Consumer.";
     public static final String 
VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + 
"VirtualDestination.Consumer.";
     public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = 
CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
@@ -137,7 +138,9 @@ public final class AdvisorySupport {
 
     public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination 
destination) {
         String prefix;
-        if (destination.isQueue()) {
+        if (destination == null) {
+            prefix = ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX;
+        } else if (destination.isQueue()) {
             prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
         } else {
             prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
@@ -146,7 +149,8 @@ public final class AdvisorySupport {
     }
 
     private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination 
destination, String prefix, boolean consumerTopics) {
-        return new ActiveMQTopic(prefix + 
destination.getPhysicalName().replaceAll(",", "&sbquo;"));
+        return destination != null ? new ActiveMQTopic(prefix + 
destination.getPhysicalName().replaceAll(",", "&sbquo;")):
+            new ActiveMQTopic(prefix);
     }
 
     public static ActiveMQTopic getExpiredMessageTopic(Destination 
destination) throws JMSException {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index c65dc53..438d649 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -175,7 +175,6 @@ public class AdvisoryBrokerTest extends BrokerTestSupport {
         assertNoMessagesLeft(connection2);
     }
 
-
     public void testProducerAdvisories() throws Exception {
 
         ActiveMQDestination queue = new ActiveMQQueue("test");
@@ -319,6 +318,105 @@ public class AdvisoryBrokerTest extends BrokerTestSupport 
{
         assertNoMessagesLeft(connection1);
     }
 
+    public void testAnonymousProducerAdvisoriesTrue() throws Exception {
+        //turn on support for anonymous producers
+        broker.setAnonymousProducerAdvisorySupport(true);
+
+        ActiveMQDestination destination = 
AdvisorySupport.getProducerAdvisoryTopic(null);
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, 
destination.getPhysicalName());
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        assertNoMessagesLeft(connection1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        //don't set a destination
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // We should get an advisory of the new producer.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), 
producerInfo2.getProducerId());
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, 
m1.getDestination().getPhysicalName());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the producer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(r.getObjectId(), producerInfo2.getProducerId());
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, 
m1.getDestination().getPhysicalName());
+
+        assertNoMessagesLeft(connection2);
+    }
+
+    public void testAnonymousProducerAdvisoriesFalse() throws Exception {
+        broker.setAnonymousProducerAdvisorySupport(false);
+
+        assertAnonymousProducerAdvisoriesOff();
+    }
+
+    public void testAnonymousProducerAdvisoriesDefault() throws Exception {
+        //Default for now is to have anonymous producer advisories turned off
+        assertAnonymousProducerAdvisoriesOff();
+    }
+
+    private void assertAnonymousProducerAdvisoriesOff() throws Exception {
+        ActiveMQDestination destination = 
AdvisorySupport.getProducerAdvisoryTopic(null);
+        assertEquals(AdvisorySupport.ANONYMOUS_PRODUCER_ADVISORY_TOPIC_PREFIX, 
destination.getPhysicalName());
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        assertNoMessagesLeft(connection1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        //don't set a destination
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // We should NOT get an advisory of the new producer (support is off).
+        Message m1 = receiveMessage(connection1, 1000);
+        assertNull(m1);
+
+        assertNoMessagesLeft(connection2);
+    }
+
     public static Test suite() {
         return suite(AdvisoryBrokerTest.class);
     }

Reply via email to