Repository: tomee
Updated Branches:
  refs/heads/tomee-1.7.x 5e0704878 -> a46a75862


TOMEE-2021 allow to control through JMX MDB listening state


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/8dd71d44
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/8dd71d44
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/8dd71d44

Branch: refs/heads/tomee-1.7.x
Commit: 8dd71d448144035ab45a119b55700effa2b7ff87
Parents: 5e07048
Author: rmannibucau <[email protected]>
Authored: Thu Mar 9 20:09:50 2017 +0100
Committer: Jonathan Gallimore <[email protected]>
Committed: Mon Mar 13 11:16:17 2017 +0000

----------------------------------------------------------------------
 .../apache/openejb/core/mdb/MdbContainer.java   |  21 ++-
 .../activemq/ActiveMQResourceAdapter.java       | 179 ++++++++++++++++++
 .../ActiveMQResourceAdapterControlTest.java     | 187 +++++++++++++++++++
 3 files changed, 386 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
index df940be..b382799 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/MdbContainer.java
@@ -71,6 +71,9 @@ import static 
org.apache.openejb.core.transaction.EjbTransactionUtil.handleSyste
 
 public class MdbContainer implements RpcContainer {
     private static final Logger logger = 
Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
+
+    private static final ThreadLocal<BeanContext> CURRENT = new 
ThreadLocal<>();
+
     private static final Object[] NO_ARGS = new Object[0];
 
     private final Object containerID;
@@ -179,6 +182,7 @@ public class MdbContainer implements RpcContainer {
         }
 
         // activate the endpoint
+        CURRENT.set(beanContext);
         try {
             resourceAdapter.endpointActivation(endpointFactory, 
activationSpec);
         } catch (final ResourceException e) {
@@ -188,6 +192,8 @@ public class MdbContainer implements RpcContainer {
             deployments.remove(deploymentId);
 
             throw new OpenEJBException(e);
+        } finally {
+            CURRENT.remove();
         }
     }
 
@@ -264,7 +270,12 @@ public class MdbContainer implements RpcContainer {
         try {
             final EndpointFactory endpointFactory = (EndpointFactory) 
beanContext.getContainerData();
             if (endpointFactory != null) {
-                resourceAdapter.endpointDeactivation(endpointFactory, 
endpointFactory.getActivationSpec());
+                CURRENT.set(beanContext);
+                try {
+                    resourceAdapter.endpointDeactivation(endpointFactory, 
endpointFactory.getActivationSpec());
+                } finally {
+                    CURRENT.remove();
+                }
 
                 final MBeanServer server = LocalMBeanServer.get();
                 for (final ObjectName objectName : endpointFactory.jmxNames) {
@@ -472,6 +483,14 @@ public class MdbContainer implements RpcContainer {
         }
     }
 
+    /**
+     * Exposes the bean context this container bound to the calling thread
+     * around an endpoint activation or deactivation call.
+     *
+     * @return the bean context bound to the current thread, or {@code null} when none is bound
+     */
+    public static BeanContext current() {
+        final BeanContext bound = CURRENT.get();
+        if (bound != null) {
+            return bound;
+        }
+        // nothing is bound: clear the slot so the lookup leaves no entry behind on this thread
+        CURRENT.remove();
+        return null;
+    }
+
     private static class MdbCallContext {
         private Method deliveryMethod;
         private TransactionPolicy txPolicy;

http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
index 8880875..b0ad912 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapter.java
@@ -18,21 +18,50 @@
 package org.apache.openejb.resource.activemq;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
+import org.apache.activemq.ra.ActiveMQEndpointWorker;
+import org.apache.openejb.BeanContext;
+import org.apache.openejb.core.mdb.MdbContainer;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
 import org.apache.openejb.util.Duration;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 import org.apache.openejb.util.URISupport;
 import org.apache.openejb.util.URLs;
 
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanConstructorInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.resource.NotSupportedException;
+import javax.resource.ResourceException;
+import javax.resource.spi.ActivationSpec;
 import javax.resource.spi.BootstrapContext;
 import javax.resource.spi.ResourceAdapterInternalException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import static javax.management.MBeanOperationInfo.ACTION;
+
 @SuppressWarnings("UnusedDeclaration")
 public class ActiveMQResourceAdapter extends 
org.apache.activemq.ra.ActiveMQResourceAdapter {
 
@@ -40,6 +69,7 @@ public class ActiveMQResourceAdapter extends 
org.apache.activemq.ra.ActiveMQReso
     private String useDatabaseLock;
     private String startupTimeout = "60000";
     private BootstrapContext bootstrapContext;
+    private final Map<BeanContext, ObjectName> mbeanNames = new 
ConcurrentHashMap<>();
 
     public String getDataSource() {
         return dataSource;
@@ -143,6 +173,87 @@ public class ActiveMQResourceAdapter extends 
org.apache.activemq.ra.ActiveMQReso
     }
 
     @Override
+    public void endpointActivation(final MessageEndpointFactory 
endpointFactory, final ActivationSpec activationSpec) throws ResourceException {
+        final BeanContext current = MdbContainer.current();
+        if (current != null && 
"false".equalsIgnoreCase(current.getProperties().getProperty("MdbActiveOnStartup")))
 {
+            if (!equals(activationSpec.getResourceAdapter())) {
+                throw new ResourceException("Activation spec not initialized 
with this ResourceAdapter instance (" + activationSpec.getResourceAdapter() + " 
!= " + this + ")");
+            }
+            if (!(activationSpec instanceof MessageActivationSpec)) {
+                throw new NotSupportedException("That type of ActivationSpec 
not supported: " + activationSpec.getClass());
+            }
+
+            final ActiveMQEndpointActivationKey key = new 
ActiveMQEndpointActivationKey(endpointFactory, 
MessageActivationSpec.class.cast(activationSpec));
+            Map.class.cast(Reflections.get(this, "endpointWorkers")).put(key, 
new ActiveMQEndpointWorker(this, key) {
+            });
+            // we dont want that worker.start();
+        } else {
+            super.endpointActivation(endpointFactory, activationSpec);
+        }
+
+        if (current != null) {
+            addJMxControl(current, 
current.getProperties().getProperty("MdbJMXControl"));
+        }
+    }
+
+    /**
+     * Registers a JMX MBean exposing "start" and "stop" operations for the endpoint
+     * worker of the given MDB and remembers its name for later unregistration.
+     *
+     * @param current bean context of the MDB being activated
+     * @param name    control setting: {@code null} or "false" disables the control, "true"
+     *                selects a generated object name, any other value is parsed as the
+     *                object name to use
+     * @throws ResourceException        declared by the worker lookup
+     * @throws IllegalArgumentException if the object name is malformed
+     * @throws IllegalStateException    if no worker is known for the MDB
+     */
+    private void addJMxControl(final BeanContext current, final String name) throws ResourceException {
+        if (name == null || "false".equalsIgnoreCase(name)) {
+            return;
+        }
+
+        final ActiveMQEndpointWorker worker = getWorker(current);
+        final ObjectName jmxName;
+        try {
+            jmxName = "true".equalsIgnoreCase(name) ? new ObjectNameBuilder()
+                    .set("J2EEServer", "openejb")
+                    .set("J2EEApplication", null)
+                    .set("EJBModule", current.getModuleID())
+                    .set("StatelessSessionBean", current.getEjbName())
+                    .set("j2eeType", "control")
+                    .set("name", current.getEjbName())
+                    .build() : new ObjectName(name);
+        } catch (final MalformedObjectNameException e) {
+            throw new IllegalArgumentException(e);
+        }
+        mbeanNames.put(current, jmxName);
+
+        // the MBean describes itself through getMBeanInfo(), no separate descriptor is built here
+        LocalMBeanServer.registerSilently(new MdbJmxControl(worker), jmxName);
+        log.info("Deployed MDB control for " + current.getDeploymentID() + " on " + jmxName);
+    }
+
+    /**
+     * Unregisters the JMX control registered for the MDB bound to the calling thread,
+     * if there is one, then delegates the deactivation to the parent adapter.
+     *
+     * @param endpointFactory factory of the endpoint to deactivate
+     * @param activationSpec  activation configuration of the endpoint
+     */
+    @Override
+    public void endpointDeactivation(final MessageEndpointFactory endpointFactory, final ActivationSpec activationSpec) {
+        final BeanContext current = MdbContainer.current();
+        if (current != null) {
+            // rely on what was actually registered rather than on the property value:
+            // the control may have been registered under a custom object name
+            final ObjectName jmxName = mbeanNames.remove(current);
+            if (jmxName != null) {
+                LocalMBeanServer.unregisterSilently(jmxName);
+                log.info("Undeployed MDB control for " + current.getDeploymentID());
+            }
+        }
+        super.endpointDeactivation(endpointFactory, activationSpec);
+    }
+
+    /**
+     * Finds the endpoint worker whose activation key references the endpoint factory
+     * stored as container data of the given bean. The worker map is read reflectively
+     * from the "endpointWorkers" field of the resource adapter of the bean's MDB container.
+     *
+     * @param beanContext bean context of the MDB
+     * @return the matching worker
+     * @throws ResourceException     declared for callers, not thrown directly here
+     * @throws IllegalStateException if no worker matches the bean
+     */
+    private ActiveMQEndpointWorker getWorker(final BeanContext beanContext) throws ResourceException {
+        final Map<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> byKey = Map.class.cast(Reflections.get(
+                MdbContainer.class.cast(beanContext.getContainer()).getResourceAdapter(), "endpointWorkers"));
+        for (final Map.Entry<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> candidate : byKey.entrySet()) {
+            // factories are matched by identity, not by equals()
+            if (candidate.getKey().getMessageEndpointFactory() != beanContext.getContainerData()) {
+                continue;
+            }
+            return candidate.getValue();
+        }
+        throw new IllegalStateException("No worker for " + beanContext.getDeploymentID());
+    }
+
+    @Override
     public BootstrapContext getBootstrapContext() {
         return this.bootstrapContext;
     }
@@ -210,4 +321,72 @@ public class ActiveMQResourceAdapter extends 
org.apache.activemq.ra.ActiveMQReso
             //Ignore
         }
     }
+
+    /**
+     * Dynamic MBean exposing two operations, "start" and "stop", which are forwarded
+     * to the endpoint worker it wraps. It declares no attribute.
+     */
+    public static class MdbJmxControl implements DynamicMBean {
+        private static final MBeanInfo INFO = new MBeanInfo(
+                "org.apache.openejb.resource.activemq.ActiveMQResourceAdapter.MdbJmxControl",
+                "Allows to control a MDB (start/stop)",
+                new MBeanAttributeInfo[0],
+                new MBeanConstructorInfo[0],
+                new MBeanOperationInfo[]{
+                        new MBeanOperationInfo("start", "Ensure the listener is active.", new MBeanParameterInfo[0], "void", ACTION),
+                        new MBeanOperationInfo("stop", "Ensure the listener is not active.", new MBeanParameterInfo[0], "void", ACTION)
+                },
+                new MBeanNotificationInfo[0]);
+
+        private final ActiveMQEndpointWorker worker;
+
+        private MdbJmxControl(final ActiveMQEndpointWorker worker) {
+            this.worker = worker;
+        }
+
+        /**
+         * Runs the requested operation on the wrapped worker.
+         *
+         * @param actionName "start" or "stop"
+         * @param params     ignored, the operations take no parameter
+         * @param signature  ignored
+         * @return always {@code null}
+         * @throws MBeanException if the worker fails to start or the operation is unknown
+         */
+        @Override
+        public Object invoke(final String actionName, final Object[] params, final String[] signature) throws MBeanException, ReflectionException {
+            switch (actionName) {
+                case "stop":
+                    try {
+                        worker.stop();
+                    } catch (final InterruptedException e) {
+                        // restore the interrupt flag so the calling thread can still observe it
+                        Thread.currentThread().interrupt();
+                    }
+                    break;
+                case "start":
+                    try {
+                        worker.start();
+                    } catch (ResourceException e) {
+                        // only the message is propagated, the ResourceException itself is not attached
+                        throw new MBeanException(new IllegalStateException(e.getMessage()));
+                    }
+                    break;
+                default:
+                    throw new MBeanException(new IllegalStateException("unsupported operation: " + actionName));
+            }
+            return null;
+        }
+
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            return INFO;
+        }
+
+        @Override
+        public Object getAttribute(final String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public void setAttribute(final Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
+            throw new AttributeNotFoundException();
+        }
+
+        @Override
+        public AttributeList getAttributes(final String[] attributes) {
+            // AttributeList is mutable: hand out a fresh empty list instead of a shared instance
+            return new AttributeList();
+        }
+
+        @Override
+        public AttributeList setAttributes(final AttributeList attributes) {
+            // no attribute can be set, so the list of applied attributes is always empty
+            return new AttributeList();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tomee/blob/8dd71d44/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
 
b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
new file mode 100644
index 0000000..a574f59
--- /dev/null
+++ 
b/container/openejb-core/src/test/java/org/apache/openejb/resource/activemq/ActiveMQResourceAdapterControlTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq;
+
+import org.apache.openejb.config.EjbModule;
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.oejb3.EjbDeployment;
+import org.apache.openejb.jee.oejb3.OpenejbJar;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.monitoring.LocalMBeanServer;
+import org.apache.openejb.testing.Classes;
+import org.apache.openejb.testing.Configuration;
+import org.apache.openejb.testing.Module;
+import org.apache.openejb.testng.PropertiesBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.MessageDriven;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that an MDB deployed with "MdbActiveOnStartup" set to "false" receives nothing
+ * until its JMX control is asked to start, and that the listener can then be stopped
+ * and started again through the same control.
+ */
+@RunWith(ApplicationComposer.class)
+public class ActiveMQResourceAdapterControlTest {
+    @Resource(name = "ActiveMQResourceAdapterControlTest/test/ejb/Mdb")
+    private Queue queue;
+
+    @Resource
+    private ConnectionFactory connectionFactory;
+
+    @Configuration
+    public Properties config() {
+        return new PropertiesBuilder()
+                .p("ra", "new://Resource?type=ActiveMQResourceAdapter")
+                .p("ra.brokerXmlConfig", "broker:(vm://localhost)?useJmx=false&persistent=false")
+                .p("ra.serverUrl", "vm://localhost")
+
+                .p("mdb", "new://Container?type=MESSAGE")
+                .p("mdb.resourceAdapter", "ra")
+
+                .p("cf", "new://Resource?type=javax.jms.ConnectionFactory")
+                .p("cf.resourceAdapter", "ra")
+
+                .p("openejb.deploymentId.format", "{appId}/{ejbJarId}/{ejbName}")
+                .build();
+    }
+
+    @Module
+    @Classes(value = Mdb.class)
+    public EjbModule app() {
+        return new EjbModule(new EjbJar("test"), new OpenejbJar() {{
+            setId("test");
+            getEjbDeployment().add(new EjbDeployment() {{
+                setEjbName("ejb/Mdb");
+                // listener is not started at deployment and is controlled through this MBean name
+                getProperties().put("MdbActiveOnStartup", "false");
+                getProperties().put("MdbJMXControl", "default:type=test");
+            }});
+        }});
+    }
+
+    @Test
+    public void ensureControl() throws Exception {
+        // listener not started yet: the message must not be delivered
+        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", 10, TimeUnit.SECONDS));
+
+        // once started, the pending message is delivered
+        setControl("start");
+        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+        assertEquals("Will be received after", Mdb.awaiter.message);
+
+        final long start = System.currentTimeMillis();
+        assertTrue(sendAndWait("First", 1, TimeUnit.MINUTES));
+        assertEquals("First", Mdb.awaiter.message);
+        final long end = System.currentTimeMillis();
+
+        Mdb.awaiter.message = null;
+        setControl("stop");
+        // default would be wait 10s, but if machine is slow we compute it from the first msg stats
+        final long waitWithoutResponse = Math.max(10, 5 * (end - start) / 1000);
+        System.out.println("We'll wait " + waitWithoutResponse + "s to get a message on a stopped listener");
+        assertFalse(Mdb.awaiter.message, sendAndWait("Will be received after", waitWithoutResponse, TimeUnit.SECONDS));
+        assertNull(Mdb.awaiter.message);
+
+        // after restart the message sent while stopped arrives first, then "Second"
+        setControl("start");
+        assertTrue(sendAndWait("Second", 1, TimeUnit.MINUTES));
+        assertEquals("Will be received after", Mdb.awaiter.message);
+
+        Mdb.awaiter.message = null;
+        assertTrue(Mdb.awaiter.semaphore.tryAcquire(1, TimeUnit.MINUTES));
+        assertEquals("Second", Mdb.awaiter.message);
+    }
+
+    /** Invokes the given no-argument operation on the MDB control MBean. */
+    private void setControl(final String action) throws Exception {
+        LocalMBeanServer.get().invoke(
+                new ObjectName("default:type=test"),
+                action, new Object[0], new String[0]);
+    }
+
+    /**
+     * Sends a text message and waits for the MDB to signal a delivery.
+     *
+     * @return {@code true} if a delivery was signalled within the given time
+     */
+    private boolean sendAndWait(final String text, final long wait, final TimeUnit unit) throws JMSException {
+        doSend(text);
+        try {
+            return Mdb.awaiter.semaphore.tryAcquire(wait, unit);
+        } catch (final InterruptedException e) {
+            // restore the interrupt flag instead of clearing it, then fail the test
+            Thread.currentThread().interrupt();
+            fail();
+            return false;
+        }
+    }
+
+    /** Sends one text message to the queue, closing producer, session and connection afterwards. */
+    private void doSend(final String txt) throws JMSException {
+        Connection c = null;
+        try {
+            c = connectionFactory.createConnection();
+            Session session = null;
+            try {
+                session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = null;
+                try {
+                    producer = session.createProducer(queue);
+                    producer.send(session.createTextMessage(txt));
+                } finally {
+                    if (producer != null) {
+                        producer.close();
+                    }
+                }
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+            }
+        } finally {
+            if (c != null) {
+                c.close();
+            }
+        }
+    }
+
+    /** Listener under test: stores the text of each message and releases one permit per delivery. */
+    @MessageDriven(name = "ejb/Mdb")
+    public static class Mdb implements MessageListener {
+        static final MessageAwaiter awaiter = new MessageAwaiter();
+
+        @Override
+        public synchronized void onMessage(final Message message) {
+            try {
+                awaiter.message = TextMessage.class.cast(message).getText();
+            } catch (final JMSException e) {
+                throw new IllegalStateException(e);
+            } finally {
+                // released even on failure so that a waiting test does not hang
+                awaiter.semaphore.release();
+            }
+        }
+    }
+
+    /** Hand-off between the listener and the test: last received text plus a delivery counter. */
+    public static class MessageAwaiter {
+        private final Semaphore semaphore = new Semaphore(0);
+        private volatile String message;
+    }
+}

Reply via email to