This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new f83c5f1ba AMQ-9175 - Properly set broker on ConnectionContext inside
MemoryTransactionStores
f83c5f1ba is described below
commit f83c5f1ba156ed850acf76dc42f944b4150d21f8
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Mon Nov 21 11:02:12 2022 -0500
AMQ-9175 - Properly set broker on ConnectionContext inside
MemoryTransactionStores
This fixes an NPE when using the messageDelivered advisory and
transactions
---
.../store/memory/MemoryPersistenceAdapter.java | 13 ++-
.../store/memory/MemoryTransactionStore.java | 12 ++-
.../store/jdbc/JdbcMemoryTransactionStore.java | 9 +-
.../apache/activemq/advisory/AdvisoryTests.java | 110 ++++++++++++++++-----
4 files changed, 115 insertions(+), 29 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index a1233e0ae..a668c4eb1 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
@@ -43,13 +45,15 @@ import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
-public class MemoryPersistenceAdapter implements PersistenceAdapter,
NoLocalSubscriptionAware {
+public class MemoryPersistenceAdapter implements PersistenceAdapter,
NoLocalSubscriptionAware,
+ BrokerServiceAware {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
MemoryTransactionStore transactionStore;
ConcurrentMap<ActiveMQDestination, TopicMessageStore> topics = new
ConcurrentHashMap<ActiveMQDestination, TopicMessageStore>();
ConcurrentMap<ActiveMQDestination, MessageStore> queues = new
ConcurrentHashMap<ActiveMQDestination, MessageStore>();
private boolean useExternalMessageReferences;
+ protected BrokerService brokerService;
@Override
public Set<ActiveMQDestination> getDestinations() {
@@ -118,7 +122,7 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter, NoLocalSubs
@Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
- transactionStore = new MemoryTransactionStore(this);
+ transactionStore = new MemoryTransactionStore(this, brokerService);
}
return transactionStore;
}
@@ -253,4 +257,9 @@ public class MemoryPersistenceAdapter implements
PersistenceAdapter, NoLocalSubs
public boolean isPersistNoLocal() {
return true;
}
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
index abf828243..62b57613e 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -51,6 +52,7 @@ public class MemoryTransactionStore implements
TransactionStore {
protected ConcurrentMap<Object, Tx> inflightTransactions = new
ConcurrentHashMap<Object, Tx>();
protected Map<TransactionId, Tx> preparedTransactions =
Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
protected final PersistenceAdapter persistenceAdapter;
+ protected final BrokerService brokerService;
private boolean doingRecover;
@@ -93,6 +95,13 @@ public class MemoryTransactionStore implements
TransactionStore {
*/
public void commit() throws IOException {
ConnectionContext ctx = new ConnectionContext();
+ try {
+ if (brokerService != null) {
+ ctx.setBroker(brokerService.getBroker());
+ }
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
persistenceAdapter.beginTransaction(ctx);
try {
@@ -134,8 +143,9 @@ public class MemoryTransactionStore implements
TransactionStore {
MessageStore getMessageStore();
}
- public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
+ public MemoryTransactionStore(PersistenceAdapter persistenceAdapter,
BrokerService brokerService) {
this.persistenceAdapter = persistenceAdapter;
+ this.brokerService = brokerService;
}
public MessageStore proxy(MessageStore messageStore) {
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index ccf748501..32871696c 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -51,7 +51,7 @@ public class JdbcMemoryTransactionStore extends
MemoryTransactionStore {
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter
jdbcPersistenceAdapter) {
- super(jdbcPersistenceAdapter);
+ super(jdbcPersistenceAdapter,
jdbcPersistenceAdapter.getBrokerService());
}
@Override
@@ -163,6 +163,13 @@ public class JdbcMemoryTransactionStore extends
MemoryTransactionStore {
if (tx != null) {
// undo prepare work
ConnectionContext ctx = new ConnectionContext();
+ try {
+ if (brokerService != null) {
+ ctx.setBroker(brokerService.getBroker());
+ }
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
persistenceAdapter.beginTransaction(ctx);
try {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 82f50cc80..624126e13 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashSet;
-
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -39,9 +37,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.jms.Topic;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
@@ -49,7 +45,6 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.NullMessageReference;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -76,21 +71,27 @@ public class AdvisoryTests {
protected Connection connection;
protected String bindAddress =
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected final boolean includeBodyForAdvisory;
+ protected final boolean persistent;
protected final int EXPIRE_MESSAGE_PERIOD = 3000;
- @Parameters(name = "includeBodyForAdvisory={0}")
+ @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
// Include the full body of the message
- {true},
+ {true, false},
+ // Don't include the full body of the message
+ {false, false},
+ // Include the full body of the message
+ {true, true},
// Don't include the full body of the message
- {false}
+ {false, true}
});
}
- public AdvisoryTests(boolean includeBodyForAdvisory) {
+ public AdvisoryTests(boolean includeBodyForAdvisory, boolean persistent) {
super();
this.includeBodyForAdvisory = includeBodyForAdvisory;
+ this.persistent = persistent;
}
@Test(timeout = 60000)
@@ -172,45 +173,88 @@ public class AdvisoryTests {
@Test(timeout = 60000)
public void testQueueMessageDeliveryAdvisory() throws Exception {
- testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueMessageDeliveryAdvisoryTransacted() throws Exception {
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisory() throws Exception {
- testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueMessageDispatchedAdvisoryTransacted() throws
Exception {
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testQueueMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
- testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueMessageDispatchedAdvisorySyncTransacted() throws
Exception {
+ ((ActiveMQConnection)connection).setDispatchAsync(false);
+ testMessageConsumerAdvisory(new ActiveMQQueue(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testTopicMessageDeliveryAdvisory() throws Exception {
- testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testTopicMessageDeliveryAdvisoryTransacted() throws Exception {
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDeliveredAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisory() throws Exception {
- testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testTopicMessageDispatchedAdvisoryTransacted() throws
Exception {
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testTopicMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
- testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, false);
+ }
+
+ @Test(timeout = 60000)
+ public void testTopicMessageDispatchedAdvisorySyncTransacted() throws
Exception {
+ ((ActiveMQConnection)connection).setDispatchAsync(false);
+ testMessageConsumerAdvisory(new ActiveMQTopic(getClass().getName()),
AdvisorySupport::getMessageDispatchedAdvisoryTopic, true);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisory() throws Exception {
-
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic,
false);
}
@Test(timeout = 60000)
public void testDurableMessageDispatchedAdvisorySync() throws Exception {
((ActiveMQConnection)connection).setDispatchAsync(false);
-
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic);
+
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic,
false);
+ }
+
+ @Test(timeout = 60000)
+ public void testDurableMessageDispatchedAdvisoryTransacted() throws
Exception {
+
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic,
true);
+ }
+
+ @Test(timeout = 60000)
+ public void testDurableMessageDispatchedAdvisorySyncTransacted() throws
Exception {
+ ((ActiveMQConnection)connection).setDispatchAsync(false);
+
testDurableSubscriptionAdvisory(AdvisorySupport::getMessageDispatchedAdvisoryTopic,
true);
}
@Test(timeout = 60000)
@@ -243,8 +287,9 @@ public class AdvisoryTests {
assertNull(msg);
}
- private void testMessageConsumerAdvisory(ActiveMQDestination dest,
Function<ActiveMQDestination, Topic> advisoryTopicSupplier) throws Exception {
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ private void testMessageConsumerAdvisory(ActiveMQDestination dest,
Function<ActiveMQDestination, Topic> advisoryTopicSupplier,
+ boolean transacted) throws Exception {
+ Session s = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = s.createConsumer(dest);
assertNotNull(consumer);
@@ -256,15 +301,21 @@ public class AdvisoryTests {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
+ if (transacted) {
+ s.commit();
+ }
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
+ if (transacted) {
+ s.commit();
+ }
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
- //This should always be tcp:// because that is the transport that is
used to connect even though
- //the nio transport is the first one in the list
-
assertTrue(((String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL)).startsWith("tcp://"));
+ //Could be either tcp:// or nio://
+ String originBrokerUrl =
(String)message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL);
+ assertTrue(originBrokerUrl.startsWith("tcp://") ||
originBrokerUrl.startsWith("nio://"));
assertEquals(message.getProperty(AdvisorySupport.MSG_PROPERTY_DESTINATION),
dest.getQualifiedName());
//Make sure consumer id exists if dispatched advisory
@@ -277,8 +328,9 @@ public class AdvisoryTests {
assertIncludeBodyForAdvisory(payload);
}
- private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination,
Topic> advisoryTopicSupplier) throws Exception {
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ private void testDurableSubscriptionAdvisory(Function<ActiveMQDestination,
Topic> advisoryTopicSupplier,
+ boolean transacted) throws Exception {
+ Session s = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
Topic topic = s.createTopic(getClass().getName());
MessageConsumer consumer = s.createDurableSubscriber(topic, "sub");
assertNotNull(consumer);
@@ -291,9 +343,15 @@ public class AdvisoryTests {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
+ if (transacted) {
+ s.commit();
+ }
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
+ if (transacted) {
+ s.commit();
+ }
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
@@ -632,7 +690,9 @@ public class AdvisoryTests {
}
protected void configureBroker(BrokerService answer) throws Exception {
- answer.setPersistent(false);
+ answer.setPersistent(persistent);
+ answer.setDeleteAllMessagesOnStartup(true);
+
PolicyEntry policy = new PolicyEntry();
policy.setExpireMessagesPeriod(EXPIRE_MESSAGE_PERIOD);
policy.setAdvisoryForFastProducers(true);