Hope no-one minds - I have backported this to 1.7.x: https://github.com/apache/tomee/pull/62
If no-one objects, I'll merge this in. Many thanks Jon On Thu, Mar 9, 2017 at 7:09 PM, <[email protected]> wrote: > Repository: tomee > Updated Branches: > refs/heads/master 933bd899d -> 9c661758a > > > TOMEE-2021 allow to control through JMX MDB listening state > > > Project: http://git-wip-us.apache.org/repos/asf/tomee/repo > Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/9c661758 > Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/9c661758 > Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/9c661758 > > Branch: refs/heads/master > Commit: 9c661758aeff813d05748e91d01c1e522c09c0a4 > Parents: 933bd89 > Author: rmannibucau <[email protected]> > Authored: Thu Mar 9 20:09:50 2017 +0100 > Committer: rmannibucau <[email protected]> > Committed: Thu Mar 9 20:09:50 2017 +0100 > > ---------------------------------------------------------------------- > .../apache/openejb/core/mdb/MdbContainer.java | 21 ++- > .../activemq/ActiveMQResourceAdapter.java | 179 ++++++++++++++++++ > .../ActiveMQResourceAdapterControlTest.java | 187 +++++++++++++++++++ > 3 files changed, 386 insertions(+), 1 deletion(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/ > container/openejb-core/src/main/java/org/apache/openejb/ > core/mdb/MdbContainer.java > ---------------------------------------------------------------------- > diff --git > a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java > b/container/openejb-core/src/main/java/org/apache/openejb/ > core/mdb/MdbContainer.java > index 389d228..a17f342 100644 > --- a/container/openejb-core/src/main/java/org/apache/openejb/ > core/mdb/MdbContainer.java > +++ b/container/openejb-core/src/main/java/org/apache/openejb/ > core/mdb/MdbContainer.java > @@ -71,6 +71,9 @@ import static org.apache.openejb.core.transaction. > EjbTransactionUtil.handleSyste > > public class MdbContainer implements RpcContainer { > private static final Logger logger = > Logger.getInstance(LogCategory.OPENEJB, > "org.apache.openejb.util.resources"); > + > + private static final ThreadLocal<BeanContext> CURRENT = new > ThreadLocal<>(); > + > private static final Object[] NO_ARGS = new Object[0]; > > private final Object containerID; > @@ -183,6 +186,7 @@ public class MdbContainer implements RpcContainer { > } > > // activate the endpoint > + CURRENT.set(beanContext); > try { > resourceAdapter.endpointActivation(endpointFactory, > activationSpec); > } catch (final ResourceException e) { > @@ -192,6 +196,8 @@ public class MdbContainer implements RpcContainer { > deployments.remove(deploymentId); > > throw new OpenEJBException(e); > + } finally { > + CURRENT.remove(); > } > } > > @@ -276,7 +282,12 @@ public class MdbContainer implements RpcContainer { > try { > final EndpointFactory endpointFactory = (EndpointFactory) > beanContext.getContainerData(); > if (endpointFactory != null) { > - resourceAdapter.endpointDeactivation(endpointFactory, > endpointFactory.getActivationSpec()); > + CURRENT.set(beanContext); > + try { > + resourceAdapter.endpointDeactivation(endpointFactory, > endpointFactory.getActivationSpec()); > + } finally { > + CURRENT.remove(); > + } > > final MBeanServer server = LocalMBeanServer.get(); > for (final ObjectName objectName : > endpointFactory.jmxNames) { > @@ -481,6 +492,14 @@ public class MdbContainer implements RpcContainer { > } > } > > + public static BeanContext current() { > + final BeanContext beanContext = CURRENT.get(); > + if (beanContext == null) { > + CURRENT.remove(); > + } > + return beanContext; > + } > + > private static class MdbCallContext { > private Method deliveryMethod; > private TransactionPolicy txPolicy; > > http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/ > container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ > ActiveMQResourceAdapter.java > ---------------------------------------------------------------------- > diff --git a/container/openejb-core/src/main/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapter.java > b/container/openejb-core/src/main/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapter.java > index dde1e2b..73762ce 100644 > --- a/container/openejb-core/src/main/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapter.java > +++ b/container/openejb-core/src/main/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapter.java > @@ -22,9 +22,15 @@ import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.RedeliveryPolicy; > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.ra.ActiveMQConnectionRequestInfo; > +import org.apache.activemq.ra.ActiveMQEndpointActivationKey; > +import org.apache.activemq.ra.ActiveMQEndpointWorker; > import org.apache.activemq.ra.ActiveMQManagedConnection; > import org.apache.activemq.ra.MessageActivationSpec; > +import org.apache.openejb.BeanContext; > +import org.apache.openejb.core.mdb.MdbContainer; > import org.apache.openejb.loader.SystemInstance; > +import org.apache.openejb.monitoring.LocalMBeanServer; > +import org.apache.openejb.monitoring.ObjectNameBuilder; > import org.apache.openejb.resource.AutoConnectionTracker; > import org.apache.openejb.resource.activemq.jms2.TomEEConnectionFactory; > import org.apache.openejb.resource.activemq.jms2. > TomEEManagedConnectionProxy; > @@ -38,9 +44,28 @@ import org.apache.openejb.util.reflection.Reflections; > > import javax.jms.Connection; > import javax.jms.JMSException; > +import javax.management.Attribute; > +import javax.management.AttributeList; > +import javax.management.AttributeNotFoundException; > +import javax.management.DynamicMBean; > +import javax.management.InvalidAttributeValueException; > +import javax.management.MBeanAttributeInfo; > +import javax.management.MBeanConstructorInfo; > +import javax.management.MBeanException; > +import javax.management.MBeanInfo; > +import javax.management.MBeanNotificationInfo; > +import javax.management.MBeanOperationInfo; > +import javax.management.MBeanParameterInfo; > +import javax.management.MalformedObjectNameException; > +import javax.management.ObjectName; > +import javax.management.ReflectionException; > import javax.naming.NamingException; > +import javax.resource.NotSupportedException; > +import javax.resource.ResourceException; > +import javax.resource.spi.ActivationSpec; > import javax.resource.spi.BootstrapContext; > import javax.resource.spi.ResourceAdapterInternalException; > +import javax.resource.spi.endpoint.MessageEndpointFactory; > import java.lang.reflect.InvocationHandler; > import java.lang.reflect.Method; > import java.lang.reflect.Proxy; > @@ -48,9 +73,13 @@ import java.net.URISyntaxException; > import java.util.Collection; > import java.util.Iterator; > import java.util.Locale; > +import java.util.Map; > import java.util.Properties; > +import java.util.concurrent.ConcurrentHashMap; > import java.util.concurrent.TimeUnit; > > +import static javax.management.MBeanOperationInfo.ACTION; > + > @SuppressWarnings("UnusedDeclaration") > public class ActiveMQResourceAdapter extends > org.apache.activemq.ra.ActiveMQResourceAdapter > { > > @@ -58,6 +87,7 @@ public class ActiveMQResourceAdapter extends > org.apache.activemq.ra.ActiveMQReso > private String useDatabaseLock; > private String startupTimeout = "60000"; > private BootstrapContext bootstrapContext; > + private final Map<BeanContext, ObjectName> mbeanNames = new > ConcurrentHashMap<>(); > > public String getDataSource() { > return dataSource; > @@ -161,6 +191,87 @@ public class ActiveMQResourceAdapter extends > org.apache.activemq.ra.ActiveMQReso > } > > @Override > + public void endpointActivation(final MessageEndpointFactory > endpointFactory, final ActivationSpec activationSpec) throws > ResourceException { > + final BeanContext current = MdbContainer.current(); > + if (current != null && "false".equalsIgnoreCase( > current.getProperties().getProperty("MdbActiveOnStartup"))) { > + if (!equals(activationSpec.getResourceAdapter())) { > + throw new ResourceException("Activation spec not > initialized with this ResourceAdapter instance (" + > activationSpec.getResourceAdapter() > + " != " + this + ")"); > + } > + if (!(activationSpec instanceof MessageActivationSpec)) { > + throw new NotSupportedException("That type of > ActivationSpec not supported: " + activationSpec.getClass()); > + } > + > + final ActiveMQEndpointActivationKey key = new > ActiveMQEndpointActivationKey(endpointFactory, > MessageActivationSpec.class.cast(activationSpec)); > + Map.class.cast(Reflections.get(this, > "endpointWorkers")).put(key, new ActiveMQEndpointWorker(this, key) { > + }); > + // we dont want that worker.start(); > + } else { > + super.endpointActivation(endpointFactory, activationSpec); > + } > + > + if (current != null) { > + addJMxControl(current, current.getProperties(). > getProperty("MdbJMXControl")); > + } > + } > + > + private void addJMxControl(final BeanContext current, final String > name) throws ResourceException { > + if (name == null || "false".equalsIgnoreCase(name)) { > + return; > + } > + > + final ActiveMQEndpointWorker worker = getWorker(current); > + final ObjectName jmxName; > + try { > + jmxName = "true".equalsIgnoreCase(name) ? new > ObjectNameBuilder() > + .set("J2EEServer", "openejb") > + .set("J2EEApplication", null) > + .set("EJBModule", current.getModuleID()) > + .set("StatelessSessionBean", current.getEjbName()) > + .set("j2eeType", "control") > + .set("name", current.getEjbName()) > + .build() : new ObjectName(name); > + } catch (final MalformedObjectNameException e) { > + throw new IllegalArgumentException(e); > + } > + mbeanNames.put(current, jmxName); > + > + final MBeanInfo info = new MBeanInfo( > + "com.tomitribe.tomee.mdb.MdbControl", > + "Allows to control a MDB listener", > + new MBeanAttributeInfo[0], > + new MBeanConstructorInfo[0], > + new MBeanOperationInfo[]{ > + new MBeanOperationInfo("start", "Ensure the > listener is active.", new MBeanParameterInfo[0], "void", ACTION), > + new MBeanOperationInfo("stop", "Ensure the > listener is not active.", new MBeanParameterInfo[0], "void", ACTION) > + }, > + new MBeanNotificationInfo[0] > + ); > + LocalMBeanServer.registerSilently(new MdbJmxControl(worker), > jmxName); > + log.info("Deployed MDB control for " + current.getDeploymentID() > + " on " + jmxName); > + } > + > + @Override > + public void endpointDeactivation(final MessageEndpointFactory > endpointFactory, final ActivationSpec activationSpec) { > + final BeanContext current = MdbContainer.current(); > + if (current != null && "true".equalsIgnoreCase( > current.getProperties().getProperty("MdbJMXControl"))) { > + LocalMBeanServer.unregisterSilently(mbeanNames. > remove(current)); > + log.info("Undeployed MDB control for " + > current.getDeploymentID()); > + } > + super.endpointDeactivation(endpointFactory, activationSpec); > + } > + > + private ActiveMQEndpointWorker getWorker(final BeanContext > beanContext) throws ResourceException { > + final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> > workers = Map.class.cast(Reflections.get( > + > MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), > "endpointWorkers")); > + for (final Map.Entry<ActiveMQEndpointActivationKey, > ActiveMQEndpointWorker> entry : workers.entrySet()) { > + if (entry.getKey().getMessageEndpointFactory() == > beanContext.getContainerData()) { > + return entry.getValue(); > + } > + } > + throw new IllegalStateException("No worker for " + > beanContext.getDeploymentID()); > + } > + > + @Override > public BootstrapContext getBootstrapContext() { > return this.bootstrapContext; > } > @@ -296,4 +407,72 @@ public class ActiveMQResourceAdapter extends > org.apache.activemq.ra.ActiveMQReso > //Ignore > } > } > + > + public static class MdbJmxControl implements DynamicMBean { > + private static final AttributeList ATTRIBUTE_LIST = new > AttributeList(); > + private static final MBeanInfo INFO = new MBeanInfo( > + "org.apache.openejb.resource.activemq. > ActiveMQResourceAdapter.MdbJmxControl", > + "Allows to control a MDB (start/stop)", > + new MBeanAttributeInfo[0], > + new MBeanConstructorInfo[0], > + new MBeanOperationInfo[]{ > + new MBeanOperationInfo("start", "Ensure the > listener is active.", new MBeanParameterInfo[0], "void", ACTION), > + new MBeanOperationInfo("stop", "Ensure the > listener is not active.", new MBeanParameterInfo[0], "void", ACTION) > + }, > + new MBeanNotificationInfo[0]); > + > + private final ActiveMQEndpointWorker worker; > + > + private MdbJmxControl(final ActiveMQEndpointWorker worker) { > + this.worker = worker; > + } > + > + @Override > + public Object invoke(final String actionName, final Object[] > params, final String[] signature) throws MBeanException, > ReflectionException { > + switch (actionName) { > + case "stop": > + try { > + worker.stop(); > + } catch (final InterruptedException e) { > + Thread.interrupted(); > + } > + break; > + case "start": > + try { > + worker.start(); > + } catch (ResourceException e) { > + throw new MBeanException(new > IllegalStateException(e.getMessage())); > + } > + break; > + default: > + throw new MBeanException(new > IllegalStateException("unsupported > operation: " + actionName)); > + } > + return null; > + } > + > + @Override > + public MBeanInfo getMBeanInfo() { > + return INFO; > + } > + > + @Override > + public Object getAttribute(final String attribute) throws > AttributeNotFoundException, MBeanException, ReflectionException { > + throw new AttributeNotFoundException(); > + } > + > + @Override > + public void setAttribute(final Attribute attribute) throws > AttributeNotFoundException, InvalidAttributeValueException, > MBeanException, ReflectionException { > + throw new AttributeNotFoundException(); > + } > + > + @Override > + public AttributeList getAttributes(final String[] attributes) { > + return ATTRIBUTE_LIST; > + } > + > + @Override > + public AttributeList setAttributes(final AttributeList > attributes) { > + return ATTRIBUTE_LIST; > + } > + } > } > > http://git-wip-us.apache.org/repos/asf/tomee/blob/9c661758/ > container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ > ActiveMQResourceAdapterControlTest.java > ---------------------------------------------------------------------- > diff --git a/container/openejb-core/src/test/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapterControlTest.java > b/container/openejb-core/src/test/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapterControlTest.java > new file mode 100644 > index 0000000..a574f59 > --- /dev/null > +++ b/container/openejb-core/src/test/java/org/apache/openejb/ > resource/activemq/ActiveMQResourceAdapterControlTest.java > @@ -0,0 +1,187 @@ > +/* > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.openejb.resource.activemq; > + > +import org.apache.openejb.config.EjbModule; > +import org.apache.openejb.jee.EjbJar; > +import org.apache.openejb.jee.oejb3.EjbDeployment; > +import org.apache.openejb.jee.oejb3.OpenejbJar; > +import org.apache.openejb.junit.ApplicationComposer; > +import org.apache.openejb.monitoring.LocalMBeanServer; > +import org.apache.openejb.testing.Classes; > +import org.apache.openejb.testing.Configuration; > +import org.apache.openejb.testing.Module; > +import org.apache.openejb.testng.PropertiesBuilder; > +import org.junit.Test; > +import org.junit.runner.RunWith; > + > +import javax.annotation.Resource; > +import javax.ejb.MessageDriven; > +import javax.jms.Connection; > +import javax.jms.ConnectionFactory; > +import javax.jms.JMSException; > +import javax.jms.Message; > +import javax.jms.MessageListener; > +import javax.jms.MessageProducer; > +import javax.jms.Queue; > +import javax.jms.Session; > +import javax.jms.TextMessage; > +import javax.management.ObjectName; > +import java.util.Properties; > +import java.util.concurrent.Semaphore; > +import java.util.concurrent.TimeUnit; > + > +import static org.junit.Assert.assertEquals; > +import static org.junit.Assert.assertFalse; > +import static org.junit.Assert.assertNull; > +import static org.junit.Assert.assertTrue; > +import static org.junit.Assert.fail; > + > +@RunWith(ApplicationComposer.class) > +public class ActiveMQResourceAdapterControlTest { > + @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb") > + private Queue queue; > + > + @Resource > + private ConnectionFactory connectionFactory; > + > + @Configuration > + public Properties config() { > + return new PropertiesBuilder() > + .p("ra", "new://Resource?type=ActiveMQResourceAdapter") > + .p("ra.brokerXmlConfig", "broker:(vm://localhost)? > useJmx=false&persistent=false") > + .p("ra.serverUrl", "vm://localhost") > + > + .p("mdb", "new://Container?type=MESSAGE") > + .p("mdb.resourceAdapter", "ra") > + > + .p("cf", "new://Resource?type=javax. > jms.ConnectionFactory") > + .p("cf.resourceAdapter", "ra") > + > + .p("openejb.deploymentId.format", > "{appId}/{ejbJarId}/{ejbName}") > + .build(); > + } > + > + @Module > + @Classes(value = Mdb.class) > + public EjbModule app() { > + return new EjbModule(new EjbJar("test"), new OpenejbJar() {{ > + setId("test"); > + getEjbDeployment().add(new EjbDeployment() {{ > + setEjbName("ejb/Mdb"); > + getProperties().put("MdbActiveOnStartup", "false"); > + getProperties().put("MdbJMXControl", > "default:type=test"); > + }}); > + }}); > + } > + > + @Test > + public void ensureControl() throws Exception { > + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received > after", 10, TimeUnit.SECONDS)); > + > + setControl("start"); > + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, > TimeUnit.MINUTES)); > + assertEquals("Will be received after", Mdb.awaiter.message); > + > + final long start = System.currentTimeMillis(); > + assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES)); > + assertEquals("First", Mdb.awaiter.message); > + final long end = System.currentTimeMillis(); > + > + Mdb.awaiter.message = null; > + setControl("stop"); > + // default would be wait 10s, but if machine is slow we compute > it from the first msg stats > + final long waitWithoutResponse = Math.max(10, 5 * (end - start) / > 1000); > + System.out.println("We'll wait " + waitWithoutResponse + "s to > get a message on a stopped listener"); > + assertFalse(Mdb.awaiter.message, sendAndWait("Will be received > after", waitWithoutResponse, TimeUnit.SECONDS)); > + assertNull(Mdb.awaiter.message); > + > + setControl("start"); > + assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES)); > + assertEquals("Will be received after", Mdb.awaiter.message); > + > + Mdb.awaiter.message = null; > + assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, > TimeUnit.MINUTES)); > + assertEquals("Second", Mdb.awaiter.message); > + } > + > + private void setControl(final String action) throws Exception { > + LocalMBeanServer.get().invoke( > + new ObjectName("default:type=test"), > + action, new Object[0], new String[0]); > + } > + > + private boolean sendAndWait(final String second, final long wait, > final TimeUnit unit) throws JMSException { > + doSend(second); > + try { > + return Mdb.awaiter.semaphore.tryAcquire(wait, unit); > + } catch (final InterruptedException e) { > + Thread.interrupted(); > + fail(); > + return false; > + } > + } > + > + private void doSend(final String txt) throws JMSException { > + Connection c = null; > + try { > + c = connectionFactory.createConnection(); > + Session session = null; > + try { > + session = c.createSession(false, > Session.AUTO_ACKNOWLEDGE); > + MessageProducer producer = null; > + try { > + producer = session.createProducer(queue); > + producer.send(session.createTextMessage(txt)); > + } finally { > + if (producer != null) { > + producer.close(); > + } > + } > + } finally { > + if (session != null) { > + session.close(); > + } > + } > + } finally { > + if (c != null) { > + c.close(); > + } > + } > + } > + > + @MessageDriven(name = "ejb/Mdb") > + public static class Mdb implements MessageListener { > + static final MessageAwaiter awaiter = new MessageAwaiter(); > + > + @Override > + public synchronized void onMessage(final Message message) { > + try { > + awaiter.message = TextMessage.class.cast( > message).getText(); > + } catch (final JMSException e) { > + throw new IllegalStateException(e); > + } finally { > + awaiter.semaphore.release(); > + } > + } > + } > + > + public static class MessageAwaiter { > + private final Semaphore semaphore = new Semaphore(0); > + private volatile String message; > + } > +} > >
