Author: rmannibucau
Date: Tue Jul 24 22:04:16 2012
New Revision: 1365321

URL: http://svn.apache.org/viewvc?rev=1365321&view=rev
Log:
using java.util.concurrent to lock on xaresource and adding some tests
(UserTransaction and parallel threads)

Added:
    
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
    
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
Modified:
    
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java

Modified: 
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
URL: 
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java?rev=1365321&r1=1365320&r2=1365321&view=diff
==============================================================================
--- 
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
 (original)
+++ 
openejb/branches/openejb-pool/container/openejb-core/src/main/java/org/apache/openejb/resource/jdbc/managed/local/LocalXAResource.java
 Tue Jul 24 22:04:16 2012
@@ -5,25 +5,35 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-// seems synchronized is faster than java.util.concurrent for 1 or 2 threads
-// so should be fine here
-// moreover all operations are very short so synchronized is faster
 public class LocalXAResource implements XAResource {
     private final Connection connection;
     private Xid currentXid;
     private boolean originalAutoCommit;
+    private final Lock lock = new ReentrantLock();
 
     public LocalXAResource(final Connection localTransaction) {
         connection = localTransaction;
     }
 
-    public synchronized Xid getXid() {
+    public Xid getXid() {
+        checkLock();
         return currentXid;
     }
 
     @Override
-    public synchronized void start(final Xid xid, int flag) throws XAException 
{
+    public void start(final Xid xid, int flag) throws XAException {
+        try {
+            if (!lock.tryLock(10, TimeUnit.MINUTES)) {
+
+            }
+        } catch (InterruptedException e) {
+            throw (XAException) new XAException("can't get 
lock").initCause(cantGetLock());
+        }
+
         if (flag == XAResource.TMNOFLAGS) {
             if (currentXid != null) {
                 throw new XAException("Already enlisted in another transaction 
with xid " + xid);
@@ -52,18 +62,28 @@ public class LocalXAResource implements 
         }
     }
 
+    private RuntimeException cantGetLock() {
+        return new IllegalStateException("can't get lock on resource with Xid 
" + currentXid + " from thread " + Thread.currentThread().getName());
+    }
+
     @Override
-    public synchronized void end(final Xid xid, int flag) throws XAException {
-        if (xid == null) {
-            throw new NullPointerException("xid is null");
-        }
-        if (!this.currentXid.equals(xid)) {
-            throw new XAException("Invalid Xid: expected " + this.currentXid + 
", but was " + xid);
+    public void end(final Xid xid, int flag) throws XAException {
+        try {
+            if (xid == null) {
+                throw new NullPointerException("xid is null");
+            }
+            if (!this.currentXid.equals(xid)) {
+                throw new XAException("Invalid Xid: expected " + 
this.currentXid + ", but was " + xid);
+            }
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public synchronized int prepare(final Xid xid) {
+    public int prepare(final Xid xid) {
+        checkLock();
+
         try {
             if (connection.isReadOnly()) {
                 connection.setAutoCommit(originalAutoCommit);
@@ -77,7 +97,9 @@ public class LocalXAResource implements 
     }
 
     @Override
-    public synchronized void commit(final Xid xid, boolean flag) throws 
XAException {
+    public void commit(final Xid xid, boolean flag) throws XAException {
+        checkLock();
+
         if (xid == null) {
             throw new NullPointerException("xid is null");
         }
@@ -106,7 +128,9 @@ public class LocalXAResource implements 
     }
 
     @Override
-    public synchronized void rollback(final Xid xid) throws XAException {
+    public void rollback(final Xid xid) throws XAException {
+        checkLock();
+
         if (xid == null) {
             throw new NullPointerException("xid is null");
         }
@@ -134,7 +158,8 @@ public class LocalXAResource implements 
     }
 
     @Override
-    public synchronized void forget(final Xid xid) {
+    public void forget(final Xid xid) {
+        checkLock();
         if (xid != null && currentXid.equals(xid)) {
             currentXid = null;
         }
@@ -154,4 +179,10 @@ public class LocalXAResource implements 
     public boolean setTransactionTimeout(int transactionTimeout) {
         return false;
     }
+
+    private void checkLock() {
+        if (!lock.tryLock()) {
+            throw cantGetLock();
+        }
+    }
 }

Added: 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
URL: 
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java?rev=1365321&view=auto
==============================================================================
--- 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
 (added)
+++ 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/MultiThreadedManagedDataSourceTest.java
 Tue Jul 24 22:04:16 2012
@@ -0,0 +1,231 @@
+package org.apache.openejb.resource.jdbc;
+
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.SingletonBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.junit.Configuration;
+import org.apache.openejb.junit.Module;
+import org.apache.openejb.resource.jdbc.managed.local.ManagedConnection;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.EJBContext;
+import javax.ejb.LocalBean;
+import javax.ejb.Singleton;
+import javax.sql.DataSource;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(ApplicationComposer.class)
+public class MultiThreadedManagedDataSourceTest {
+    private static final String URL = 
"jdbc:hsqldb:mem:multi-tx-managed;hsqldb.tx=MVCC"; // mvcc otherwise multiple 
transaction tests will fail
+    private static final String USER = "sa";
+    private static final String PASSWORD = "";
+    private static final String TABLE = 
"PUBLIC.MULTI_TX_MANAGED_DATASOURCE_TEST";
+    private static final int INSERTS_NB = 200;
+
+    @EJB
+    private Persister persistManager;
+
+    @BeforeClass
+    public static void createTable() throws SQLException, 
ClassNotFoundException {
+        Class.forName("org.hsqldb.jdbcDriver");
+
+        final Connection connection = DriverManager.getConnection(URL, USER, 
PASSWORD);
+        final Statement statement = connection.createStatement();
+        statement.execute("CREATE TABLE " + TABLE + "(ID INTEGER)");
+        statement.close();
+        connection.commit();
+        connection.close();
+    }
+
+    @Configuration
+    public Properties config() {
+        final Properties p = new Properties();
+        p.put("openejb.jdbc.datasource-creator", "dbcp-alternative");
+
+        p.put("managed", "new://Resource?type=DataSource");
+        p.put("managed.JdbcDriver", "org.hsqldb.jdbcDriver");
+        p.put("managed.JdbcUrl", URL);
+        p.put("managed.UserName", USER);
+        p.put("managed.Password", PASSWORD);
+        p.put("managed.JtaManaged", "true");
+        return p;
+    }
+
+    @Module
+    public EjbJar app() throws Exception {
+        return new EjbJar()
+                .enterpriseBean(new 
SingletonBean(Persister.class).localBean());
+    }
+
+    @LocalBean
+    @Singleton
+    public static class Persister {
+        private static final AtomicInteger ID = new AtomicInteger(1);
+
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @Resource
+        private EJBContext context;
+
+        public int save() throws SQLException {
+            int id = ID.getAndIncrement();
+            MultiThreadedManagedDataSourceTest.save(ds, id);
+            return id;
+        }
+
+        public int saveRollback(boolean ok) throws SQLException {
+            int id = ID.getAndIncrement();
+            MultiThreadedManagedDataSourceTest.save(ds, id);
+            if (!ok) {
+                context.setRollbackOnly();
+            }
+            return id;
+        }
+    }
+
+    @Test
+    public void inserts() throws SQLException {
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicInteger fail = new AtomicInteger(0);
+        run(new Runnable() {
+            @Override
+            public void run() {
+                int id = -1;
+                try {
+                    id = persistManager.save();
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                }
+                try {
+                    if (!exists(id)) {
+                        fail.incrementAndGet();
+                    }
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                }
+            }
+        });
+        assertEquals(0, errors.get());
+        assertEquals(0, fail.get());
+        assertEquals(INSERTS_NB, count(""));
+    }
+
+    @Test
+    public void insertsWithRollback() throws SQLException {
+        final AtomicInteger errors = new AtomicInteger(0);
+        final AtomicInteger fail = new AtomicInteger(0);
+        final AtomicInteger ok = new AtomicInteger(0);
+        run(new Runnable() {
+            @Override
+            public void run() {
+                boolean rollback = Math.random() > 0.5;
+                if (!rollback) {
+                    ok.incrementAndGet();
+                }
+                int id = -1;
+                try {
+                    id = persistManager.saveRollback(!rollback);
+                } catch (SQLException e) {
+                    errors.incrementAndGet();
+                }
+                if (!rollback) {
+                    try {
+                        if (!exists(id)) {
+                            fail.incrementAndGet();
+                        }
+                    } catch (SQLException e) {
+                        errors.incrementAndGet();
+                    }
+                }
+            }
+        });
+        assertEquals(0, errors.get());
+        assertEquals(0, fail.get());
+        assertEquals(ok.get(), count(""));
+    }
+
+    @After
+    public void checkTxMapIsEmpty() throws Exception { // avoid memory leak
+        final Field map = 
ManagedConnection.class.getDeclaredField("CONNECTION_BY_TX");
+        map.setAccessible(true);
+        final Map<?, ?> instance = (Map<?, ?>) map.get(null);
+        assertEquals(0, instance.size());
+
+        execute(DriverManager.getConnection(URL, USER, PASSWORD), "DELETE FROM 
" + TABLE);
+    }
+
+    private static boolean exists(int id) throws SQLException {
+        return count(" WHERE ID = " + id) == 1;
+    }
+
+    private static int count(String where) throws SQLException {
+        final Connection connection = DriverManager.getConnection(URL, USER, 
PASSWORD);
+        final Statement statement = connection.createStatement();
+        final ResultSet result = statement.executeQuery("SELECT count(*) AS NB 
FROM " + TABLE + where);
+        try {
+            assertTrue(result.next());
+            return result.getInt(1);
+        } finally {
+            statement.close();
+            connection.close();
+        }
+    }
+
+    private static void save(final DataSource ds, int id) throws SQLException {
+        execute(ds, "INSERT INTO " + TABLE + "(ID) VALUES(" + id + ")");
+    }
+
+    private static void execute(final DataSource ds, final String sql) throws 
SQLException {
+        final Connection connection = ds.getConnection();
+        final Statement statement = connection.createStatement();
+        statement.executeUpdate(sql);
+        statement.close();
+        connection.close();
+    }
+
+    private static void execute(final Connection connection, final String sql) 
throws SQLException {
+        final Statement statement = connection.createStatement();
+        statement.executeUpdate(sql);
+        statement.close();
+        connection.close();
+    }
+
+    private void run(final Runnable runnable) {
+        final ExecutorService es = Executors.newFixedThreadPool(20);
+        for (int i = 0; i < INSERTS_NB; i++) {
+            es.submit(new Runnable() {
+                @Override
+                public void run() {
+                    runnable.run();
+                }
+            });
+        }
+        es.shutdown();
+        try {
+            es.awaitTermination(5, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            fail();
+        }
+    }
+}

Added: 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
URL: 
http://svn.apache.org/viewvc/openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java?rev=1365321&view=auto
==============================================================================
--- 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
 (added)
+++ 
openejb/branches/openejb-pool/container/openejb-core/src/test/java/org/apache/openejb/resource/jdbc/UTManagedDataSourceTest.java
 Tue Jul 24 22:04:16 2012
@@ -0,0 +1,227 @@
+package org.apache.openejb.resource.jdbc;
+
+import org.apache.openejb.jee.EjbJar;
+import org.apache.openejb.jee.SingletonBean;
+import org.apache.openejb.junit.ApplicationComposer;
+import org.apache.openejb.junit.Configuration;
+import org.apache.openejb.junit.Module;
+import org.apache.openejb.resource.jdbc.managed.local.ManagedConnection;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.annotation.Resource;
+import javax.ejb.EJB;
+import javax.ejb.LocalBean;
+import javax.ejb.Singleton;
+import javax.ejb.TransactionManagement;
+import javax.ejb.TransactionManagementType;
+import javax.sql.DataSource;
+import javax.transaction.UserTransaction;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(ApplicationComposer.class)
+public class UTManagedDataSourceTest {
+    private static final String URL = 
"jdbc:hsqldb:mem:managed;hsqldb.tx=MVCC"; // mvcc otherwise multiple 
transaction tests will fail
+    private static final String USER = "sa";
+    private static final String PASSWORD = "";
+    private static final String TABLE = "PUBLIC.MANAGED_DATASOURCE_TEST";
+
+    @EJB
+    private Persister persistManager;
+
+    @BeforeClass
+    public static void createTable() throws SQLException, 
ClassNotFoundException {
+        Class.forName("org.hsqldb.jdbcDriver");
+
+        final Connection connection = DriverManager.getConnection(URL, USER, 
PASSWORD);
+        final Statement statement = connection.createStatement();
+        statement.execute("CREATE TABLE " + TABLE + "(ID INTEGER)");
+        statement.close();
+        connection.commit();
+        connection.close();
+    }
+
+    @Configuration
+    public Properties config() {
+        final Properties p = new Properties();
+        p.put("openejb.jdbc.datasource-creator", "dbcp-alternative");
+
+        p.put("managed", "new://Resource?type=DataSource");
+        p.put("managed.JdbcDriver", "org.hsqldb.jdbcDriver");
+        p.put("managed.JdbcUrl", URL);
+        p.put("managed.UserName", USER);
+        p.put("managed.Password", PASSWORD);
+        p.put("managed.JtaManaged", "true");
+        return p;
+    }
+
+    @Module
+    public EjbJar app() throws Exception {
+        return new EjbJar()
+                .enterpriseBean(new SingletonBean(Persister.class).localBean())
+                .enterpriseBean(new 
SingletonBean(OtherPersister.class).localBean());
+    }
+
+    @LocalBean
+    @Singleton
+    @TransactionManagement(TransactionManagementType.BEAN)
+    public static class OtherPersister {
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @Resource
+        private UserTransaction ut;
+
+        public void save() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 10);
+            ut.commit();
+        }
+
+        public void saveAndRollback() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 11);
+            ut.rollback();
+        }
+    }
+
+    @LocalBean
+    @Singleton
+    @TransactionManagement(TransactionManagementType.BEAN)
+    public static class Persister {
+        @Resource(name = "managed")
+        private DataSource ds;
+
+        @EJB
+        private OtherPersister other;
+
+        @Resource
+        private UserTransaction ut;
+
+        public void save() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 1);
+            ut.commit();
+        }
+
+        public void saveAndRollback() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 2);
+            ut.rollback();
+        }
+
+        public void saveTwice() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 3);
+            UTManagedDataSourceTest.save(ds, 4);
+            ut.commit();
+        }
+
+        public void rollbackMultipleSave() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 5);
+            UTManagedDataSourceTest.save(ds, 6);
+            ut.rollback();
+        }
+
+        public void saveInThisTxAndAnotherOne() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 7);
+            other.save();
+            ut.commit();
+        }
+
+        public void saveInThisTxAndRollbackInAnotherOne() throws Exception {
+            ut.begin();
+            UTManagedDataSourceTest.save(ds, 8);
+            other.saveAndRollback();
+            ut.commit();
+        }
+    }
+
+    @Test
+    public void commit() throws Exception {
+        persistManager.save();
+        assertTrue(exists(1));
+    }
+
+    @Test
+    public void rollback() throws Exception {
+        persistManager.saveAndRollback();
+        assertFalse(exists(2));
+    }
+
+    @Test
+    public void commit2() throws Exception {
+        persistManager.saveTwice();
+        assertTrue(exists(3));
+        assertTrue(exists(4));
+    }
+
+    @Test
+    public void rollback2() throws Exception {
+        persistManager.rollbackMultipleSave();
+        assertFalse(exists(5));
+        assertFalse(exists(6));
+    }
+
+    @Test
+    public void saveDifferentTx() throws Exception {
+        persistManager.saveInThisTxAndAnotherOne();
+        assertTrue(exists(7));
+        assertTrue(exists(10));
+    }
+
+    @Test
+    public void saveRollbackDifferentTx() throws Exception {
+        persistManager.saveInThisTxAndRollbackInAnotherOne();
+        assertTrue(exists(8)); // id 8 is inserted and committed by Persister itself
+        assertFalse(exists(11)); // id 11 is what OtherPersister.saveAndRollback() inserts before rolling back
+    }
+
+    @After
+    public void checkTxMapIsEmpty() throws Exception { // avoid memory leak
+        final Field map = 
ManagedConnection.class.getDeclaredField("CONNECTION_BY_TX");
+        map.setAccessible(true);
+        final Map<?, ?> instance = (Map<?, ?>) map.get(null);
+        assertEquals(0, instance.size());
+    }
+
+    private static boolean exists(int id) throws Exception {
+        final Connection connection = DriverManager.getConnection(URL, USER, 
PASSWORD);
+        final Statement statement = connection.createStatement();
+        final ResultSet result = statement.executeQuery("SELECT count(*) AS NB 
FROM " + TABLE + " WHERE ID = " + id);
+        try {
+            assertTrue(result.next());
+            return result.getInt(1) == 1;
+        } finally {
+            statement.close();
+            connection.close();
+        }
+    }
+
+    private static void save(final DataSource ds, int id) throws Exception {
+        execute(ds, "INSERT INTO " + TABLE + "(ID) VALUES(" + id + ")");
+    }
+
+    private static void execute(final DataSource ds, final String sql) throws 
Exception {
+        final Connection connection = ds.getConnection();
+        final Statement statement = connection.createStatement();
+        statement.executeUpdate(sql);
+        statement.close();
+        connection.close();
+    }
+}


Reply via email to