This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new adf83c7cb4 [AMQ-9530] Fix SelectorAwareVirtualTopicInterceptor
ClassCastException if next is not Topic.
adf83c7cb4 is described below
commit adf83c7cb47ebf627fd9ed80c59b64c32a993c43
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue Jul 9 16:44:07 2024 -0700
[AMQ-9530] Fix SelectorAwareVirtualTopicInterceptor ClassCastException if
next is not Topic.
(cherry picked from commit 473267baf862cc65c3c20b90f737036d4f89db29)
---
.../virtual/BaseVirtualDestinationFilter.java | 39 +++++++
.../broker/region/virtual/MappedQueueFilter.java | 13 +--
.../SelectorAwareVirtualTopicInterceptor.java | 7 +-
.../region/virtual/VirtualTopicInterceptor.java | 3 +-
...ualTopicSelectorWithAnotherInterceptorTest.java | 126 +++++++++++++++++++++
5 files changed, 173 insertions(+), 15 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
new file mode 100644
index 0000000000..82783e5da5
--- /dev/null
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.virtual;
+
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+
+import java.util.Optional;
+
+public class BaseVirtualDestinationFilter extends DestinationFilter {
+
+ public BaseVirtualDestinationFilter(Destination next) {
+ super(next);
+ }
+
+ BaseDestination getBaseDestination(Destination virtualDest) {
+ if (virtualDest instanceof BaseDestination) {
+ return (BaseDestination) virtualDest;
+ } else if (virtualDest instanceof DestinationFilter) {
+ return ((DestinationFilter)
virtualDest).getAdaptor(BaseDestination.class);
+ }
+ return null;
+ }
+}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index 2baa33a398..838e81c0ed 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -16,12 +16,12 @@
*/
package org.apache.activemq.broker.region.virtual;
+import java.util.Optional;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
@@ -34,7 +34,7 @@ import org.apache.activemq.util.SubscriptionKey;
* Creates a mapped Queue that can recover messages from subscription recovery
* policy of its Virtual Topic.
*/
-public class MappedQueueFilter extends DestinationFilter {
+public class MappedQueueFilter extends BaseVirtualDestinationFilter {
private final ActiveMQDestination virtualDestination;
@@ -87,15 +87,6 @@ public class MappedQueueFilter extends DestinationFilter {
}
}
- private BaseDestination getBaseDestination(Destination virtualDest) {
- if (virtualDest instanceof BaseDestination) {
- return (BaseDestination) virtualDest;
- } else if (virtualDest instanceof DestinationFilter) {
- return ((DestinationFilter)
virtualDest).getAdaptor(BaseDestination.class);
- }
- return null;
- }
-
@Override
public synchronized void removeSubscription(ConnectionContext context,
Subscription sub, long lastDeliveredSequenceId) throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 727f79d380..b46e4f45db 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -17,9 +17,9 @@
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
@@ -41,8 +41,11 @@ public class SelectorAwareVirtualTopicInterceptor extends
VirtualTopicIntercepto
public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic
virtualTopic) {
super(next, virtualTopic);
+ BaseDestination baseDestination = getBaseDestination(next);
selectorCachePlugin = (SubQueueSelectorCacheBroker)
-
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
+ (baseDestination != null
+ ?
baseDestination.createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class)
+ : null);
}
/**
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index 9e5c251bde..b163931dbd 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -25,7 +25,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -39,7 +38,7 @@ import javax.jms.ResourceAllocationException;
/**
* A Destination which implements <a
href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
*/
-public class VirtualTopicInterceptor extends DestinationFilter {
+public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {
private final String prefix;
private final String postfix;
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
new file mode 100644
index 0000000000..a4a9967bed
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualTopicSelectorWithAnotherInterceptorTest extends
CompositeTopicTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(VirtualTopicSelectorWithAnotherInterceptorTest.class);
+
+ protected Destination getConsumer1Dsetination() {
+ return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+ }
+
+ protected Destination getConsumer2Dsetination() {
+ return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
+ }
+
+ protected Destination getProducerDestination() {
+ return new ActiveMQTopic("VirtualTopic.TEST");
+ }
+
+ @Override
+ protected void assertMessagesArrived(ConsumerBean messageList1,
ConsumerBean messageList2) {
+ messageList1.assertMessagesArrived(total / 2);
+ messageList2.assertMessagesArrived(total / 2);
+
+ messageList1.flushMessages();
+ messageList2.flushMessages();
+
+ LOG.info("validate no other messages on queues");
+ try {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ Destination destination1 = getConsumer1Dsetination();
+ Destination destination2 = getConsumer2Dsetination();
+ MessageConsumer c1 = session.createConsumer(destination1, null);
+ MessageConsumer c2 = session.createConsumer(destination2, null);
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+
+ LOG.info("send one simple message that should go to both
consumers");
+ MessageProducer producer =
session.createProducer(getProducerDestination());
+ assertNotNull(producer);
+
+ producer.send(session.createTextMessage("Last Message"));
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(1);
+
+ } catch (JMSException e) {
+ e.printStackTrace();
+ fail("unexpeced ex while waiting for last messages: " + e);
+ }
+ }
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ // use message selectors on consumers that need to propagate up to the
virtual
+ // topic dispatch so that un matched messages do not linger on
subscription queues
+ messageSelector1 = "odd = 'yes'";
+ messageSelector2 = "odd = 'no'";
+
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+
+ VirtualTopic virtualTopic = new VirtualTopic();
+ // the new config that enables selectors on the interceptor
+ virtualTopic.setSelectorAware(true);
+ VirtualDestinationInterceptor interceptor = new
VirtualDestinationInterceptor();
+ interceptor.setVirtualDestinations(new
VirtualDestination[]{virtualTopic});
+ TestDestinationInterceptor testInterceptor = new
TestDestinationInterceptor();
+ broker.setDestinationInterceptors(new
DestinationInterceptor[]{testInterceptor, interceptor});
+ return broker;
+ }
+
+ private static class TestDestinationInterceptor implements
DestinationInterceptor {
+
+ @Override
+ public org.apache.activemq.broker.region.Destination
intercept(org.apache.activemq.broker.region.Destination destination) {
+ return new DestinationFilter(destination);
+ }
+
+ @Override
+ public void remove(org.apache.activemq.broker.region.Destination
destination) {
+ }
+
+ @Override
+ public void create(Broker broker, ConnectionContext context,
ActiveMQDestination destination) throws Exception {
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact