This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
     new adf83c7cb4 [AMQ-9530] Fix SelectorAwareVirtualTopicInterceptor ClassCastException if next is not Topic.
adf83c7cb4 is described below

commit adf83c7cb47ebf627fd9ed80c59b64c32a993c43
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue Jul 9 16:44:07 2024 -0700

    [AMQ-9530] Fix SelectorAwareVirtualTopicInterceptor ClassCastException if next is not Topic.
    
    (cherry picked from commit 473267baf862cc65c3c20b90f737036d4f89db29)
---
 .../virtual/BaseVirtualDestinationFilter.java      |  39 +++++++
 .../broker/region/virtual/MappedQueueFilter.java   |  13 +--
 .../SelectorAwareVirtualTopicInterceptor.java      |   7 +-
 .../region/virtual/VirtualTopicInterceptor.java    |   3 +-
 ...ualTopicSelectorWithAnotherInterceptorTest.java | 126 +++++++++++++++++++++
 5 files changed, 173 insertions(+), 15 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
new file mode 100644
index 0000000000..82783e5da5
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.virtual;
+
+import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+
+import java.util.Optional;
+
+public class BaseVirtualDestinationFilter extends DestinationFilter {
+    
+    public BaseVirtualDestinationFilter(Destination next) {
+        super(next);
+    }
+
+    BaseDestination getBaseDestination(Destination virtualDest) {
+        if (virtualDest instanceof BaseDestination) {
+            return (BaseDestination) virtualDest;
+        } else if (virtualDest instanceof DestinationFilter) {
+            return ((DestinationFilter) 
virtualDest).getAdaptor(BaseDestination.class);
+        }
+        return null;
+    }
+}
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
index 2baa33a398..838e81c0ed 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java
@@ -16,12 +16,12 @@
  */
 package org.apache.activemq.broker.region.virtual;
 
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.IndirectMessageReference;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
@@ -34,7 +34,7 @@ import org.apache.activemq.util.SubscriptionKey;
  * Creates a mapped Queue that can recover messages from subscription recovery
  * policy of its Virtual Topic.
  */
-public class MappedQueueFilter extends DestinationFilter {
+public class MappedQueueFilter extends BaseVirtualDestinationFilter {
 
     private final ActiveMQDestination virtualDestination;
 
@@ -87,15 +87,6 @@ public class MappedQueueFilter extends DestinationFilter {
         }
     }
 
-    private BaseDestination getBaseDestination(Destination virtualDest) {
-        if (virtualDest instanceof BaseDestination) {
-            return (BaseDestination) virtualDest;
-        } else if (virtualDest instanceof DestinationFilter) {
-            return ((DestinationFilter) 
virtualDest).getAdaptor(BaseDestination.class);
-        }
-        return null;
-    }
-
     @Override
     public synchronized void removeSubscription(ConnectionContext context, 
Subscription sub, long lastDeliveredSequenceId) throws Exception {
         super.removeSubscription(context, sub, lastDeliveredSequenceId);
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
index 727f79d380..b46e4f45db 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
@@ -17,9 +17,9 @@
 package org.apache.activemq.broker.region.virtual;
 
 import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
@@ -41,8 +41,11 @@ public class SelectorAwareVirtualTopicInterceptor extends 
VirtualTopicIntercepto
 
     public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic 
virtualTopic) {
         super(next, virtualTopic);
+        BaseDestination baseDestination = getBaseDestination(next);
         selectorCachePlugin = (SubQueueSelectorCacheBroker)
-                
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
+                (baseDestination != null
+                ? 
baseDestination.createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class)
+                : null);
     }
 
     /**
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
index 9e5c251bde..b163931dbd 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
@@ -25,7 +25,6 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -39,7 +38,7 @@ import javax.jms.ResourceAllocationException;
 /**
  * A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
  */
-public class VirtualTopicInterceptor extends DestinationFilter {
+public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {
 
     private final String prefix;
     private final String postfix;
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
new file mode 100644
index 0000000000..a4a9967bed
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Session;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.spring.ConsumerBean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VirtualTopicSelectorWithAnotherInterceptorTest extends 
CompositeTopicTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(VirtualTopicSelectorWithAnotherInterceptorTest.class);
+            
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
+    }
+
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
+    }
+    
+    protected Destination getProducerDestination() {
+        return new ActiveMQTopic("VirtualTopic.TEST");
+    }
+    
+    @Override
+    protected void assertMessagesArrived(ConsumerBean messageList1, 
ConsumerBean messageList2) {
+        messageList1.assertMessagesArrived(total / 2);
+        messageList2.assertMessagesArrived(total / 2);
+ 
+        messageList1.flushMessages();
+        messageList2.flushMessages();
+        
+        LOG.info("validate no other messages on queues");
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                
+            Destination destination1 = getConsumer1Dsetination();
+            Destination destination2 = getConsumer2Dsetination();
+            MessageConsumer c1 = session.createConsumer(destination1, null);
+            MessageConsumer c2 = session.createConsumer(destination2, null);
+            c1.setMessageListener(messageList1);
+            c2.setMessageListener(messageList2);
+            
+            
+            LOG.info("send one simple message that should go to both 
consumers");
+            MessageProducer producer = 
session.createProducer(getProducerDestination());
+            assertNotNull(producer);
+            
+            producer.send(session.createTextMessage("Last Message"));
+            
+            messageList1.assertMessagesArrived(1);
+            messageList2.assertMessagesArrived(1);
+        
+        } catch (JMSException e) {
+            e.printStackTrace();
+            fail("unexpeced ex while waiting for last messages: " + e);
+        }
+    }
+    
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // use message selectors on consumers that need to propagate up to the 
virtual
+        // topic dispatch so that un matched messages do not linger on 
subscription queues
+        messageSelector1 = "odd = 'yes'";
+        messageSelector2 = "odd = 'no'";
+        
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+
+        VirtualTopic virtualTopic = new VirtualTopic();
+        // the new config that enables selectors on the interceptor
+        virtualTopic.setSelectorAware(true);
+        VirtualDestinationInterceptor interceptor = new 
VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new 
VirtualDestination[]{virtualTopic});
+        TestDestinationInterceptor testInterceptor = new 
TestDestinationInterceptor();
+        broker.setDestinationInterceptors(new 
DestinationInterceptor[]{testInterceptor, interceptor});
+        return broker;
+    }
+
+    private static class TestDestinationInterceptor implements 
DestinationInterceptor {
+
+        @Override
+        public org.apache.activemq.broker.region.Destination 
intercept(org.apache.activemq.broker.region.Destination destination) {
+            return new DestinationFilter(destination);
+        }
+
+        @Override
+        public void remove(org.apache.activemq.broker.region.Destination 
destination) {
+        }
+
+        @Override
+        public void create(Broker broker, ConnectionContext context, 
ActiveMQDestination destination) throws Exception {
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to