Repository: tomee Updated Branches: refs/heads/tomee-1.7.x 5e0704878 -> a46a75862
TOMEE-2021 allow to control through JMX MDB listening state Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/8dd71d44 Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/8dd71d44 Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/8dd71d44 Branch: refs/heads/tomee-1.7.x Commit: 8dd71d448144035ab45a119b55700effa2b7ff87 Parents: 5e07048 Author: rmannibucau <[email protected]> Authored: Thu Mar 9 20:09:50 2017 +0100 Committer: Jonathan Gallimore <[email protected]> Committed: Mon Mar 13 11:16:17 2017 +0000 ---------------------------------------------------------------------- .../apache/openejb/core/mdb/MdbContainer.java | 21 ++- .../activemq/ActiveMQResourceAdapter.java | 179 ++++++++++++++++++ .../ActiveMQResourceAdapterControlTest.java | 187 +++++++++++++++++++ 3 files changed, 386 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java index df940be..b382799 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java @@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste public class MdbContainer implements RpcContainer { private static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources"); + + private static final ThreadLocal<BeanContext> CURRENT = new ThreadLocal<>(); + private static final Object[] NO_ARGS = new Object[0]; private final Object containerID; @@ -179,6 +182,7 @@ public class MdbContainer implements RpcContainer { } // activate the endpoint + CURRENT.set(beanContext); try { resourceAdapter.endpointActivation(endpointFactory, activationSpec); } catch (final ResourceException e) { @@ -188,6 +192,8 @@ public class MdbContainer implements RpcContainer { deployments.remove(deploymentId); throw new OpenEJBException(e); + } finally { + CURRENT.remove(); } } @@ -264,7 +270,12 @@ public class MdbContainer implements RpcContainer { try { final EndpointFactory endpointFactory = (EndpointFactory) beanContext.getContainerData(); if (endpointFactory != null) { - resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec()); + CURRENT.set(beanContext); + try { + resourceAdapter.endpointDeactivation(endpointFactory, endpointFactory.getActivationSpec()); + } finally { + CURRENT.remove(); + } final MBeanServer server = LocalMBeanServer.get(); for (final ObjectName objectName : endpointFactory.jmxNames) { @@ -472,6 +483,14 @@ public class MdbContainer implements RpcContainer { } } + public static BeanContext current() { + final BeanContext beanContext = CURRENT.get(); + if (beanContext == null) { + CURRENT.remove(); + } + return beanContext; + } + private static class MdbCallContext { private Method deliveryMethod; private TransactionPolicy txPolicy; http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java index 8880875..b0ad912 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java @@ -18,21 +18,50 @@ package org.apache.openejb.resource.activemq; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.ra.ActiveMQEndpointActivationKey; +import org.apache.activemq.ra.ActiveMQEndpointWorker; +import org.apache.openejb.BeanContext; +import org.apache.openejb.core.mdb.MdbContainer; +import org.apache.openejb.monitoring.LocalMBeanServer; +import org.apache.openejb.monitoring.ObjectNameBuilder; import org.apache.openejb.util.Duration; import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; import org.apache.openejb.util.URISupport; import org.apache.openejb.util.URLs; +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.AttributeNotFoundException; +import javax.management.DynamicMBean; +import javax.management.InvalidAttributeValueException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanConstructorInfo; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanNotificationInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanParameterInfo; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import javax.resource.NotSupportedException; +import javax.resource.ResourceException; +import javax.resource.spi.ActivationSpec; import javax.resource.spi.BootstrapContext; import javax.resource.spi.ResourceAdapterInternalException; +import javax.resource.spi.endpoint.MessageEndpointFactory; import java.lang.reflect.Method; import java.net.URISyntaxException; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import static javax.management.MBeanOperationInfo.ACTION; + @SuppressWarnings("UnusedDeclaration") public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQResourceAdapter { @@ -40,6 +69,7 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso private String useDatabaseLock; private String startupTimeout = "60000"; private BootstrapContext bootstrapContext; + private final Map<BeanContext, ObjectName> mbeanNames = new ConcurrentHashMap<>(); public String getDataSource() { return dataSource; @@ -143,6 +173,87 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso } @Override + public void endpointActivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) throws ResourceException { + final BeanContext current = MdbContainer.current(); + if (current != null && "false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup"))) { + if (!equals(activationSpec.getResourceAdapter())) { + throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " != " + this + ")"); + } + if (!(activationSpec instanceof MessageActivationSpec)) { + throw new NotSupportedException("That type of ActivationSpec not supported: " + activationSpec.getClass()); + } + + final ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, MessageActivationSpec.class.cast(activationSpec)); + Map.class.cast(Reflections.get(this, "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) { + }); + // we dont want that worker.start(); + } else { + super.endpointActivation(endpointFactory, activationSpec); + } + + if (current != null) { + addJMxControl(current, current.getProperties().getProperty("MdbJMXControl")); + } + } + + private void addJMxControl(final BeanContext current, final String name) throws ResourceException { + if (name == null || "false".equalsIgnoreCase(name)) { + return; + } + + final ActiveMQEndpointWorker worker = getWorker(current); + final ObjectName jmxName; + try { + jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder() + .set("J2EEServer", "openejb") + .set("J2EEApplication", null) + .set("EJBModule", current.getModuleID()) + .set("StatelessSessionBean", current.getEjbName()) + .set("j2eeType", "control") + .set("name", current.getEjbName()) + .build() : new ObjectName(name); + } catch (final MalformedObjectNameException e) { + throw new IllegalArgumentException(e); + } + mbeanNames.put(current, jmxName); + + final MBeanInfo info = new MBeanInfo( + "com.tomitribe.tomee.mdb.MdbControl", + "Allows to control a MDB listener", + new MBeanAttributeInfo[0], + new MBeanConstructorInfo[0], + new MBeanOperationInfo[]{ + new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION), + new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION) + }, + new MBeanNotificationInfo[0] + ); + LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName); + log.info("Deployed MDB control for " + current.getDeploymentID() + " on " + jmxName); + } + + @Override + public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) { + final BeanContext current = MdbContainer.current(); + if (current != null && "true".equalsIgnoreCase(current.getProperties().getProperty("MdbJMXControl"))) { + LocalMBeanServer.unregisterSilently(mbeanNames.remove(current)); + log.info("Undeployed MDB control for " + current.getDeploymentID()); + } + super.endpointDeactivation(endpointFactory, activationSpec); + } + + private ActiveMQEndpointWorker getWorker(final BeanContext beanContext) throws ResourceException { + final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> workers = Map.class.cast(Reflections.get( + MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), "endpointWorkers")); + for (final Map.Entry<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> entry : workers.entrySet()) { + if (entry.getKey().getMessageEndpointFactory() == beanContext.getContainerData()) { + return entry.getValue(); + } + } + throw new IllegalStateException("No worker for " + beanContext.getDeploymentID()); + } + + @Override public BootstrapContext getBootstrapContext() { return this.bootstrapContext; } @@ -210,4 +321,72 @@ public class ActiveMQResourceAdapter extends org.apache.activemq.ra.ActiveMQReso //Ignore } } + + public static class MdbJmxControl implements DynamicMBean { + private static final AttributeList ATTRIBUTE_LIST = new AttributeList(); + private static final MBeanInfo INFO = new MBeanInfo( + "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl", + "Allows to control a MDB (start/stop)", + new MBeanAttributeInfo[0], + new MBeanConstructorInfo[0], + new MBeanOperationInfo[]{ + new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION), + new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION) + }, + new MBeanNotificationInfo[0]); + + private final ActiveMQEndpointWorker worker; + + private MdbJmxControl(final ActiveMQEndpointWorker worker) { + this.worker = worker; + } + + @Override + public Object invoke(final String actionName, final Object[] params, final String[] signature) throws MBeanException, ReflectionException { + switch (actionName) { + case "stop": + try { + worker.stop(); + } catch (final InterruptedException e) { + Thread.interrupted(); + } + break; + case "start": + try { + worker.start(); + } catch (ResourceException e) { + throw new MBeanException(new IllegalStateException(e.getMessage())); + } + break; + default: + throw new MBeanException(new IllegalStateException("unsupported operation: " + actionName)); + } + return null; + } + + @Override + public MBeanInfo getMBeanInfo() { + return INFO; + } + + @Override + public Object getAttribute(final String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException { + throw new AttributeNotFoundException(); + } + + @Override + public void setAttribute(final Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException { + throw new AttributeNotFoundException(); + } + + @Override + public AttributeList getAttributes(final String[] attributes) { + return ATTRIBUTE_LIST; + } + + @Override + public AttributeList setAttributes(final AttributeList attributes) { + return ATTRIBUTE_LIST; + } + } } http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java new file mode 100644 index 0000000..a574f59 --- /dev/null +++ b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.resource.activemq; + +import org.apache.openejb.config.EjbModule; +import org.apache.openejb.jee.EjbJar; +import org.apache.openejb.jee.oejb3.EjbDeployment; +import org.apache.openejb.jee.oejb3.OpenejbJar; +import org.apache.openejb.junit.ApplicationComposer; +import org.apache.openejb.monitoring.LocalMBeanServer; +import org.apache.openejb.testing.Classes; +import org.apache.openejb.testing.Configuration; +import org.apache.openejb.testing.Module; +import org.apache.openejb.testng.PropertiesBuilder; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.annotation.Resource; +import javax.ejb.MessageDriven; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.ObjectName; +import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(ApplicationComposer.class) +public class ActiveMQResourceAdapterControlTest { + @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb") + private Queue queue; + + @Resource + private ConnectionFactory connectionFactory; + + @Configuration + public Properties config() { + return new PropertiesBuilder() + .p("ra", "new://Resource?type=ActiveMQResourceAdapter") + .p("ra.brokerXmlConfig", "broker:(vm://localhost)?useJmx=false&persistent=false") + .p("ra.serverUrl", "vm://localhost") + + .p("mdb", "new://Container?type=MESSAGE") + .p("mdb.resourceAdapter", "ra") + + .p("cf", "new://Resource?type=javax.jms.ConnectionFactory") + .p("cf.resourceAdapter", "ra") + + .p("openejb.deploymentId.format", "{appId}/{ejbJarId}/{ejbName}") + .build(); + } + + @Module + @Classes(value = Mdb.class) + public EjbModule app() { + return new EjbModule(new EjbJar("test"), new OpenejbJar() {{ + setId("test"); + getEjbDeployment().add(new EjbDeployment() {{ + setEjbName("ejb/Mdb"); + getProperties().put("MdbActiveOnStartup", "false"); + getProperties().put("MdbJMXControl", "default:type=test"); + }}); + }}); + } + + @Test + public void ensureControl() throws Exception { + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", 10, TimeUnit.SECONDS)); + + setControl("start"); + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES)); + assertEquals("Will be received after", Mdb.awaiter.message); + + final long start = System.currentTimeMillis(); + assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES)); + assertEquals("First", Mdb.awaiter.message); + final long end = System.currentTimeMillis(); + + Mdb.awaiter.message = null; + setControl("stop"); + // default would be wait 10s, but if machine is slow we compute it from the first msg stats + final long waitWithoutResponse = Math.max(10, 5 * (end - start) / 1000); + System.out.println("We'll wait " + waitWithoutResponse + "s to get a message on a stopped listener"); + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", waitWithoutResponse, TimeUnit.SECONDS)); + assertNull(Mdb.awaiter.message); + + setControl("start"); + assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES)); + assertEquals("Will be received after", Mdb.awaiter.message); + + Mdb.awaiter.message = null; + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES)); + assertEquals("Second", Mdb.awaiter.message); + } + + private void setControl(final String action) throws Exception { + LocalMBeanServer.get().invoke( + new ObjectName("default:type=test"), + action, new Object[0], new String[0]); + } + + private boolean sendAndWait(final String second, final long wait, final TimeUnit unit) throws JMSException { + doSend(second); + try { + return Mdb.awaiter.semaphore.tryAcquire(wait, unit); + } catch (final InterruptedException e) { + Thread.interrupted(); + fail(); + return false; + } + } + + private void doSend(final String txt) throws JMSException { + Connection c = null; + try { + c = connectionFactory.createConnection(); + Session session = null; + try { + session = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = null; + try { + producer = session.createProducer(queue); + producer.send(session.createTextMessage(txt)); + } finally { + if (producer != null) { + producer.close(); + } + } + } finally { + if (session != null) { + session.close(); + } + } + } finally { + if (c != null) { + c.close(); + } + } + } + + @MessageDriven(name = "ejb/Mdb") + public static class Mdb implements MessageListener { + static final MessageAwaiter awaiter = new MessageAwaiter(); + + @Override + public synchronized void onMessage(final Message message) { + try { + awaiter.message = TextMessage.class.cast(message).getText(); + } catch (final JMSException e) { + throw new IllegalStateException(e); + } finally { + awaiter.semaphore.release(); + } + } + } + + public static class MessageAwaiter { + private final Semaphore semaphore = new Semaphore(0); + private volatile String message; + } +}
