Repository: tomee
Updated Branches:
  refs/heads/master 9f84727c8 -> db8ae1209


some more JMS 2 API


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/db8ae120
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/db8ae120
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/db8ae120

Branch: refs/heads/master
Commit: db8ae12098cc4e08b68848b5e38b2333cb185496
Parents: 9f84727
Author: rmannibucau <rmannibu...@apache.org>
Authored: Thu Sep 22 00:34:44 2016 +0200
Committer: rmannibucau <rmannibu...@apache.org>
Committed: Thu Sep 22 00:34:44 2016 +0200

----------------------------------------------------------------------
 .../resource/activemq/jms2/TomEEConnection.java | 16 ++++
 .../resource/activemq/jms2/TomEEProducer.java   | 92 ++++++++++++++++++++
 .../resource/activemq/jms2/TomEESession.java    | 76 ++++++++++++++++
 .../activemq/jms2/TomEEXAConnection.java        |  8 ++
 .../resource/activemq/jms2/TomEEXASession.java  | 63 ++++++++++++++
 5 files changed, 255 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/db8ae120/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
index 77063c1..bd2ba2c 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnection.java
@@ -17,6 +17,7 @@
 package org.apache.openejb.resource.activemq.jms2;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IdGenerator;
@@ -34,6 +35,21 @@ public class TomEEConnection extends ActiveMQConnection {
     }
 
     @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        // Builds the session as a TomEESession. For a transacted session the requested
+        // acknowledgeMode is not inspected and SESSION_TRANSACTED is used; otherwise the mode is
+        // validated first and a JMSException is thrown when it is SESSION_TRANSACTED or lies
+        // outside [SESSION_TRANSACTED, ActiveMQSession.MAX_ACK_CONSTANT].
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        final int effectiveMode;
+        if (transacted) {
+            effectiveMode = Session.SESSION_TRANSACTED;
+        } else if (acknowledgeMode == Session.SESSION_TRANSACTED) {
+            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+        } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
+            throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
+                    "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
+        } else {
+            effectiveMode = acknowledgeMode;
+        }
+
+        return new TomEESession(this, getNextSessionId(), effectiveMode, isDispatchAsync(), isAlwaysSessionAsync());
+    }
+
+    @Override // single-argument overload: transacted iff sessionMode == SESSION_TRANSACTED, then delegates to this instance's createSession(boolean, int)
     public Session createSession(final int sessionMode) throws JMSException {
         return createSession(sessionMode == Session.SESSION_TRANSACTED, 
sessionMode);
     }

http://git-wip-us.apache.org/repos/asf/tomee/blob/db8ae120/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEProducer.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEProducer.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEProducer.java
new file mode 100644
index 0000000..b43a386
--- /dev/null
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEProducer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.AsyncCallback;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerId;
+
+import javax.jms.CompletionListener;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+public class TomEEProducer extends ActiveMQMessageProducer {
+    private final ActiveMQDestination destination;
+    private long deliveryDelay;
+
+    public TomEEProducer(final ActiveMQSession session, final ProducerId 
producerId,
+                         final ActiveMQDestination destination, final int 
sendTimeout) throws JMSException {
+        super(session, producerId, destination, sendTimeout);
+        this.destination = destination;
+    }
+
+    @Override
+    public void send(final Message message, final CompletionListener 
completionListener) throws JMSException {
+        super.send(destination, message, new ProducerAsyncCallback(message, 
completionListener));
+    }
+
+    @Override
+    public void send(final Destination destination, final Message message, 
final CompletionListener completionListener) throws JMSException {
+        super.send(destination, message, new ProducerAsyncCallback(message, 
completionListener));
+    }
+
+    @Override
+    public void send(final Message message, final int deliveryMode, final int 
priority,
+                     final long timeToLive, final CompletionListener 
completionListener) throws JMSException {
+        super.send(destination, message, deliveryMode, priority, timeToLive, 
new ProducerAsyncCallback(message, completionListener));
+    }
+
+    @Override
+    public void send(final Destination destination, final Message message,
+                     final int deliveryMode, final int priority, final long 
timeToLive,
+                     final CompletionListener completionListener) throws 
JMSException {
+        super.send(destination, message, deliveryMode, priority, timeToLive, 
new ProducerAsyncCallback(message, completionListener));
+    }
+
+    @Override
+    public long getDeliveryDelay() throws JMSException {
+        return deliveryDelay;
+    }
+
+    @Override
+    public void setDeliveryDelay(final long deliveryDelay) throws JMSException 
{
+        this.deliveryDelay = deliveryDelay;
+    }
+
+    private static class ProducerAsyncCallback implements AsyncCallback {
+        private final Message message;
+        private final CompletionListener completionListener;
+
+        private ProducerAsyncCallback(final Message message, final 
CompletionListener completionListener) {
+            this.message = message;
+            this.completionListener = completionListener;
+        }
+
+        @Override
+        public void onSuccess() {
+            completionListener.onCompletion(message);
+        }
+
+        @Override
+        public void onException(final JMSException exception) {
+            completionListener.onException(message, exception);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/db8ae120/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEESession.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEESession.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEESession.java
new file mode 100644
index 0000000..7072a55
--- /dev/null
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEESession.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQMessageTransformation;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.CustomDestination;
+import org.apache.activemq.command.SessionId;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Topic;
+
+/**
+ * ActiveMQ session that hands out {@link TomEEProducer} instances and implements the
+ * durable/shared consumer factory methods by delegating to the session's existing
+ * consumer and durable-subscriber factories.
+ *
+ * <p>Note: the consumer overrides below are duplicated in {@code TomEEXASession}; the code is
+ * not shared between the two classes.</p>
+ */
+public class TomEESession extends ActiveMQSession {
+    public TomEESession(final ActiveMQConnection connection, final SessionId sessionId,
+                        final int acknowledgeMode, final boolean asyncDispatch, final boolean sessionAsyncDispatch) throws JMSException {
+        super(connection, sessionId, acknowledgeMode, asyncDispatch, sessionAsyncDispatch);
+    }
+
+    /**
+     * Creates a producer for {@code destination}: a {@link CustomDestination} builds its own
+     * producer, anything else gets a {@link TomEEProducer}.
+     */
+    @Override
+    public MessageProducer createProducer(final Destination destination) throws JMSException {
+        checkClosed();
+        if (destination instanceof CustomDestination) {
+            final CustomDestination custom = (CustomDestination) destination;
+            return custom.createProducer(this);
+        }
+        return new TomEEProducer(
+                this,
+                getNextProducerId(),
+                ActiveMQMessageTransformation.transformDestination(destination),
+                connection.getSendTimeout());
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name)}. */
+    @Override
+    public MessageConsumer createDurableConsumer(final Topic topic, final String name) throws JMSException {
+        return this.createDurableSubscriber(topic, name);
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name, messageSelector, noLocal)}. */
+    @Override
+    public MessageConsumer createDurableConsumer(final Topic topic, final String name, final String messageSelector, final boolean noLocal) throws JMSException {
+        return this.createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
+    /**
+     * Delegates to {@code createConsumer(topic)}; {@code sharedSubscriptionName} is not used.
+     * NOTE(review): the result is therefore an ordinary topic consumer - confirm this is the
+     * intended stand-in for a shared subscription.
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName) throws JMSException {
+        return this.createConsumer(topic);
+    }
+
+    /**
+     * Delegates to {@code createConsumer(topic, messageSelector)}; {@code sharedSubscriptionName}
+     * is not used.
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName, final String messageSelector) throws JMSException {
+        return this.createConsumer(topic, messageSelector);
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name)}. */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name) throws JMSException {
+        return this.createDurableSubscriber(topic, name);
+    }
+
+    /** Delegates to {@code createDurableSubscriber} with {@code noLocal} fixed to {@code false}. */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) throws JMSException {
+        return this.createDurableSubscriber(topic, name, messageSelector, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/tomee/blob/db8ae120/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
index 38039e3..11105b9 100644
--- 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXAConnection.java
@@ -1,6 +1,7 @@
 package org.apache.openejb.resource.activemq.jms2;
 
 import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXASession;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IdGenerator;
@@ -18,6 +19,13 @@ public class TomEEXAConnection extends ActiveMQXAConnection {
     }
 
     @Override
+    public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException {
+        // Neither argument is consulted: the session's acknowledge mode is the connection's XA
+        // ack mode when that value is positive, and SESSION_TRANSACTED otherwise.
+        checkClosedOrFailed();
+        ensureConnectionInfoSent();
+
+        final int xaAckMode = getXaAckMode();
+        final int sessionAckMode = xaAckMode > 0 ? xaAckMode : Session.SESSION_TRANSACTED;
+        return new TomEEXASession(this, getNextSessionId(), sessionAckMode, isDispatchAsync());
+    }
+
+    @Override
     public Session createSession(final int sessionMode) throws JMSException {
         return super.createSession(sessionMode == Session.SESSION_TRANSACTED, 
sessionMode);
     }

http://git-wip-us.apache.org/repos/asf/tomee/blob/db8ae120/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
----------------------------------------------------------------------
diff --git 
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
new file mode 100644
index 0000000..9c811fb
--- /dev/null
+++ 
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEXASession.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.resource.activemq.jms2;
+
+import org.apache.activemq.ActiveMQXAConnection;
+import org.apache.activemq.ActiveMQXASession;
+import org.apache.activemq.command.SessionId;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+
+/**
+ * XA flavour of the ActiveMQ session that implements the durable/shared consumer factory
+ * methods by delegating to the session's existing consumer and durable-subscriber factories.
+ *
+ * <p>Note: these overrides are duplicated in {@code TomEESession}; the code is not shared
+ * between the two classes.</p>
+ *
+ * <p>NOTE(review): unlike {@code TomEESession}, this class does not override
+ * {@code createProducer}, so producers come from the parent class - confirm whether a
+ * {@code TomEEProducer} should be returned here as well.</p>
+ */
+public class TomEEXASession extends ActiveMQXASession {
+    public TomEEXASession(final ActiveMQXAConnection connection, final SessionId sessionId,
+                          final int theAcknowlegeMode, final boolean dispatchAsync) throws JMSException {
+        super(connection, sessionId, theAcknowlegeMode, dispatchAsync);
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name)}. */
+    @Override
+    public MessageConsumer createDurableConsumer(final Topic topic, final String name) throws JMSException {
+        return this.createDurableSubscriber(topic, name);
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name, messageSelector, noLocal)}. */
+    @Override
+    public MessageConsumer createDurableConsumer(final Topic topic, final String name, final String messageSelector, final boolean noLocal) throws JMSException {
+        return this.createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
+    /**
+     * Delegates to {@code createConsumer(topic)}; {@code sharedSubscriptionName} is not used.
+     * NOTE(review): the result is therefore an ordinary topic consumer - confirm this is the
+     * intended stand-in for a shared subscription.
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName) throws JMSException {
+        return this.createConsumer(topic);
+    }
+
+    /**
+     * Delegates to {@code createConsumer(topic, messageSelector)}; {@code sharedSubscriptionName}
+     * is not used.
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName, final String messageSelector) throws JMSException {
+        return this.createConsumer(topic, messageSelector);
+    }
+
+    /** Delegates to {@code createDurableSubscriber(topic, name)}. */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name) throws JMSException {
+        return this.createDurableSubscriber(topic, name);
+    }
+
+    /** Delegates to {@code createDurableSubscriber} with {@code noLocal} fixed to {@code false}. */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) throws JMSException {
+        return this.createDurableSubscriber(topic, name, messageSelector, false);
+    }
+}

Reply via email to