This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 20dc305a4 AMQ-9175 - Properly set broker on ConnectionContext inside MemoryTransactionStores
20dc305a4 is described below

commit 20dc305a49c0538ef743043f12f3d075694c693f
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Mon Nov 21 11:02:12 2022 -0500

    AMQ-9175 - Properly set broker on ConnectionContext inside
    MemoryTransactionStores
    
    This fixes a NullPointerException (NPE) when using the messageDelivered
    advisory together with transactions
    
    (cherry picked from commit f83c5f1ba156ed850acf76dc42f944b4150d21f8)
---
 .../store/memory/MemoryPersistenceAdapter.java     |  13 ++-
 .../store/memory/MemoryTransactionStore.java       |  12 ++-
 .../store/jdbc/JdbcMemoryTransactionStore.java     |   9 +-
 .../apache/activemq/advisory/AdvisoryTests.java    | 107 +++++++++++++++++----
 4 files changed, 117 insertions(+), 24 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index a1233e0ae..a668c4eb1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -43,13 +45,15 @@ import org.slf4j.LoggerFactory;
 /**
  * @org.apache.xbean.XBean
  */
-public class MemoryPersistenceAdapter implements PersistenceAdapter, 
NoLocalSubscriptionAware {
+public class MemoryPersistenceAdapter implements PersistenceAdapter, 
NoLocalSubscriptionAware,
+    BrokerServiceAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
 
     MemoryTransactionStore transactionStore;
     ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new 
ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
     ConcurrentMap<ActiveMQDestination, MessageStore> queues = new 
ConcurrentHashMap<ActiveMQDestination, MessageStore>();
     private boolean useExternalMessageReferences;
+    protected BrokerService brokerService;
 
     @Override
     public Set<ActiveMQDestination> getDestinations() {
@@ -118,7 +122,7 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter, NoLocalSubs
     @Override
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
-            transactionStore = new MemoryTransactionStore(this);
+            transactionStore = new MemoryTransactionStore(this, brokerService);
         }
         return transactionStore;
     }
@@ -253,4 +257,9 @@ public class MemoryPersistenceAdapter implements 
PersistenceAdapter, NoLocalSubs
     public boolean isPersistNoLocal() {
         return true;
     }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index abf828243..62b57613e 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -51,6 +52,7 @@ public class MemoryTransactionStore implements 
TransactionStore {
     protected ConcurrentMap<Object, Tx> inflightTransactions = new 
ConcurrentHashMap<Object, Tx>();
     protected Map<TransactionId, Tx> preparedTransactions = 
Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
     protected final PersistenceAdapter persistenceAdapter;
+    protected final BrokerService brokerService;
 
     private boolean doingRecover;
 
@@ -93,6 +95,13 @@ public class MemoryTransactionStore implements 
TransactionStore {
          */
         public void commit() throws IOException {
             ConnectionContext ctx = new ConnectionContext();
+            try {
+                if (brokerService != null) {
+                    ctx.setBroker(brokerService.getBroker());
+                }
+            }  catch (Exception e) {
+                throw new IOException(e.getMessage(), e);
+            }
             persistenceAdapter.beginTransaction(ctx);
             try {
 
@@ -134,8 +143,9 @@ public class MemoryTransactionStore implements 
TransactionStore {
         MessageStore getMessageStore();
     }
 
-    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
+    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter, 
BrokerService brokerService) {
         this.persistenceAdapter = persistenceAdapter;
+        this.brokerService = brokerService;
     }
 
     public MessageStore proxy(MessageStore messageStore) {
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index ccf748501..32871696c 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -51,7 +51,7 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
 
 
     public JdbcMemoryTransactionStore(JDBCPersistenceAdapter 
jdbcPersistenceAdapter) {
-        super(jdbcPersistenceAdapter);
+        super(jdbcPersistenceAdapter, 
jdbcPersistenceAdapter.getBrokerService());
     }
 
     @Override
@@ -163,6 +163,13 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
             if (tx != null) {
                 // undo prepare work
                 ConnectionContext ctx = new ConnectionContext();
+                try {
+                    if (brokerService != null) {
+                        ctx.setBroker(brokerService.getBroker());
+                    }
+                }  catch (Exception e) {
+                    throw new IOException(e.getMessage(), e);
+                }
                 persistenceAdapter.beginTransaction(ctx);
                 try {
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 497856505..9207fb0b4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.HashSet;
-
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -35,9 +35,9 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.Topic;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -45,7 +45,6 @@ import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.NullMessageReference;
 import 
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -71,24 +70,28 @@ public class AdvisoryTests {
     protected BrokerService broker;
     protected Connection connection;
     protected String bindAddress = 
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
-    protected int topicCount;
     protected final boolean includeBodyForAdvisory;
+    protected final boolean persistent;
     protected final int EXPIRE_MESSAGE_PERIOD = 3000;
 
-
-    @Parameters(name = "includeBodyForAdvisory={0}")
+    @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                // Include the full body of the message
-                {true},
-                // Don't include the full body of the message
-                {false}
+            // Include the full body of the message
+            {true, false},
+            // Don't include the full body of the message
+            {false, false},
+            // Include the full body of the message
+            {true, true},
+            // Don't include the full body of the message
+            {false, true}
         });
     }
 
-    public AdvisoryTests(boolean includeBodyForAdvisory) {
+    public AdvisoryTests(boolean includeBodyForAdvisory, boolean persistent) {
         super();
         this.includeBodyForAdvisory = includeBodyForAdvisory;
+        this.persistent = persistent;
     }
 
     @Test(timeout = 60000)
@@ -169,30 +172,92 @@ public class AdvisoryTests {
     }
 
     @Test(timeout = 60000)
-    public void testMessageDeliveryAdvisory() throws Exception {
-        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = s.createQueue(getClass().getName());
-        MessageConsumer consumer = s.createConsumer(queue);
+    public void testQueueMessageDeliveryAdvisory() throws Exception {
+        testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), 
AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testQueueMessageDeliveryAdvisoryTransacted() throws Exception {
+        testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()), 
AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicMessageDeliveryAdvisory() throws Exception {
+        testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), 
AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicMessageDeliveryAdvisoryTransacted() throws Exception {
+        testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()), 
AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
+    }
+
+    private void testMessageConsumerAdvisory(ActiveMQDestination dest, 
Function<ActiveMQDestination, Topic> advisoryTopicSupplier,
+        boolean transacted) throws Exception {
+        Session s = connection.createSession(transacted, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = s.createConsumer(dest);
         assertNotNull(consumer);
 
-        Topic advisoryTopic = 
AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
+        Topic advisoryTopic = advisoryTopicSupplier.apply(dest);
         MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
         // start throwing messages at the consumer
-        MessageProducer producer = s.createProducer(queue);
+        MessageProducer producer = s.createProducer(dest);
+
+        BytesMessage m = s.createBytesMessage();
+        m.writeBytes(new byte[1024]);
+        producer.send(m);
+        if (transacted) {
+            s.commit();
+        }
+
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+        if (transacted) {
+            s.commit();
+        }
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+
+        //Could be either
+        String originBrokerUrl = 
(String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL);
+        assertTrue(originBrokerUrl.startsWith("tcp://") || 
originBrokerUrl.startsWith("nio://"));
+        
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), 
dest.getQualifiedName());
+
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
+    }
+
+    private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination, 
Topic> advisoryTopicSupplier,
+        boolean transacted) throws Exception {
+        Session s = connection.createSession(transacted, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = s.createTopic(getClass().getName());
+        MessageConsumer consumer = s.createDurableSubscriber(topic, "sub");
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = 
advisoryTopicSupplier.apply((ActiveMQDestination) topic);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(topic);
 
         BytesMessage m = s.createBytesMessage();
         m.writeBytes(new byte[1024]);
         producer.send(m);
+        if (transacted) {
+            s.commit();
+        }
 
         Message msg = advisoryConsumer.receive(1000);
         assertNotNull(msg);
+        if (transacted) {
+            s.commit();
+        }
         ActiveMQMessage message = (ActiveMQMessage) msg;
         ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
 
         //This should always be tcp:// because that is the transport that is 
used to connect even though
         //the nio transport is the first one in the list
         
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
-        
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), 
((ActiveMQDestination) queue).getQualifiedName());
+        
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION), 
((ActiveMQDestination) topic).getQualifiedName());
 
         //Add assertion to make sure body is included for advisory topics
         //when includeBodyForAdvisory is true
@@ -524,7 +589,9 @@ public class AdvisoryTests {
     }
 
     protected void configureBroker(BrokerService answer) throws Exception {
-        answer.setPersistent(false);
+        answer.setPersistent(persistent);
+        answer.setDeleteAllMessagesOnStartup(true);
+
         PolicyEntry policy = new PolicyEntry();
         policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD);
         policy.setAdvisoryForFastProducers(true);

Reply via email to