Repository: camel
Updated Branches:
  refs/heads/master fe5522c41 -> 83b359090


CAMEL-11639: camel-jms - Add support for changing JMS message selector on consumer at runtime via jmx


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/83b35909
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/83b35909
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/83b35909

Branch: refs/heads/master
Commit: 83b35909070f3f695c050bc4b3ab06320c5b7e9d
Parents: fe5522c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Aug 7 07:00:15 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Aug 7 07:00:15 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java |  28 +++++
 .../component/jms/ManagedJmsSelectorTest.java   | 108 +++++++++++++++++++
 2 files changed, 136 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/83b35909/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 8acccc9..f4a0120 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -22,6 +22,8 @@ import javax.jms.Connection;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.impl.DefaultConsumer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
@@ -34,6 +36,7 @@ import org.springframework.jms.support.JmsUtils;
  * @see DefaultJmsMessageListenerContainer
  * @see SimpleJmsMessageListenerContainer
  */
+@ManagedResource(description = "Managed JMS Consumer")
 public class JmsConsumer extends DefaultConsumer implements Suspendable {
     private volatile AbstractMessageListenerContainer listenerContainer;
     private volatile EndpointMessageListener messageListener;
@@ -251,4 +254,29 @@ public class JmsConsumer extends DefaultConsumer implements Suspendable {
         }
     }
 
+    /**
+     * Set the JMS message selector expression (or {@code null} if none).
+     * Default is none.
+     * <p>See the JMS specification for a detailed definition of selector expressions.
+     * <p>Note: The message selector may be replaced at runtime, with the listener
+     * container picking up the new selector value immediately (works e.g. with
+     * DefaultMessageListenerContainer, as long as the cache level is less than
+     * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
+     */
+    @ManagedAttribute(description = "Changes the JMS selector, as long the cache level is less than CACHE_CONSUMER.")
+    public String getMessageSelector() {
+        if (listenerContainer != null) {
+            return listenerContainer.getMessageSelector();
+        } else {
+            return null;
+        }
+    }
+
+    @ManagedAttribute(description = "Changes the JMS selector, as long the cache level is less than CACHE_CONSUMER.")
+    public void setMessageSelector(String messageSelector) {
+        if (listenerContainer != null) {
+            listenerContainer.setMessageSelector(messageSelector);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/83b35909/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
new file mode 100644
index 0000000..8f6937f
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ManagedJmsSelectorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.Set;
+import javax.jms.ConnectionFactory;
+import javax.management.Attribute;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ *
+ */
+public class ManagedJmsSelectorTest extends CamelTestSupport {
+
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = new DefaultCamelContext();
+
+        ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
+        context.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+
+        return context;
+    }
+
+    protected MBeanServer getMBeanServer() {
+        return context.getManagementStrategy().getManagementAgent().getMBeanServer();
+    }
+
+    @Test
+    public void testJmsSelectorChangeViaJmx() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+
+        Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=consumers,*"), null);
+        assertEquals(1, set.size());
+
+        ObjectName on = set.iterator().next();
+
+        assertTrue("Should be registered", mbeanServer.isRegistered(on));
+
+        String selector = (String) mbeanServer.getAttribute(on, "MessageSelector");
+        assertEquals("brand='beer'", selector);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Carlsberg");
+
+        template.sendBodyAndHeader("activemq:queue:start", "Pepsi", "brand", "softdrink");
+        template.sendBodyAndHeader("activemq:queue:start", "Carlsberg", "brand", "beer");
+
+        assertMockEndpointsSatisfied();
+
+        // change the selector at runtime
+
+        resetMocks();
+
+        mbeanServer.setAttribute(on, new Attribute("MessageSelector", "brand='softdrink'"));
+
+        // give it a little time to adjust
+        Thread.sleep(100);
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Pepsi");
+
+        template.sendBodyAndHeader("activemq:queue:start", "Pepsi", "brand", "softdrink");
+        template.sendBodyAndHeader("activemq:queue:start", "Carlsberg", "brand", "beer");
+
+        assertMockEndpointsSatisfied();
+
+        selector = (String) mbeanServer.getAttribute(on, "MessageSelector");
+        assertEquals("brand='softdrink'", selector);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:start?cacheLevelName=CACHE_NONE&selector=brand='beer'").routeId("foo").to("log:foo").to("mock:result");
+            }
+        };
+    }
+
+
+}