Author: timothyjward
Date: Wed Apr 13 16:31:16 2016
New Revision: 1738968

URL: http://svn.apache.org/viewvc?rev=1738968&view=rev
Log:
[tx-control] Actually enlist the XA connection

Removed:
    aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/XAEnabledTxContextBindingConnection.java
Modified:
    aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java

Modified: aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
URL: http://svn.apache.org/viewvc/aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java?rev=1738968&r1=1738967&r2=1738968&view=diff
==============================================================================
--- 
aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
 (original)
+++ 
aries/trunk/tx-control/tx-control-provider-jpa-xa/src/main/java/org/apache/aries/tx/control/jpa/xa/impl/JPAEntityManagerProviderFactoryImpl.java
 Wed Apr 13 16:31:16 2016
@@ -2,6 +2,7 @@ package org.apache.aries.tx.control.jpa.
 
 import static java.util.Optional.ofNullable;
 import static javax.persistence.spi.PersistenceUnitTransactionType.JTA;
+import static 
org.osgi.service.transaction.control.TransactionStatus.NO_TRANSACTION;
 
 import java.io.PrintWriter;
 import java.sql.Connection;
@@ -9,13 +10,20 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.logging.Logger;
 
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.spi.PersistenceUnitTransactionType;
 import javax.sql.DataSource;
+import javax.transaction.xa.XAResource;
 
+import org.apache.aries.tx.control.jdbc.common.impl.ScopedConnectionWrapper;
+import org.apache.aries.tx.control.jdbc.common.impl.TxConnectionWrapper;
+import org.apache.aries.tx.control.jdbc.xa.connection.impl.XAConnectionWrapper;
 import org.osgi.service.jpa.EntityManagerFactoryBuilder;
+import org.osgi.service.transaction.control.TransactionContext;
 import org.osgi.service.transaction.control.TransactionControl;
 import org.osgi.service.transaction.control.TransactionException;
 import org.osgi.service.transaction.control.jpa.JPAEntityManagerProvider;
@@ -26,16 +34,15 @@ public class JPAEntityManagerProviderFac
        @Override
        public JPAEntityManagerProvider 
getProviderFor(EntityManagerFactoryBuilder emfb, Map<String, Object> 
jpaProperties,
                        Map<String, Object> resourceProviderProperties) {
-               if(checkEnlistment(resourceProviderProperties)) {
-                       return new DelayedJPAEntityManagerProvider(tx -> {
-                               
-                               Map<String, Object> toUse = 
enlistDataSource(tx, jpaProperties);
-                               
-                               return internalBuilderCreate(emfb, toUse);
+               return new DelayedJPAEntityManagerProvider(tx -> {
+                               Map<String, Object> toUse;
+                               if(checkEnlistment(resourceProviderProperties)) 
{
+                                       toUse = enlistDataSource(tx, 
jpaProperties);
+                               } else {
+                                       toUse = jpaProperties;
+                               }
+                               return tx.notSupported(() -> 
internalBuilderCreate(emfb, toUse));
                        });
-               }
-               
-               return internalBuilderCreate(emfb, jpaProperties);
        }
 
        private Map<String, Object> enlistDataSource(TransactionControl tx, 
Map<String, Object> jpaProperties) {
@@ -47,7 +54,7 @@ public class JPAEntityManagerProviderFac
                        toReturn.put("javax.persistence.jtaDataSource", ds);
                }
                
-               toReturn.put("javax.persistence.jtaDataSource", new 
EnlistingDataSource(ds));
+               toReturn.put("javax.persistence.jtaDataSource", new 
EnlistingDataSource(tx, ds));
                
                return toReturn;
        }
@@ -115,9 +122,14 @@ public class JPAEntityManagerProviderFac
        
        public class EnlistingDataSource implements DataSource {
                
+               private final TransactionControl txControl;
+               
                private final DataSource delegate;
 
-               public EnlistingDataSource(DataSource delegate) {
+               private final UUID resourceId = UUID.randomUUID();
+               
+               public EnlistingDataSource(TransactionControl txControl, 
DataSource delegate) {
+                       this.txControl = txControl;
                        this.delegate = delegate;
                }
 
@@ -138,7 +150,7 @@ public class JPAEntityManagerProviderFac
                }
 
                public Connection getConnection() throws SQLException {
-                       return delegate.getConnection();
+                       return enlistedConnection(() -> 
delegate.getConnection());
                }
 
                public void setLoginTimeout(int seconds) throws SQLException {
@@ -146,7 +158,7 @@ public class JPAEntityManagerProviderFac
                }
 
                public Connection getConnection(String username, String 
password) throws SQLException {
-                       return delegate.getConnection(username, password);
+                       return enlistedConnection(() -> 
delegate.getConnection(username, password));
                }
 
                public int getLoginTimeout() throws SQLException {
@@ -156,5 +168,63 @@ public class JPAEntityManagerProviderFac
                public Logger getParentLogger() throws 
SQLFeatureNotSupportedException {
                        return delegate.getParentLogger();
                }
+               
+               private Connection enlistedConnection(Callable<Connection> 
supplier) {
+                       TransactionContext txContext = 
txControl.getCurrentContext();
+
+                       if (txContext == null) {
+                               throw new TransactionException("The resource " 
+ resourceId
+                                               + " cannot be accessed outside 
of an active Transaction Context");
+                       }
+
+                       Connection existing = (Connection) 
txContext.getScopedValue(resourceId);
+
+                       if (existing != null) {
+                               return existing;
+                       }
+
+                       Connection toReturn;
+                       Connection toClose;
+
+                       try {
+                               toClose = supplier.call();
+                               if (txContext.getTransactionStatus() == 
NO_TRANSACTION) {
+                                       toReturn = new 
ScopedConnectionWrapper(toClose);
+                               } else if (txContext.supportsXA()) {
+                                       toReturn = new 
TxConnectionWrapper(toClose);
+                                       
txContext.registerXAResource(getXAResource(toClose));
+                               } else {
+                                       throw new TransactionException(
+                                                       "There is a transaction 
active, but it does not support XA participants");
+                               }
+                       } catch (Exception sqle) {
+                               throw new TransactionException(
+                                               "There was a problem getting 
hold of a database connection",
+                                               sqle);
+                       }
+
+                       
+                       txContext.postCompletion(x -> {
+                                       try {
+                                               toClose.close();
+                                       } catch (SQLException sqle) {
+                                               // TODO log this
+                                       }
+                               });
+                       
+                       txContext.putScopedValue(resourceId, toReturn);
+                       
+                       return toReturn;
+               }
+               
+               private XAResource getXAResource(Connection conn) throws 
SQLException {
+                       if(conn instanceof XAConnectionWrapper) {
+                               return 
((XAConnectionWrapper)conn).getXaResource();
+                       } else if(conn.isWrapperFor(XAConnectionWrapper.class)){
+                               return 
conn.unwrap(XAConnectionWrapper.class).getXaResource();
+                       } else {
+                               throw new IllegalArgumentException("The 
XAResource for the connection cannot be found");
+                       }
+               }
        }
 }


Reply via email to