Repository: tomee
Updated Branches:
  refs/heads/master 1f5f3b684 -> a3e90ee23


TOMEE-1936 message.getBody(String.class) basic support for MDB


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/a3e90ee2
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/a3e90ee2
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/a3e90ee2

Branch: refs/heads/master
Commit: a3e90ee238e31dadfc3b661d9e19c74927c99cf0
Parents: 1f5f3b6
Author: Romain manni-Bucau <rmannibu...@gmail.com>
Authored: Tue Sep 20 16:32:16 2016 +0200
Committer: Romain manni-Bucau <rmannibu...@gmail.com>
Committed: Tue Sep 20 16:32:16 2016 +0200

----------------------------------------------------------------------
 .../openejb/core/mdb/EndpointHandler.java       | 26 ++++++-
 .../openejb/resource/activemq/jms2/JMS2.java    |  2 +-
 .../resource/activemq/jms2/TomEEConnection.java |  2 +-
 .../apache/openejb/activemq/JMS2AMQTest.java    | 77 ++++++++++++++++----
 4 files changed, 90 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/a3e90ee2/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
index 1e6d44f..3e6dc76 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java
@@ -20,8 +20,11 @@ package org.apache.openejb.core.mdb;
 import org.apache.openejb.ApplicationException;
 import org.apache.openejb.BeanContext;
 import org.apache.openejb.SystemException;
+import org.apache.openejb.resource.activemq.jms2.DelegateMessage;
+import org.apache.openejb.resource.activemq.jms2.JMS2;
 
 import javax.ejb.EJBException;
+import javax.jms.Message;
 import javax.resource.spi.ApplicationServerInternalException;
 import javax.resource.spi.UnavailableException;
 import javax.resource.spi.endpoint.MessageEndpoint;
@@ -31,6 +34,8 @@ import java.lang.reflect.Method;
 import java.util.Arrays;
 
 public class EndpointHandler implements InvocationHandler, MessageEndpoint {
+    private volatile Boolean isAmq;
+
     private static enum State {
         /**
          * The handler has been initialized and is ready for invoation
@@ -225,7 +230,7 @@ public class EndpointHandler implements InvocationHandler, 
MessageEndpoint {
         Object value = null;
         try {
             // deliver the message
-            value = container.invoke(instance, method, null, args);
+            value = container.invoke(instance, method, null, 
wrapMessageForAmq5(args));
         } catch (final SystemException se) {
             throwable = se.getRootCause() != null ? se.getRootCause() : se;
             state = State.SYSTEM_EXCEPTION;
@@ -255,6 +260,25 @@ public class EndpointHandler implements InvocationHandler, 
MessageEndpoint {
         return value;
     }
 
+    // workaround for AMQ 5/JMS 2 support
+    private Object[] wrapMessageForAmq5(final Object[] args) {
+        if (args == null || args.length != 1 || 
DelegateMessage.class.isInstance(args[0])) {
+            return args;
+        }
+
+        if (isAmq == null) {
+            synchronized (this) {
+                if (isAmq == null) {
+                    isAmq = 
args[0].getClass().getName().startsWith("org.apache.activemq.");
+                }
+            }
+        }
+        if (isAmq) {
+            args[0] = JMS2.wrap(Message.class.cast(args[0]));
+        }
+        return args;
+    }
+
     public void afterDelivery() throws ApplicationServerInternalException, 
UnavailableException {
         // verify current state
         switch (state) {

http://git-wip-us.apache.org/repos/asf/tomee/blob/a3e90ee2/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
index 4182ff0..c1ad431 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
@@ -44,7 +44,7 @@ import javax.jms.TransactionInProgressRuntimeException;
 import javax.jms.TransactionRolledBackException;
 import javax.jms.TransactionRolledBackRuntimeException;
 
-final class JMS2 {
+public final class JMS2 {
     private JMS2() {
         // no-op
     }

http://git-wip-us.apache.org/repos/asf/tomee/blob/a3e90ee2/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
index ca0116e..77063c1 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
@@ -35,7 +35,7 @@ public class TomEEConnection extends ActiveMQConnection {
 
     @Override
     public Session createSession(final int sessionMode) throws JMSException {
-        return super.createSession(sessionMode == Session.SESSION_TRANSACTED, 
sessionMode);
+        return createSession(sessionMode == Session.SESSION_TRANSACTED, 
sessionMode);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tomee/blob/a3e90ee2/container/openejb-core/src/test/java/org/apache/openejb/activemq/JMS2AMQTest.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/test/java/org/apache/openejb/activemq/JMS2AMQTest.java
 
b/container/openejb-core/src/test/java/org/apache/openejb/activemq/JMS2AMQTest.java
index 3ef3b04..07e339b 100644
--- 
a/container/openejb-core/src/test/java/org/apache/openejb/activemq/JMS2AMQTest.java
+++ 
b/container/openejb-core/src/test/java/org/apache/openejb/activemq/JMS2AMQTest.java
@@ -65,22 +65,22 @@ public class JMS2AMQTest {
     public Properties config() {
         return new PropertiesBuilder()
 
-            .p("amq", "new://Resource?type=ActiveMQResourceAdapter")
-            .p("amq.DataSource", "")
-            .p("amq.BrokerXmlConfig", "broker:(vm://localhost)")
+                .p("amq", "new://Resource?type=ActiveMQResourceAdapter")
+                .p("amq.DataSource", "")
+                .p("amq.BrokerXmlConfig", "broker:(vm://localhost)")
 
-            .p("target", "new://Resource?type=Queue")
+                .p("target", "new://Resource?type=Queue")
 
-            .p("mdbs", "new://Container?type=MESSAGE")
-            .p("mdbs.ResourceAdapter", "amq")
+                .p("mdbs", "new://Container?type=MESSAGE")
+                .p("mdbs.ResourceAdapter", "amq")
 
-            .p("cf", "new://Resource?type=" + 
ConnectionFactory.class.getName())
-            .p("cf.ResourceAdapter", "amq")
+                .p("cf", "new://Resource?type=" + 
ConnectionFactory.class.getName())
+                .p("cf.ResourceAdapter", "amq")
 
-            .p("xaCf", "new://Resource?class-name=" + 
ActiveMQXAConnectionFactory.class.getName())
-            .p("xaCf.BrokerURL", "vm://localhost")
+                .p("xaCf", "new://Resource?class-name=" + 
ActiveMQXAConnectionFactory.class.getName())
+                .p("xaCf.BrokerURL", "vm://localhost")
 
-            .build();
+                .build();
     }
 
     @Module
@@ -289,9 +289,56 @@ public class JMS2AMQTest {
         assertNull(exception == null ? "ok" : exception.getMessage(), 
exception);
     }
 
+    @Test
+    public void receiveGetBody() throws InterruptedException {
+        final String text = TEXT + "2";
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final CountDownLatch ready = new CountDownLatch(1);
+        final CountDownLatch over = new CountDownLatch(1);
+        new Thread() {
+            @Override
+            public void run() {
+                {
+                    setName(JMS2AMQTest.class.getName() + 
".receiveGetBody#receiver");
+                }
+
+                try (final JMSContext context = cf.createContext()) {
+                    try (final JMSConsumer consumer = 
context.createConsumer(destination2)) {
+                        ready.countDown();
+                        final Message receive = 
consumer.receive(TimeUnit.MINUTES.toMillis(1));
+                        assertEquals(text, receive.getBody(String.class));
+                    }
+                } catch (final Throwable ex) {
+                    error.set(ex);
+                } finally {
+                    over.countDown();
+                }
+            }
+        }.start();
+
+        ready.await(1, TimeUnit.MINUTES);
+        sleep(150); // just to ensure we called receive already
+
+        // now send the message
+        try (final JMSContext context = cf.createContext()) {
+            context.createProducer().send(destination2, text);
+        } catch (final JMSRuntimeException ex) {
+            fail(ex.getMessage());
+        }
+
+        over.await(1, TimeUnit.MINUTES);
+
+        // ensure we got the message and no exception
+        final Throwable exception = error.get();
+        if (exception != null) {
+            exception.printStackTrace();
+        }
+        assertNull(exception == null ? "ok" : exception.getMessage(), 
exception);
+    }
+
     @MessageDriven(activationConfig = {
-        @ActivationConfigProperty(propertyName = "destinationType", 
propertyValue = "javax.jms.Queue"),
-        @ActivationConfigProperty(propertyName = "destination", propertyValue 
= "target")
+            @ActivationConfigProperty(propertyName = "destinationType", 
propertyValue = "javax.jms.Queue"),
+            @ActivationConfigProperty(propertyName = "destination", 
propertyValue = "target")
     })
     public static class Listener implements MessageListener {
         public static volatile CountDownLatch latch;
@@ -301,7 +348,9 @@ public class JMS2AMQTest {
         public void onMessage(final Message message) {
             try {
                 try {
-                    ok = TextMessage.class.isInstance(message) && 
TEXT.equals(TextMessage.class.cast(message).getText());
+                    ok = TextMessage.class.isInstance(message)
+                            && 
TEXT.equals(TextMessage.class.cast(message).getText())
+                            && TEXT.equals(message.getBody(String.class));
                 } catch (final JMSException e) {
                     // no-op
                 }

Reply via email to