Repository: camel
Updated Branches:
  refs/heads/master fe5522c41 -> 83b359090


CAMEL-11639: camel-jms - Add support for changing JMS message selector on 
consumer at runtime via jmx


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/83b35909
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/83b35909
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/83b35909

Branch: refs/heads/master
Commit: 83b35909070f3f695c050bc4b3ab06320c5b7e9d
Parents: fe5522c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Aug 7 07:00:15 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Aug 7 07:00:15 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java |  28 +++++
 .../component/jms/ManagedJmsSelectorTest.java   | 108 +++++++++++++++++++
 2 files changed, 136 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/83b35909/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 8acccc9..f4a0120 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -22,6 +22,8 @@ import javax.jms.Connection;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultConsumer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
@@ -34,6 +36,7 @@ import org.springframework.jms.support.JmsUtils;
  * @see DefaultJmsMessageListenerContainer
  * @see SimpleJmsMessageListenerContainer
  */
+@ManagedResource(description = "Managed JMS Consumer")
 public class JmsConsumer extends DefaultConsumer implements Suspendable {
     private volatile AbstractMessageListenerContainer listenerContainer;
     private volatile EndpointMessageListener messageListener;
@@ -251,4 +254,29 @@ public class JmsConsumer extends DefaultConsumer 
implements Suspendable {
         }
     }
 
+    /**
+     * Set the JMS message selector expression (or {@code null} if none).
+     * Default is none.
+     * <p>See the JMS specification for a detailed definition of selector 
expressions.
+     * <p>Note: The message selector may be replaced at runtime, with the 
listener
+     * container picking up the new selector value immediately (works e.g. with
+     * DefaultMessageListenerContainer, as long as the cache level is less than
+     * CACHE_CONSUMER). However, this is considered advanced usage; use it 
with care!
+     */
+    @ManagedAttribute(description = "Changes the JMS selector, as long the 
cache level is less than CACHE_CONSUMER.")
+    public String getMessageSelector() {
+        if (listenerContainer != null) {
+            return listenerContainer.getMessageSelector();
+        } else {
+            return null;
+        }
+    }
+
+    @ManagedAttribute(description = "Changes the JMS selector, as long the 
cache level is less than CACHE_CONSUMER.")
+    public void setMessageSelector(String messageSelector) {
+        if (listenerContainer != null) {
+            listenerContainer.setMessageSelector(messageSelector);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/83b35909/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
new file mode 100644
index 0000000..8f6937f
--- /dev/null
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.Set;
+import javax.jms.ConnectionFactory;
+import javax.management.Attribute;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class ManagedJmsSelectorTest extends CamelTestSupport {
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+
+        ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
+        context.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        return context;
+    }
+
+    protected MBeanServer getMBeanServer() {
+        return 
context.getManagementStrategy().getManagementAgent().getMBeanServer();
+    }
+
+    @Test
+    public void testJmsSelectorChangeViaJmx() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new 
ObjectName("*:type=consumers,*"), null);
+        assertEquals(1, set.size());
+
+        ObjectName on = set.iterator().next();
+
+        assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+        String selector = (String) mbeanServer.getAttribute(on, 
"MessageSelector");
+        assertEquals("brand='beer'", selector);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Carlsberg");
+
+        template.sendBodyAndHeader("activemq:queue:start", "Pepsi", "brand", 
"softdrink");
+        template.sendBodyAndHeader("activemq:queue:start", "Carlsberg", 
"brand", "beer");
+
+        assertMockEndpointsSatisfied();
+
+        // change the selector at runtime
+
+        resetMocks();
+
+        mbeanServer.setAttribute(on, new Attribute("MessageSelector", 
"brand='softdrink'"));
+
+        // give it a little time to adjust
+        Thread.sleep(100);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Pepsi");
+
+        template.sendBodyAndHeader("activemq:queue:start", "Pepsi", "brand", 
"softdrink");
+        template.sendBodyAndHeader("activemq:queue:start", "Carlsberg", 
"brand", "beer");
+
+        assertMockEndpointsSatisfied();
+
+        selector = (String) mbeanServer.getAttribute(on, "MessageSelector");
+        assertEquals("brand='softdrink'", selector);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("activemq:queue:start?cacheLevelName=CACHE_NONE&selector=brand='beer'").routeId("foo").to("log:foo").to("mock:result");
+            }
+        };
+    }
+
+
+}

Reply via email to