This is an automated email from the ASF dual-hosted git repository.
jfisher pushed a commit to branch tomee-7.0.x
in repository https://gitbox.apache.org/repos/asf/tomee.git
The following commit(s) were added to refs/heads/tomee-7.0.x by this push:
new 3780eb6 Fix several JMS/JMS2.0 bugs
3780eb6 is described below
commit 3780eb6008ea3369ab745d0c1d7af718664323be
Author: Jonathan S. Fisher <[email protected]>
AuthorDate: Mon Aug 26 20:09:10 2019 -0500
Fix several JMS/JMS2.0 bugs
* Fix TOMEE-2229: JMSContext Injected by TomEE does not participate in JTA
* Fix TOMEE-2650: TomEE Creating non-JTA Sessions
* Fix TOMEE-2651: TomEE doesn't return JMS Connection to pool after a
Transaction Timeout
* Fix TOMEE-2652: TransactionSupport parameter not honored on JMS
Connection Factory resources
---
.../openejb/resource/AutoConnectionTracker.java | 26 ++--
.../openejb/resource/activemq/jms2/JMS2.java | 13 ++
.../resource/activemq/jms2/JMSContextImpl.java | 7 +-
.../resource/activemq/jms2/JMSProducerImpl.java | 1 +
.../activemq/jms2/TomEEConnectionFactory.java | 42 +++++--
.../activemq/jms2/TomEEManagedConnection.java | 10 +-
.../jms2/TomEEManagedConnectionFactory.java | 45 ++++++-
.../activemq/jms2/TomEEManagedConnectionProxy.java | 132 ++++++++++++++++++++-
.../activemq/jms2/TomEERAConnectionFactory.java | 108 ++++++++++++++++-
.../activemq/jms2/cdi/JMS2CDIExtension.java | 18 ++-
10 files changed, 360 insertions(+), 42 deletions(-)
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
index 49f891e..3064aec 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/AutoConnectionTracker.java
@@ -22,6 +22,7 @@ import
org.apache.geronimo.connector.outbound.ConnectionReturnAction;
import org.apache.geronimo.connector.outbound.ConnectionTrackingInterceptor;
import org.apache.geronimo.connector.outbound.ManagedConnectionInfo;
import
org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
+import org.apache.geronimo.transaction.manager.TransactionImpl;
import org.apache.openejb.dyni.DynamicSubclass;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.util.LogCategory;
@@ -32,7 +33,6 @@ import javax.resource.ResourceException;
import javax.resource.spi.DissociatableManagedConnection;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionSynchronizationRegistry;
import java.lang.ref.PhantomReference;
@@ -58,6 +58,7 @@ public class AutoConnectionTracker implements
ConnectionTracker {
private final TransactionManager txMgr;
private final Logger logger =
Logger.getInstance(LogCategory.OPENEJB_CONNECTOR,
"org.apache.openejb.resource");
private final ConcurrentMap<ManagedConnectionInfo, ProxyPhantomReference>
references = new ConcurrentHashMap<ManagedConnectionInfo,
ProxyPhantomReference>();
+ @SuppressWarnings("rawtypes")
private final ReferenceQueue referenceQueue = new ReferenceQueue();
private final ConcurrentMap<Class<?>, Class<?>> proxies = new
ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, Class<?>[]> interfaces = new
ConcurrentHashMap<>();
@@ -80,6 +81,7 @@ public class AutoConnectionTracker implements
ConnectionTracker {
* @param connectionInfo the connection to be obtained
* @param key the unique id of the connection manager
*/
+ @Override
public void setEnvironment(final ConnectionInfo connectionInfo, final
String key) {
ProxyPhantomReference reference = (ProxyPhantomReference)
referenceQueue.poll();
while (reference != null) {
@@ -103,12 +105,14 @@ public class AutoConnectionTracker implements
ConnectionTracker {
* @param connectionInfo the connection that was obtained
* @param reassociate should always be false
*/
+ @SuppressWarnings("unchecked")
+ @Override
public void handleObtained(final ConnectionTrackingInterceptor
interceptor, final ConnectionInfo connectionInfo, final boolean reassociate)
throws ResourceException {
if (txMgr != null && registry != null) {
- Transaction currentTx = null;
+ TransactionImpl currentTx = null;
try {
- currentTx = txMgr.getTransaction();
- } catch (SystemException e) {
+ currentTx = (TransactionImpl) txMgr.getTransaction();
+ } catch (SystemException | ClassCastException e) {
//ignore
}
@@ -166,16 +170,18 @@ public class AutoConnectionTracker implements
ConnectionTracker {
* @param connectionInfo the connection that was released
* @param action ignored
*/
+ @Override
+ @SuppressWarnings("unchecked")
public void handleReleased(final ConnectionTrackingInterceptor
interceptor, final ConnectionInfo connectionInfo, final ConnectionReturnAction
action) {
- Transaction currentTx = null;
+ TransactionImpl currentTx = null;
try {
- currentTx = txMgr.getTransaction();
- } catch (SystemException e) {
+ currentTx = (TransactionImpl) txMgr.getTransaction();
+ } catch (SystemException | ClassCastException e) {
//ignore
}
if (currentTx != null) {
- Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>
txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>)
registry.getResource(KEY);
+ Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>
txConnections = (Map<ManagedConnectionInfo, Map<ConnectionInfo, Object>>)
currentTx.getResource(KEY);
if (txConnections == null) {
txConnections = new HashMap<ManagedConnectionInfo,
Map<ConnectionInfo, Object>>();
registry.putResource(KEY, txConnections);
@@ -193,6 +199,7 @@ public class AutoConnectionTracker implements
ConnectionTracker {
}
}
+ @SuppressWarnings("rawtypes")
final PhantomReference phantomReference =
references.remove(connectionInfo.getManagedConnectionInfo());
if (phantomReference != null) {
phantomReference.clear();
@@ -292,6 +299,7 @@ public class AutoConnectionTracker implements
ConnectionTracker {
this.handle = handle;
}
+ @Override
public Object invoke(final Object object, final Method method, final
Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
if (method.getName().equals("finalize")) {
@@ -330,7 +338,7 @@ public class AutoConnectionTracker implements
ConnectionTracker {
public ProxyPhantomReference(final ConnectionTrackingInterceptor
interceptor,
final ManagedConnectionInfo
managedConnectionInfo,
final ConnectionInvocationHandler handler,
- final ReferenceQueue referenceQueue) {
+ @SuppressWarnings("rawtypes") final
ReferenceQueue referenceQueue) {
super(handler, referenceQueue);
this.interceptor = interceptor;
this.managedConnectionInfo = managedConnectionInfo;
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
index 1f7e78a..29c0ebf 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMS2.java
@@ -43,6 +43,9 @@ import javax.jms.TransactionInProgressException;
import javax.jms.TransactionInProgressRuntimeException;
import javax.jms.TransactionRolledBackException;
import javax.jms.TransactionRolledBackRuntimeException;
+import javax.transaction.SystemException;
+
+import org.apache.openejb.OpenEJB;
public final class JMS2 {
private JMS2() {
@@ -83,6 +86,7 @@ public final class JMS2 {
return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
+ @SuppressWarnings("unchecked")
public static <T extends Message> T wrap(final T message10) {
if (message10 == null) {
return null;
@@ -112,4 +116,13 @@ public final class JMS2 {
}
return (T) new DelegateMessage(message10);
}
+
+
+    /**
+     * Reports whether the calling thread currently has a JTA transaction.
+     *
+     * @return {@code true} when the transaction manager returns a non-null transaction;
+     *         {@code false} when {@code OpenEJB.getTransactionManager()} returns {@code null},
+     *         when no transaction is associated, or when the lookup raises a
+     *         {@link SystemException}
+     */
+    public static boolean inTx() {
+        // Test for a missing transaction manager explicitly rather than catching
+        // NullPointerException, which would also mask unrelated programming errors.
+        final javax.transaction.TransactionManager transactionManager = OpenEJB.getTransactionManager();
+        if (transactionManager == null) {
+            return false;
+        }
+        try {
+            return transactionManager.getTransaction() != null;
+        } catch (final SystemException e) {
+            // best effort: a failing lookup is treated as "no transaction"
+            return false;
+        }
+    }
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
index db975be..872b1c9 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSContextImpl.java
@@ -42,6 +42,7 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.XAConnection;
+
import java.io.Serializable;
import static
org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
@@ -80,7 +81,6 @@ public class JMSContextImpl implements JMSContext {
if (connection == null) {
try {
connection = username != null ?
factory.createConnection(username, password) : factory.createConnection();
- xa = XAConnection.class.isInstance(connection);
} catch (final JMSException e) {
throw toRuntimeException(e);
}
@@ -96,10 +96,11 @@ public class JMSContextImpl implements JMSContext {
}
if (session == null) {
try {
+ Connection connection = connection();
if (xa) {
- session =
XAConnection.class.cast(connection()).createXASession();
+ session =
XAConnection.class.cast(connection).createXASession();
} else {
- session = connection().createSession(sessionMode);
+ session = connection.createSession(sessionMode);
}
} catch (final JMSException e) {
throw toRuntimeException(e);
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
index 9969639..eabfad8 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/JMSProducerImpl.java
@@ -40,6 +40,7 @@ import java.util.Set;
import static
org.apache.openejb.resource.activemq.jms2.JMS2.toRuntimeException;
import static org.apache.openejb.resource.activemq.jms2.JMS2.wrap;
+@SuppressWarnings("deprecation")
class JMSProducerImpl implements JMSProducer {
private final JMSContextImpl context;
private final MessageProducer producer;
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
index 7bb6a52..9af3d0d 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEConnectionFactory.java
@@ -17,35 +17,63 @@
package org.apache.openejb.resource.activemq.jms2;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.ActiveMQXASslConnectionFactory;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport;
import javax.jms.JMSContext;
-public class TomEEConnectionFactory extends ActiveMQSslConnectionFactory {
+/**
+ * XA-capable ActiveMQ SSL connection factory that hands out {@code TomEEXAConnection}
+ * instances and builds {@link JMSContext}s through {@code JMSContextImpl}.
+ *
+ * Every {@code createContext} variant asks {@code JMS2.inTx()} once: when a transaction is
+ * active the session mode passed on is {@code -1} and the XA flag is {@code true}; otherwise
+ * the caller's mode (or {@code AUTO_ACKNOWLEDGE} when none was given) is used with the XA
+ * flag {@code false}.
+ */
+public class TomEEConnectionFactory extends ActiveMQXASslConnectionFactory {
@Override
protected ActiveMQConnection createActiveMQConnection(final Transport
transport, final JMSStatsImpl stats) throws Exception {
- return new TomEEConnection(transport, getClientIdGenerator(),
getConnectionIdGenerator(), stats);
+        return new TomEEXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
}
@Override
public JMSContext createContext() {
- return new JMSContextImpl(this, -1, null, null, false);
+        final boolean transactionActive = JMS2.inTx();
+        final int effectiveMode = transactionActive ? -1 : JMSContext.AUTO_ACKNOWLEDGE;
+        return new JMSContextImpl(this, effectiveMode, null, null, transactionActive);
}
@Override
public JMSContext createContext(final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, null, null, false);
+        final boolean transactionActive = JMS2.inTx();
+        final int effectiveMode = transactionActive ? -1 : sessionMode;
+        return new JMSContextImpl(this, effectiveMode, null, null, transactionActive);
}
@Override
public JMSContext createContext(final String userName, final String
password) {
- return new JMSContextImpl(this, -1, userName, password, false);
+        final boolean transactionActive = JMS2.inTx();
+        final int effectiveMode = transactionActive ? -1 : JMSContext.AUTO_ACKNOWLEDGE;
+        return new JMSContextImpl(this, effectiveMode, userName, password, transactionActive);
}
@Override
public JMSContext createContext(final String userName, final String
password, final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, userName, password,
false);
+        final boolean transactionActive = JMS2.inTx();
+        final int effectiveMode = transactionActive ? -1 : sessionMode;
+        return new JMSContextImpl(this, effectiveMode, userName, password, transactionActive);
}
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
index 4d7bb29..fe8e2b7 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnection.java
@@ -23,12 +23,14 @@ import org.apache.activemq.ra.ManagedConnectionProxy;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import javax.security.auth.Subject;
import java.lang.reflect.Field;
import java.util.Collection;
public class TomEEManagedConnection extends ActiveMQManagedConnection {
private static final Field PROXY_CONNECTIONS_FIELD;
+ private TransactionSupportLevel transactionSupportLevel;
static {
try {
@@ -41,14 +43,16 @@ public class TomEEManagedConnection extends
ActiveMQManagedConnection {
private final Collection<ManagedConnectionProxy> proxyConnections;
+ @SuppressWarnings("unchecked")
public TomEEManagedConnection(final Subject subject, final
ActiveMQConnection physicalConnection,
- final ActiveMQConnectionRequestInfo info)
throws ResourceException {
+ final ActiveMQConnectionRequestInfo info,
TransactionSupportLevel transactionSupportLevel) throws ResourceException {
super(subject, physicalConnection, info);
try {
proxyConnections =
Collection.class.cast(PROXY_CONNECTIONS_FIELD.get(this));
} catch (final IllegalAccessException e) {
throw new IllegalStateException("Incompatible AMQ", e);
}
+ this.transactionSupportLevel = transactionSupportLevel;
}
@Override
@@ -57,4 +61,8 @@ public class TomEEManagedConnection extends
ActiveMQManagedConnection {
proxyConnections.add(proxy);
return proxy;
}
+
+ public TransactionSupportLevel getTransactionSupportLevel() {
+ return transactionSupportLevel;
+ }
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
index 44ea157..22c44d0 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionFactory.java
@@ -22,17 +22,25 @@ import
org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
import org.apache.activemq.ra.MessageActivationSpec;
import org.apache.activemq.ra.SimpleConnectionManager;
+import java.util.Locale;
+
import javax.jms.JMSException;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import javax.security.auth.Subject;
public class TomEEManagedConnectionFactory extends
ActiveMQManagedConnectionFactory {
+ private static final long serialVersionUID = 1L;
+ private TransactionSupportLevel transactionSupportLevel;
+
@Override
public Object createConnectionFactory(final ConnectionManager manager)
throws ResourceException {
- return new TomEERAConnectionFactory(this, manager, getInfo());
+        // Builds the application-facing connection factory and forwards the configured
+        // transaction support level to it.
+        final TomEERAConnectionFactory factory = new TomEERAConnectionFactory(this, manager, getInfo());
+        // The level is only populated by setTransactionSupport(String). When it was never
+        // configured, leave the factory on its own default instead of passing null, which
+        // TomEERAConnectionFactory.setTransactionSupport rejects with IllegalArgumentException.
+        if (transactionSupportLevel != null) {
+            factory.setTransactionSupport(transactionSupportLevel);
+        }
+        return factory;
}
@Override
@@ -56,7 +64,7 @@ public class TomEEManagedConnectionFactory extends
ActiveMQManagedConnectionFact
amqInfo = getInfo();
}
try {
- return new TomEEManagedConnection(subject,
makeConnection(amqInfo), amqInfo);
+ return new TomEEManagedConnection(subject,
makeConnection(amqInfo), amqInfo, transactionSupportLevel);
} catch (final JMSException e) {
throw new ResourceException("Could not create connection.", e);
}
@@ -67,4 +75,37 @@ public class TomEEManagedConnectionFactory extends
ActiveMQManagedConnectionFact
return !(object == null || !getClass().isInstance(object))
&& ((ActiveMQManagedConnectionFactory)
object).getInfo().equals(getInfo());
}
+
+    /**
+     * Returns the configured transaction support level as its configuration keyword.
+     *
+     * @return {@code "xa"}, {@code "local"} or {@code "none"}; {@code null} when no level
+     *         has been set yet
+     */
+    public String getTransactionSupport() {
+        // switch on a null enum reference throws NullPointerException, so handle "unset" first
+        if (transactionSupportLevel == null) {
+            return null;
+        }
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                return "xa";
+            case LocalTransaction:
+                return "local";
+            case NoTransaction:
+                return "none";
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Sets the transaction support level from its configuration keyword. Matching is
+     * case-insensitive.
+     *
+     * @param transactionSupport one of {@code xa}, {@code local} or {@code none}
+     * @throws IllegalArgumentException if the value is {@code null} or not one of the keywords
+     */
+    public void setTransactionSupport(String transactionSupport) {
+        if (transactionSupport == null) {
+            throw new IllegalArgumentException("transactionSupport cannot be null");
+        }
+        switch (transactionSupport.toLowerCase(Locale.ENGLISH)) {
+            case "xa":
+                transactionSupportLevel = TransactionSupportLevel.XATransaction;
+                break;
+            case "local":
+                transactionSupportLevel = TransactionSupportLevel.LocalTransaction;
+                break;
+            case "none":
+                transactionSupportLevel = TransactionSupportLevel.NoTransaction;
+                break;
+            default:
+                throw new IllegalArgumentException("transactionSupport must be xa, local, or none:" + transactionSupport);
+        }
+    }
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
index f07b7a2..aefe4f9 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEEManagedConnectionProxy.java
@@ -18,21 +18,28 @@ package org.apache.openejb.resource.activemq.jms2;
import org.apache.activemq.ra.ActiveMQManagedConnection;
import org.apache.activemq.ra.ManagedConnectionProxy;
+import org.apache.openejb.OpenEJB;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
+import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
import javax.resource.spi.ConnectionRequestInfo;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
public class TomEEManagedConnectionProxy extends ManagedConnectionProxy
// cause
org.apache.openejb.resource.AutoConnectionTracker.proxyConnection() just uses
getInterfaces()
- implements Connection, QueueConnection, TopicConnection, ExceptionListener
{
+ implements Connection, QueueConnection, TopicConnection,
ExceptionListener, XAConnection {
private volatile ActiveMQManagedConnection connection;
@@ -48,13 +55,119 @@ public class TomEEManagedConnectionProxy extends
ManagedConnectionProxy
}
@Override
- public Session createSession(final int sessionMode) throws JMSException {
- return connection.getPhysicalConnection().createSession(sessionMode);
+    public Session createSession(final int acknowledgeMode) throws JMSException {
+        // Falls back to XATransaction when the managed connection carries no level.
+        return openSession(TransactionSupportLevel.XATransaction, acknowledgeMode);
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+        // Without a configured level, the "transacted" flag picks the fallback:
+        // transacted -> XATransaction, not transacted -> NoTransaction.
+        final TransactionSupportLevel fallback = transacted
+                ? TransactionSupportLevel.XATransaction
+                : TransactionSupportLevel.NoTransaction;
+        return openSession(fallback, acknowledgeMode);
}
@Override
public Session createSession() throws JMSException {
- return connection.getPhysicalConnection().createSession();
+        // No mode supplied by the caller: AUTO_ACKNOWLEDGE is used outside a transaction.
+        return openSession(TransactionSupportLevel.XATransaction, JMSContext.AUTO_ACKNOWLEDGE);
}
+
+    /**
+     * Shared implementation of the three {@code createSession} overloads.
+     *
+     * Inside a JTA transaction the requested session mode is ignored, per the spec:
+     * https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html#createSession-int-
+     * The user may override this by setting transactionSupport on the connection factory;
+     * in that case no XA session is returned even though the underlying physical
+     * connection may support XA.
+     *
+     * @param fallbackLevel level used when the managed connection is not a
+     *                      {@code TomEEManagedConnection} or reports no level
+     * @param nonTransactedMode session mode used when no XA or local transaction applies
+     * @return an enlisted XA session, a locally transacted session, or a plain session
+     * @throws JMSException if the physical connection cannot create the session
+     */
+    private Session openSession(final TransactionSupportLevel fallbackLevel, final int nonTransactedMode) throws JMSException {
+        TransactionSupportLevel level = null;
+        if (connection instanceof TomEEManagedConnection) {
+            level = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
+        }
+        if (level == null) {
+            // a null level would make the switch below throw NullPointerException
+            level = fallbackLevel;
+        }
+        switch (level) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    return createXASession();
+                }
+                // XA configured but no active transaction: behave exactly like NoTransaction
+                return connection.getPhysicalConnection().createSession(nonTransactedMode);
+            case NoTransaction:
+                return connection.getPhysicalConnection().createSession(nonTransactedMode);
+            case LocalTransaction:
+                return connection.getPhysicalConnection().createSession(JMSContext.SESSION_TRANSACTED);
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + level);
+        }
+    }
@Override
@@ -69,4 +182,15 @@ public class TomEEManagedConnectionProxy extends
ManagedConnectionProxy
final
ServerSessionPool sessionPool, final int maxMessages) throws JMSException {
return
connection.getPhysicalConnection().createSharedConnectionConsumer(topic,
subscriptionName, messageSelector, sessionPool, maxMessages);
}
+
+    /**
+     * Creates an XA session on the physical connection and enlists its XA resource in the
+     * transaction currently associated with the calling thread.
+     *
+     * @return the enlisted XA session
+     * @throws JMSException if there is no active transaction to enlist in, or if the
+     *                      physical connection cannot create the session
+     * @throws RuntimeException wrapping the cause when the transaction lookup or the
+     *                          enlistment fails
+     */
+    @Override
+    public XASession createXASession() throws JMSException {
+        // Resolve the transaction before creating the session so nothing is left open
+        // when there is nothing to enlist in.
+        final javax.transaction.TransactionManager transactionManager = OpenEJB.getTransactionManager();
+        final javax.transaction.Transaction transaction;
+        try {
+            transaction = transactionManager == null ? null : transactionManager.getTransaction();
+        } catch (final SystemException e) {
+            throw new RuntimeException(e);
+        }
+        if (transaction == null) {
+            // previously surfaced as a bare NullPointerException
+            throw new javax.jms.IllegalStateException("Cannot create an XA session without an active JTA transaction");
+        }
+
+        final XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
+        try {
+            transaction.enlistResource(session.getXAResource());
+        } catch (final IllegalStateException | SystemException | RollbackException e) {
+            // do not leak the session that could not be enlisted
+            try {
+                session.close();
+            } catch (final JMSException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw new RuntimeException(e);
+        }
+        return session;
+    }
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
index 75f3582..1a76f9f 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/TomEERAConnectionFactory.java
@@ -16,15 +16,18 @@
*/
package org.apache.openejb.resource.activemq.jms2;
+import javax.resource.spi.TransactionSupport.TransactionSupportLevel;
import org.apache.activemq.ra.ActiveMQConnectionFactory;
import org.apache.activemq.ra.ActiveMQConnectionRequestInfo;
import org.apache.activemq.ra.ActiveMQManagedConnectionFactory;
import javax.jms.JMSContext;
-import javax.jms.Session;
import javax.resource.spi.ConnectionManager;
public class TomEERAConnectionFactory extends ActiveMQConnectionFactory {
+ private static final long serialVersionUID = 1L;
+ private TransactionSupportLevel transactionSupportLevel =
TransactionSupportLevel.XATransaction;
+
public TomEERAConnectionFactory(final ActiveMQManagedConnectionFactory
factory, final ConnectionManager manager,
final ActiveMQConnectionRequestInfo
connectionRequestInfo) {
super(factory, manager, connectionRequestInfo);
@@ -32,21 +35,116 @@ public class TomEERAConnectionFactory extends
ActiveMQConnectionFactory {
@Override
public JMSContext createContext() {
- return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, null, null,
false);
+        return newContext(null, null, JMSContext.AUTO_ACKNOWLEDGE);
}
@Override
public JMSContext createContext(final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, null, null, false);
+        return newContext(null, null, sessionMode);
}
@Override
public JMSContext createContext(final String userName, final String
password) {
- return new JMSContextImpl(this, Session.AUTO_ACKNOWLEDGE, userName,
password, false);
+        return newContext(userName, password, JMSContext.AUTO_ACKNOWLEDGE);
}
@Override
public JMSContext createContext(final String userName, final String
password, final int sessionMode) {
- return new JMSContextImpl(this, sessionMode, userName, password,
false);
+        return newContext(userName, password, sessionMode);
+    }
+
+    /**
+     * Shared implementation of the four {@code createContext} overloads.
+     *
+     * The user may override the session mode at the connection factory level through the
+     * transaction support setting; otherwise the spec is followed:
+     * https://docs.oracle.com/javaee/7/api/javax/jms/ConnectionFactory.html#createContext-int-
+     *
+     * @param userName user name passed to the context, may be {@code null}
+     * @param password password passed to the context, may be {@code null}
+     * @param nonTransactedMode session mode used when neither an XA nor a local
+     *                          transaction applies
+     * @return a new {@code JMSContextImpl}; its XA flag is {@code true} only when the level
+     *         is XATransaction and a JTA transaction is active
+     * @throws IllegalStateException if the configured level is not one of the three known values
+     */
+    private JMSContext newContext(final String userName, final String password, final int nonTransactedMode) {
+        switch (transactionSupportLevel) {
+            case XATransaction:
+                if (JMS2.inTx()) {
+                    return new JMSContextImpl(this, -1, userName, password, true);
+                }
+                // XA configured but no active transaction: behave exactly like NoTransaction
+                return new JMSContextImpl(this, nonTransactedMode, userName, password, false);
+            case NoTransaction:
+                return new JMSContextImpl(this, nonTransactedMode, userName, password, false);
+            case LocalTransaction:
+                return new JMSContextImpl(this, JMSContext.SESSION_TRANSACTED, userName, password, false);
+            default:
+                throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
+        }
+    }
+
+ public TransactionSupportLevel getTransactionSupport() {
+ return transactionSupportLevel;
+ }
+
+ public void setTransactionSupport(TransactionSupportLevel
transactionSupportLevel) {
+ if (transactionSupportLevel == null) {
+ throw new IllegalArgumentException("transactionSupportLevel cannot
be null");
+ } else {
+ this.transactionSupportLevel = transactionSupportLevel;
+ }
}
}
diff --git
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
index 98ef326..fef6048 100644
---
a/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
+++
b/container/openejb-core/src/main/java/org/apache/openejb/resource/activemq/jms2/cdi/JMS2CDIExtension.java
@@ -16,10 +16,10 @@
*/
package org.apache.openejb.resource.activemq.jms2.cdi;
-import org.apache.openejb.OpenEJB;
import org.apache.openejb.assembler.classic.OpenEjbConfiguration;
import org.apache.openejb.assembler.classic.ResourceInfo;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.resource.activemq.jms2.JMS2;
import org.apache.openejb.spi.ContainerSystem;
import javax.annotation.PreDestroy;
@@ -56,7 +56,6 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.NamingException;
-import javax.transaction.SystemException;
import javax.transaction.TransactionScoped;
import java.io.Serializable;
import java.util.Map;
@@ -139,6 +138,7 @@ public class JMS2CDIExtension implements Extension {
}
public abstract static class AutoContextDestruction implements
Serializable {
+ private static final long serialVersionUID = 1L;
private transient Map<Key, JMSContext> contexts = new
ConcurrentHashMap<>();
public void push(final Key key, final JMSContext c) {
@@ -169,13 +169,16 @@ public class JMS2CDIExtension implements Extension {
@RequestScoped
public static class RequestAutoContextDestruction extends
AutoContextDestruction {
+ private static final long serialVersionUID = 1L;
}
@TransactionScoped
public static class TransactionAutoContextDestruction extends
AutoContextDestruction {
+ private static final long serialVersionUID = 1L;
}
public static class Key implements Serializable {
+ private static final long serialVersionUID = 1L;
private volatile ConnectionFactory connectionFactoryInstance;
private final String connectionFactory;
private final String username;
@@ -249,6 +252,7 @@ public class JMS2CDIExtension implements Extension {
}
public static class InternalJMSContext implements JMSContext, Serializable
{
+ private static final long serialVersionUID = 1L;
private final Key key;
private final RequestAutoContextDestruction requestStorage;
private final TransactionAutoContextDestruction transactionStorage;
@@ -260,7 +264,7 @@ public class JMS2CDIExtension implements Extension {
}
private synchronized JMSContext context() {
- if (inTx()) {
+ if (JMS2.inTx()) {
return findOrCreateContext(transactionStorage);
}
return findOrCreateContext(requestStorage);
@@ -275,14 +279,6 @@ public class JMS2CDIExtension implements Extension {
return jmsContext;
}
- private boolean inTx() {
- try {
- return OpenEJB.getTransactionManager().getTransaction() !=
null;
- } catch (SystemException e) {
- return false;
- }
- }
-
// plain delegation now
@Override