TOMEE-1900 better xa pooling handling
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/b4bd095d Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/b4bd095d Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/b4bd095d Branch: refs/heads/tomee-1.7.x Commit: b4bd095d71dce43eeae756ec56f7a037c2791eff Parents: 280808f Author: Romain manni-Bucau <[email protected]> Authored: Thu Aug 11 13:59:59 2016 +0200 Committer: Jonathan Gallimore <[email protected]> Committed: Thu Aug 11 13:59:17 2016 +0100 ---------------------------------------------------------------------- .../jdbc/managed/local/ManagedConnection.java | 55 +++++---- .../tomee/jdbc/TomcatXADataSourceTest.java | 113 +++++++++++++++++++ 2 files changed, 145 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java index 5b03dd5..c73250a 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/ManagedConnection.java @@ -101,9 +101,12 @@ public class ManagedConnection implements InvocationHandler { return null; } - closeConnection(xaConnection, delegate); + closeConnection(true); return null; } + if ("isClosed".equals(mtdName) && closed) { + return true; + } if (delegate == null) { newConnection(); } @@ -115,27 +118,29 @@ public class ManagedConnection implements InvocationHandler { if (!currentTransaction.equals(transaction)) { throw new SQLException("Connection can not be used while enlisted in another transaction"); } - return invokeUnderTransaction(delegate, method, args); + return invokeUnderTransaction(method, args); } // get the already bound connection to the current transaction or enlist this one in the tx - if (isUnderTransaction(transaction.getStatus())) { + final int transactionStatus = transaction.getStatus(); + if (isUnderTransaction(transactionStatus)) { Connection connection = Connection.class.cast(registry.getResource(key)); if (connection == null && delegate == null) { newConnection(); - connection = delegate; - registry.putResource(key, delegate); currentTransaction = transaction; try { - transaction.enlistResource(getXAResource()); + if (!transaction.enlistResource(getXAResource())) { + throw new SQLException("Unable to enlist connection in transaction: enlistResource returns 'false'."); + } } catch (final RollbackException ignored) { // no-op } catch (final SystemException e) { throw new SQLException("Unable to enlist connection the transaction", e); } - transaction.registerSynchronization(new ClosingSynchronization(xaConnection, delegate)); + registry.putResource(key, delegate); + transaction.registerSynchronization(new ClosingSynchronization()); try { setAutoCommit(false); @@ -152,7 +157,14 @@ public class ManagedConnection implements InvocationHandler { delegate = connection; } - return invokeUnderTransaction(connection, method, args); + return invokeUnderTransaction(method, args); + } + + if ("isClosed".equals(mtdName) && closed) { + return true; + } + if ("close".equals(mtdName)) { // let it be handled by the ClosingSynchronisation since we have a tx there + return close(); } // we shouldn't come here, tempted to just throw an exception @@ -171,8 +183,8 @@ public class ManagedConnection implements InvocationHandler { (key.user == null ? XADataSource.class.cast(key.ds).getXAConnection() : XADataSource.class.cast(key.ds).getXAConnection(key.user, key.pwd)); if (XAConnection.class.isInstance(connection)) { xaConnection = XAConnection.class.cast(connection); - delegate = xaConnection.getConnection(); xaResource = xaConnection.getXAResource(); + delegate = xaConnection.getConnection(); } else { delegate = Connection.class.cast(connection); xaResource = new LocalXAResource(delegate); @@ -194,7 +206,7 @@ public class ManagedConnection implements InvocationHandler { } } - private Object invokeUnderTransaction(final Connection delegate, final Method method, final Object[] args) throws Exception { + private Object invokeUnderTransaction(final Method method, final Object[] args) throws Exception { final String mtdName = method.getName(); if ("setAutoCommit".equals(mtdName) || "commit".equals(mtdName) @@ -227,15 +239,7 @@ public class ManagedConnection implements InvocationHandler { return new SQLException("can't call " + mtdName + " when the connection is JtaManaged"); } - private static class ClosingSynchronization implements Synchronization { - private final XAConnection xaConnection; - private final Connection connection; - - public ClosingSynchronization(final XAConnection xaConnection, final Connection delegate) { - this.xaConnection = xaConnection; - this.connection = delegate; - } - + private class ClosingSynchronization implements Synchronization { @Override public void beforeCompletion() { // no-op @@ -243,19 +247,24 @@ public class ManagedConnection implements InvocationHandler { @Override public void afterCompletion(final int status) { - closeConnection(xaConnection, connection); + closeConnection(true); } } - private static void closeConnection(final XAConnection xaConnection, final Connection connection) { + private void closeConnection(final boolean force) { + if (!force && closed) { + return; + } try { if (xaConnection != null) { // handles the underlying connection xaConnection.close(); - } else if (connection != null && !connection.isClosed()) { - connection.close(); + } else if (delegate != null && !delegate.isClosed()) { + delegate.close(); } } catch (final SQLException e) { // no-op + } finally { + close(); // set the flag } } http://git-wip-us.apache.org/repos/asf/tomee/blob/b4bd095d/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java ---------------------------------------------------------------------- diff --git a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java index 96739d0..56a95ce 100644 --- a/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java +++ b/tomee/tomee-jdbc/src/test/java/org/apache/tomee/jdbc/TomcatXADataSourceTest.java @@ -16,9 +16,11 @@ */ package org.apache.tomee.jdbc; +import org.apache.openejb.OpenEJB; import org.apache.openejb.jee.EjbJar; import org.apache.openejb.junit.ApplicationComposer; import org.apache.openejb.resource.jdbc.managed.local.ManagedDataSource; +import org.apache.openejb.testing.Classes; import org.apache.openejb.testing.Configuration; import org.apache.openejb.testing.Module; import org.apache.openejb.testng.PropertiesBuilder; @@ -28,24 +30,34 @@ import org.junit.Test; import org.junit.runner.RunWith; import javax.annotation.Resource; +import javax.ejb.EJB; +import javax.ejb.Singleton; import javax.sql.DataSource; +import javax.transaction.Synchronization; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @RunWith(ApplicationComposer.class) public class TomcatXADataSourceTest { @Resource(name = "xadb") private DataSource ds; + @EJB + private TxP tx; + @Module + @Classes(TxP.class) public EjbJar mandatory() { return new EjbJar(); } @@ -115,5 +127,106 @@ public class TomcatXADataSourceTest { assertEquals(0, tds.getActive()); assertEquals(25, tds.getIdle()); } + // in tx - closing in tx + for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time + for (int i = 0; i < 25; i++) { + tx.run(new Runnable() { + @Override + public void run() { + try { + Connection c = null; + for (int i = 0; i < 25; i++) { + final Connection connection = ds.getConnection(); + connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used) + if (c != null) { + assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class)); + } else { + c = connection; + } + } + c.close(); // ensure we handle properly eager close invocations + } catch (final SQLException sql) { + fail(sql.getMessage()); + } + } + }); + } + assertEquals(0, tds.getActive()); + assertEquals(25, tds.getIdle()); + } + + + // in tx - closing after tx + for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time + for (int i = 0; i < 25; i++) { + final AtomicReference<Connection> ref = new AtomicReference<Connection>(); + tx.run(new Runnable() { + @Override + public void run() { + try { + Connection c = null; + for (int i = 0; i < 25; i++) { + final Connection connection = ds.getConnection(); + connection.getMetaData(); // trigger connection retrieving otherwise nothing is done (pool is not used) + if (c != null) { + assertEquals(c.unwrap(Connection.class), connection.unwrap(Connection.class)); + } else { + c = connection; + ref.set(c); + } + } + } catch (final SQLException sql) { + fail(sql.getMessage()); + } + } + }); + assertTrue(ref.get().isClosed()); // closed with tx + ref.get().close(); + assertTrue(ref.get().isClosed()); + } + assertEquals(0, tds.getActive()); + assertEquals(25, tds.getIdle()); + } + + // in tx - closing in commit + for (int it = 0; it < 5; it++) { // ensures it always works and not only the first time + for (int i = 0; i < 25; i++) { + tx.run(new Runnable() { + @Override + public void run() { + try { + final Connection ref = ds.getConnection(); + ref.getMetaData(); + OpenEJB.getTransactionManager().getTransaction().registerSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + // no-op + } + + @Override + public void afterCompletion(final int status) { // JPA does it + try { + ref.close(); + } catch (final SQLException e) { + fail(e.getMessage()); + } + } + }); + } catch (final Exception sql) { + fail(sql.getMessage()); + } + } + }); + } + assertEquals(0, tds.getActive()); + assertEquals(25, tds.getIdle()); + } + } + + @Singleton + public static class TxP { + public void run(final Runnable r) { + r.run(); + } } }
