Repository: activemq
Updated Branches:
  refs/heads/master 314d5a516 -> bd45d931b


[AMQ-6906] tidy up cleanup on jdbc error and combine updates in single 
completion to avoid prepared sequence update on non transacted add with error. 
More jdbc error related tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd45d931
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd45d931
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd45d931

Branch: refs/heads/master
Commit: bd45d931ba273be4d94bf213c6befd116f99dcc8
Parents: 314d5a5
Author: gtully <[email protected]>
Authored: Thu May 3 11:32:21 2018 +0100
Committer: gtully <[email protected]>
Committed: Thu May 3 11:32:21 2018 +0100

----------------------------------------------------------------------
 .../store/jdbc/JDBCPersistenceAdapter.java      |    6 +-
 .../store/jdbc/JdbcMemoryTransactionStore.java  |   16 +-
 .../activemq/store/jdbc/TransactionContext.java |   73 +-
 .../activemq/broker/XARecoveryBrokerTest.java   |    4 +-
 .../store/jdbc/JDBCCommitExceptionTest.java     |   12 +-
 .../store/jdbc/JDBCXACommitExceptionTest.java   | 1151 ++++++++++++++++++
 .../activemq/store/jdbc/XACompletionTest.java   |    4 +
 7 files changed, 1213 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 102dec5..95b1446 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -785,12 +785,10 @@ public class JDBCPersistenceAdapter extends 
DataSourceServiceSupport implements
         }
     }
 
-    public void commitAdd(ConnectionContext context, MessageId messageId, long 
preparedSequenceId) throws IOException {
+    public void commitAdd(ConnectionContext context, final MessageId 
messageId, final long preparedSequenceId, final long newSequence) throws 
IOException {
         TransactionContext c = getTransactionContext(context);
         try {
-            long sequence = (Long)messageId.getEntryLocator();
-            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
-            messageId.setEntryLocator(preparedSequenceId);
+            getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to commit add: " + 
messageId + ". Reason: " + e, e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 6df0860..4bbe43d 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -78,11 +78,12 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
                 cmd.run(ctx);
             }
 
+            persistenceAdapter.commitTransaction(ctx);
+
         } catch ( IOException e ) {
             persistenceAdapter.rollbackTransaction(ctx);
             throw e;
         }
-        persistenceAdapter.commitTransaction(ctx);
 
         ctx.setXid(null);
         // setup for commit outcome
@@ -126,13 +127,15 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
             final Long preparedEntrySequence = (Long) 
message.getMessageId().getEntryLocator();
             TransactionContext c = 
jdbcPersistenceAdapter.getTransactionContext(context);
 
+            long newSequence;
             synchronized (jdbcMessageStore.pendingAdditions) {
-                
message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
-
+                newSequence = jdbcPersistenceAdapter.getNextSequenceId();
+                final long sequenceToSet = newSequence;
                 c.onCompletion(new Runnable() {
                     @Override
                     public void run() {
-                        
message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
+                        message.getMessageId().setEntryLocator(sequenceToSet);
+                        
message.getMessageId().setFutureOrSequenceLong(sequenceToSet);
                     }
                 });
 
@@ -141,7 +144,7 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
                 }
             }
 
-            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), 
preparedEntrySequence);
+            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), 
preparedEntrySequence, newSequence);
             jdbcMessageStore.onAdd(message, 
(Long)message.getMessageId().getEntryLocator(), message.getPriority());
         }
 
@@ -175,8 +178,9 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
                             
((LastAckCommand)removeMessageCommand).rollback(ctx);
                         } else {
                             MessageId messageId = 
removeMessageCommand.getMessageAck().getLastMessageId();
+                            long sequence = (Long)messageId.getEntryLocator();
                             // need to unset the txid flag on the existing row
-                            ((JDBCPersistenceAdapter) 
persistenceAdapter).commitAdd(ctx, messageId, 
(Long)messageId.getEntryLocator());
+                            ((JDBCPersistenceAdapter) 
persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
 
                             if (removeMessageCommand instanceof 
RecoveredRemoveMessageCommand) {
                                 ((JDBCMessageStore) 
removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand)
 removeMessageCommand).getMessage());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index ab3bef8..db2aace 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -84,7 +84,7 @@ public class TransactionContext {
                 } catch (IllegalMonitorStateException oops) {
                     LOG.error("Thread does not hold the context lock on close 
of:"  + connection, oops);
                 }
-                close();
+                silentClose();
                 IOException ioe = IOExceptionSupport.create(e);
                 if (persistenceAdapter.getBrokerService() != null) {
                     
persistenceAdapter.getBrokerService().handleIOException(ioe);
@@ -137,45 +137,39 @@ public class TransactionContext {
         } finally {
             try {
                 p.close();
-            } catch (Throwable e) {
-            }
+            } catch (Throwable ignored) {}
         }
     }
 
+    private void silentClose() {
+        silentClosePreparedStatements();
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Throwable ignored) {}
+            connection = null;
+        }
+    }
+
+
     public void close() throws IOException {
         if (!inTx) {
             try {
-
-                /**
-                 * we are not in a transaction so should not be committing ??
-                 * This was previously commented out - but had adverse affects
-                 * on testing - so it's back!
-                 * 
-                 */
-                try {
+                // can be null for topic ops that bypass the store via 
existing cursor state
+                if (connection != null) {
+                    final boolean needsCommit = !connection.getAutoCommit();
                     executeBatch();
-                } finally {
-                    if (connection != null && !connection.getAutoCommit()) {
+                    if (needsCommit) {
                         connection.commit();
                     }
                 }
-
             } catch (SQLException e) {
                 JDBCPersistenceAdapter.log("Error while closing connection: ", 
e);
                 IOException ioe = IOExceptionSupport.create(e);
                 persistenceAdapter.getBrokerService().handleIOException(ioe);
                 throw ioe;
             } finally {
-                try {
-                    if (connection != null) {
-                        connection.close();
-                    }
-                } catch (Throwable e) {
-                    // ignore
-                    LOG.trace("Closing connection failed due: " + 
e.getMessage() + ". This exception is ignored.", e);
-                } finally {
-                    connection = null;
-                }
+                silentClose();
                 for (Runnable completion: completions) {
                     completion.run();
                 }
@@ -197,8 +191,9 @@ public class TransactionContext {
             throw new IOException("Not started.");
         }
         try {
+            final boolean needsCommit = !connection.getAutoCommit();
             executeBatch();
-            if (!connection.getAutoCommit()) {
+            if (needsCommit) {
                 connection.commit();
             }
         } catch (SQLException e) {
@@ -230,19 +225,23 @@ public class TransactionContext {
         }
     }
 
-    private void doRollback() throws SQLException {
-        if (addMessageStatement != null) {
-            addMessageStatement.close();
-            addMessageStatement = null;
-        }
-        if (removedMessageStatement != null) {
-            removedMessageStatement.close();
-            removedMessageStatement = null;
-        }
-        if (updateLastAckStatement != null) {
-            updateLastAckStatement.close();
-            updateLastAckStatement = null;
+    private PreparedStatement silentClosePreparedStatement(PreparedStatement 
preparedStatement) {
+        if (preparedStatement != null) {
+            try {
+                preparedStatement.close();
+            } catch (Throwable ignored) {}
         }
+        return null;
+    }
+
+    private void silentClosePreparedStatements() {
+        addMessageStatement = 
silentClosePreparedStatement(addMessageStatement);
+        removedMessageStatement = 
silentClosePreparedStatement(removedMessageStatement);
+        updateLastAckStatement = 
silentClosePreparedStatement(updateLastAckStatement);
+    }
+
+    private void doRollback() throws SQLException {
+        silentClosePreparedStatements();
         completions.clear();
         connection.rollback();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 387e77f..6a8b3f4 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
  */
 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
     protected static final Logger LOG = 
LoggerFactory.getLogger(XARecoveryBrokerTest.class);
-    public boolean prioritySupport = false;
+    public boolean prioritySupport = true;
 
     public void testPreparedJmxView() throws Exception {
 
@@ -712,7 +712,7 @@ public class XARecoveryBrokerTest extends 
BrokerRestartTestSupport {
 
     }
 
-    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() 
{
+    public void 
x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
         addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, 
Boolean.TRUE});
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
index 6972a14..2b498a3 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
@@ -76,19 +76,23 @@ public class JDBCCommitExceptionTest extends TestCase {
         broker.stop();
     }
 
-     protected void dumpMessages() throws Exception {
+     protected int dumpMessages() throws Exception {
+        int count = 0;
         WireFormat wireFormat = new OpenWireFormat();
         java.sql.Connection conn = ((JDBCPersistenceAdapter) 
broker.getPersistenceAdapter()).getDataSource().getConnection();
-        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG 
FROM ACTIVEMQ_MSGS");
+        PreparedStatement statement = conn.prepareStatement("SELECT ID, XID, 
MSG FROM ACTIVEMQ_MSGS");
         ResultSet result = statement.executeQuery();
         LOG.info("Messages left in broker after test");
         while(result.next()) {
             long id = result.getLong(1);
-            org.apache.activemq.command.Message message = 
(org.apache.activemq.command.Message)wireFormat.unmarshal(new 
ByteSequence(result.getBytes(2)));
-            LOG.info("id: " + id + ", message SeqId: " + 
message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+            String xid = result.getString(2);
+            org.apache.activemq.command.Message message = 
(org.apache.activemq.command.Message)wireFormat.unmarshal(new 
ByteSequence(result.getBytes(3)));
+            LOG.info("id: " + id + ", xid: " + xid + ", message SeqId: " + 
message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+            count++;
         }
         statement.close();
         conn.close();
+        return count;
     }
 
     protected int receiveMessages(int messagesExpected) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
index 046ab81..1529515 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
@@ -21,16 +21,72 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
+import javax.management.ObjectName;
+import javax.sql.DataSource;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLRecoverableException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,4 +222,1099 @@ public class JDBCXACommitExceptionTest extends 
JDBCCommitExceptionTest {
         assertEquals("one enque", 1, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
     }
 
+
+    final AtomicInteger getAutoCommitCount = new AtomicInteger();
+    private ArrayList<Integer> getAutoCommitErrors = new ArrayList<Integer>();
+    private ArrayList<Integer> executeUpdateErrorOps = new 
ArrayList<Integer>();
+    final AtomicInteger executeUpdateErrorOpsCount = new AtomicInteger();
+    private ArrayList<Integer> executeBatchErrorOps = new ArrayList<Integer>();
+    final AtomicInteger executeBatchErrorOpsCount = new AtomicInteger();
+
+    public void testXAEnqueueErrors() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+
+        broker.stop();
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        //broker.setDeleteAllMessagesOnStartup(true);
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new 
JDBCPersistenceAdapter();
+        DataSource realDataSource = jdbc.getDataSource();
+        jdbcPersistenceAdapter.setDataSource(new 
TestDataSource(realDataSource));
+        jdbcPersistenceAdapter.setUseLock(false);
+        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+        connectionUri = 
broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+
+
+        // inject error
+        executeUpdateErrorOps.add(5);
+        executeUpdateErrorOps.add(9);
+        executeUpdateErrorOps.add(12);
+
+        getAutoCommitErrors.add(59);
+        getAutoCommitErrors.add(60);
+
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+        XAConnection c = factory.createXAConnection();
+        c.start();
+        XASession s = c.createXASession();
+        final XAResource recoveryResource = s.getXAResource();
+
+        for (int i = 0; i < 10; i++) {
+            XAConnection connection = factory.createXAConnection();
+            connection.start();
+            XASession session = connection.createXASession();
+
+            Destination destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+
+            XAResource resource = session.getXAResource();
+
+            Xid tid = createXid();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            ActiveMQMessage message = (ActiveMQMessage) 
session.createMessage();
+            message.setTransactionId(new XATransactionId(tid));
+            producer.send(message);
+
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.prepare(tid);
+
+            try {
+                resource.commit(tid, false);
+            } catch (Exception expected) {
+                expected.printStackTrace();
+
+                dumpMessages();
+
+                boolean done = false;
+                while (!done) {
+                    // recover
+                    Xid[] recovered = 
recoveryResource.recover(XAResource.TMSTARTRSCAN);
+                    recoveryResource.recover(XAResource.TMNOFLAGS);
+
+                    try {
+                        recoveryResource.commit(recovered[0], false);
+                        done = true;
+                    } catch (XAException ok) {
+                        ok.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        dumpMessages();
+
+        assertEquals("en-queue", 10, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("en-queue", 10, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("enq", 10, proxy.getEnqueueCount());
+        assertEquals("curs", 10, proxy.cursorSize());
+    }
+
+    public void testNonTxEnqueueErrors() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+        executeBatchErrorOps.clear();
+        executeBatchErrorOpsCount.set(0);
+
+        broker.stop();
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new 
JDBCPersistenceAdapter();
+        DataSource realDataSource = jdbc.getDataSource();
+        jdbcPersistenceAdapter.setDataSource(new 
TestDataSource(realDataSource));
+        jdbcPersistenceAdapter.setUseLock(false);
+        jdbcPersistenceAdapter.setCleanupPeriod(0);
+        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+        connectionUri = 
broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+
+
+        executeBatchErrorOps.add(2);
+        executeBatchErrorOps.add(3);
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.add(10);
+
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+        for (int i = 0; i < 10; i++) {
+            XAConnection connection = factory.createXAConnection();
+            connection.start();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            Destination destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+            ActiveMQMessage message = (ActiveMQMessage) 
session.createMessage();
+
+            try {
+                producer.send(message);
+            } catch (Exception expected) {
+                expected.printStackTrace();
+
+                dumpMessages();
+
+                boolean done = false;
+                while (!done) {
+                    try {
+                        producer.send(message);
+                        done = true;
+                    } catch (Exception ok) {
+                        ok.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        assertEquals("messages in db", 10, dumpMessages());
+
+
+        assertEquals("en-queue", 10, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("en-queue", 10, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("enq", 10, proxy.getEnqueueCount());
+        assertEquals("curs", 10, proxy.cursorSize());
+    }
+
+    public void testNonTxEnqueueOverNetworkErrorsRestart() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+        executeBatchErrorOps.clear();
+        executeBatchErrorOpsCount.set(0);
+
+        broker.stop();
+
+        final AtomicBoolean done = new AtomicBoolean(false);
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+
+                while (!done.get()) {
+                    try {
+
+                        broker = new BrokerService();
+                        broker.setAdvisorySupport(false);
+                        PolicyMap policyMap = new PolicyMap();
+                        PolicyEntry policyEntry = new PolicyEntry();
+                        policyEntry.setUseCache(false);
+                        policyEntry.setExpireMessagesPeriod(0);
+                        policyEntry.setDeadLetterStrategy(new 
DiscardingDeadLetterStrategy());
+                        policyMap.setDefaultEntry(policyEntry);
+                        broker.setDestinationPolicy(policyMap);
+
+                        JDBCPersistenceAdapter jdbcPersistenceAdapter = new 
JDBCPersistenceAdapter();
+                        DataSource realDataSource = jdbc.getDataSource();
+                        jdbcPersistenceAdapter.setDataSource(new 
TestDataSource(realDataSource));
+                        jdbcPersistenceAdapter.setUseLock(false);
+                        jdbcPersistenceAdapter.setCleanupPeriod(0);
+                        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+                        TransportConnector transportConnector = 
broker.addConnector("tcp://localhost:61616");
+                        //transportConnector.setAuditNetworkProducers(true);
+                        connectionUri = 
transportConnector.getPublishableConnectString();
+                        DefaultIOExceptionHandler stopOnIOEx = new 
DefaultIOExceptionHandler();
+                        stopOnIOEx.setIgnoreSQLExceptions(false);
+                        stopOnIOEx.setStopStartConnectors(false);
+                        broker.setIoExceptionHandler(stopOnIOEx);
+                        broker.start();
+
+                        broker.waitUntilStopped();
+
+                    } catch (Exception oops) {
+                        oops.printStackTrace();
+                        done.set(true);
+                    }
+                }
+            }
+        };
+        thread.start();
+
+        //executeBatchErrorOps.add(5);
+        //executeBatchErrorOps.add(3);
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.add(39);
+
+
+        // network broker to push messages
+        final BrokerService other = new BrokerService();
+        other.setBrokerName("other");
+        other.setAdvisorySupport(false);
+        other.setUseJmx(false);
+        other.setPersistent(false);
+        NetworkConnector netwokConnector = 
other.addNetworkConnector("static://tcp://localhost:61616");
+        netwokConnector.setStaticBridge(true);
+        netwokConnector.setStaticallyIncludedDestinations(Arrays.asList(new 
ActiveMQDestination[]{new ActiveMQQueue("TEST")}));
+        other.start();
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://other");
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
connectionFactory.createConnection();
+        activeMQConnection.setWatchTopicAdvisories(false);
+        Session session = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        activeMQConnection.start();
+        Destination destination = session.createQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(message);
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("MESSAGES DRAINED :" + 
((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+                return 0 == 
((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount();
+            }
+        });
+        activeMQConnection.close();
+
+
+        assertEquals("db", 10, dumpMessages());
+        assertEquals("messages count", 10, 
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("curs", 10, proxy.cursorSize());
+
+        done.set(true);
+        other.stop();
+    }
+
+
+    private class TestDataSource implements javax.sql.DataSource {
+
+        private final javax.sql.DataSource realDataSource;
+
+        public TestDataSource(javax.sql.DataSource dataSource) {
+            realDataSource = dataSource;
+        }
+
+        @Override
+        public Connection getConnection() throws SQLException {
+            Connection autoCommitCheckConnection = new 
AutoCommitCheckConnection(realDataSource.getConnection());
+            return autoCommitCheckConnection;
+        }
+
+        @Override
+        public Connection getConnection(String username, String password) 
throws SQLException {
+            Connection autoCommitCheckConnection = new 
AutoCommitCheckConnection(realDataSource.getConnection(username, password));
+
+            return autoCommitCheckConnection;
+        }
+
+        @Override
+        public PrintWriter getLogWriter() throws SQLException {
+            return realDataSource.getLogWriter();
+        }
+
+        @Override
+        public void setLogWriter(PrintWriter out) throws SQLException {
+            realDataSource.setLogWriter(out);
+        }
+
+        @Override
+        public void setLoginTimeout(int seconds) throws SQLException {
+            realDataSource.setLoginTimeout(seconds);
+        }
+
+        @Override
+        public int getLoginTimeout() throws SQLException {
+            return realDataSource.getLoginTimeout();
+        }
+
+        @Override
+        public java.util.logging.Logger getParentLogger() throws 
SQLFeatureNotSupportedException {
+            return realDataSource.getParentLogger();
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return realDataSource.unwrap(iface);
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return realDataSource.isWrapperFor(iface);
+        }
+    }
+
+    private class AutoCommitCheckConnection implements Connection {
+
+        private final Connection realConnection;
+
+        public AutoCommitCheckConnection(Connection connection) {
+            this.realConnection = connection;
+        }
+
+        @Override
+        public void commit() throws SQLException {
+            realConnection.commit();
+        }
+
+        // Just plumbing for wrapper. Might have been better to do a Dynamic 
Proxy here.
+
+        @Override
+        public Statement createStatement() throws SQLException {
+            return realConnection.createStatement();
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql) throws 
SQLException {
+            //final AtomicInteger executeCount = new AtomicInteger();
+
+            final PreparedStatement delegate = 
realConnection.prepareStatement(sql);
+            return new PreparedStatement() {
+                public ResultSet executeQuery() throws SQLException {
+                    return delegate.executeQuery();
+                }
+
+                final
+                public int executeUpdate() throws SQLException {
+                    int ret = delegate.executeUpdate();
+                    if 
(executeUpdateErrorOps.contains(executeUpdateErrorOpsCount.incrementAndGet())) {
+                        throw new SQLRecoverableException("SOME executeUpdate 
ERROR[" + executeUpdateErrorOpsCount.get() +"]");
+                    }
+                    return ret;
+                }
+
+                public void setNull(int parameterIndex, int sqlType) throws 
SQLException {
+                    delegate.setNull(parameterIndex, sqlType);
+                }
+
+                public void setBoolean(int parameterIndex, boolean x) throws 
SQLException {
+                    delegate.setBoolean(parameterIndex, x);
+                }
+
+                public void setByte(int parameterIndex, byte x) throws 
SQLException {
+                    delegate.setByte(parameterIndex, x);
+                }
+
+                public void setShort(int parameterIndex, short x) throws 
SQLException {
+                    delegate.setShort(parameterIndex, x);
+                }
+
+                public void setInt(int parameterIndex, int x) throws 
SQLException {
+                    delegate.setInt(parameterIndex, x);
+                }
+
+                public void setLong(int parameterIndex, long x) throws 
SQLException {
+                    delegate.setLong(parameterIndex, x);
+                }
+
+                public void setFloat(int parameterIndex, float x) throws 
SQLException {
+                    delegate.setFloat(parameterIndex, x);
+                }
+
+                public void setDouble(int parameterIndex, double x) throws 
SQLException {
+                    delegate.setDouble(parameterIndex, x);
+                }
+
+                public void setBigDecimal(int parameterIndex, BigDecimal x) 
throws SQLException {
+                    delegate.setBigDecimal(parameterIndex, x);
+                }
+
+                public void setString(int parameterIndex, String x) throws 
SQLException {
+                    delegate.setString(parameterIndex, x);
+                }
+
+                public void setBytes(int parameterIndex, byte[] x) throws 
SQLException {
+                    delegate.setBytes(parameterIndex, x);
+                }
+
+                public void setDate(int parameterIndex, Date x) throws 
SQLException {
+                    delegate.setDate(parameterIndex, x);
+                }
+
+                public void setTime(int parameterIndex, Time x) throws 
SQLException {
+                    delegate.setTime(parameterIndex, x);
+                }
+
+                public void setTimestamp(int parameterIndex, Timestamp x) 
throws SQLException {
+                    delegate.setTimestamp(parameterIndex, x);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x, 
int length) throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x, length);
+                }
+
+                @Deprecated
+                public void setUnicodeStream(int parameterIndex, InputStream 
x, int length) throws SQLException {
+                    delegate.setUnicodeStream(parameterIndex, x, length);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x, 
int length) throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x, length);
+                }
+
+                public void clearParameters() throws SQLException {
+                    delegate.clearParameters();
+                }
+
+                public void setObject(int parameterIndex, Object x, int 
targetSqlType) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType);
+                }
+
+                public void setObject(int parameterIndex, Object x) throws 
SQLException {
+                    delegate.setObject(parameterIndex, x);
+                }
+
+                public boolean execute() throws SQLException {
+                    return delegate.execute();
+                }
+
+                public void addBatch() throws SQLException {
+                    delegate.addBatch();
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader 
reader, int length) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader, 
length);
+                }
+
+                public void setRef(int parameterIndex, Ref x) throws 
SQLException {
+                    delegate.setRef(parameterIndex, x);
+                }
+
+                public void setBlob(int parameterIndex, Blob x) throws 
SQLException {
+                    delegate.setBlob(parameterIndex, x);
+                }
+
+                public void setClob(int parameterIndex, Clob x) throws 
SQLException {
+                    delegate.setClob(parameterIndex, x);
+                }
+
+                public void setArray(int parameterIndex, Array x) throws 
SQLException {
+                    delegate.setArray(parameterIndex, x);
+                }
+
+                public ResultSetMetaData getMetaData() throws SQLException {
+                    return delegate.getMetaData();
+                }
+
+                public void setDate(int parameterIndex, Date x, Calendar cal) 
throws SQLException {
+                    delegate.setDate(parameterIndex, x, cal);
+                }
+
+                public void setTime(int parameterIndex, Time x, Calendar cal) 
throws SQLException {
+                    delegate.setTime(parameterIndex, x, cal);
+                }
+
+                public void setTimestamp(int parameterIndex, Timestamp x, 
Calendar cal) throws SQLException {
+                    delegate.setTimestamp(parameterIndex, x, cal);
+                }
+
+                public void setNull(int parameterIndex, int sqlType, String 
typeName) throws SQLException {
+                    delegate.setNull(parameterIndex, sqlType, typeName);
+                }
+
+                public void setURL(int parameterIndex, URL x) throws 
SQLException {
+                    delegate.setURL(parameterIndex, x);
+                }
+
+                public ParameterMetaData getParameterMetaData() throws 
SQLException {
+                    return delegate.getParameterMetaData();
+                }
+
+                public void setRowId(int parameterIndex, RowId x) throws 
SQLException {
+                    delegate.setRowId(parameterIndex, x);
+                }
+
+                public void setNString(int parameterIndex, String value) 
throws SQLException {
+                    delegate.setNString(parameterIndex, value);
+                }
+
+                public void setNCharacterStream(int parameterIndex, Reader 
value, long length) throws SQLException {
+                    delegate.setNCharacterStream(parameterIndex, value, 
length);
+                }
+
+                public void setNClob(int parameterIndex, NClob value) throws 
SQLException {
+                    delegate.setNClob(parameterIndex, value);
+                }
+
+                public void setClob(int parameterIndex, Reader reader, long 
length) throws SQLException {
+                    delegate.setClob(parameterIndex, reader, length);
+                }
+
+                public void setBlob(int parameterIndex, InputStream 
inputStream, long length) throws SQLException {
+                    delegate.setBlob(parameterIndex, inputStream, length);
+                }
+
+                public void setNClob(int parameterIndex, Reader reader, long 
length) throws SQLException {
+                    delegate.setNClob(parameterIndex, reader, length);
+                }
+
+                public void setSQLXML(int parameterIndex, SQLXML xmlObject) 
throws SQLException {
+                    delegate.setSQLXML(parameterIndex, xmlObject);
+                }
+
+                public void setObject(int parameterIndex,
+                                      Object x,
+                                      int targetSqlType,
+                                      int scaleOrLength) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType, 
scaleOrLength);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x, 
long length) throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x, length);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x, 
long length) throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x, length);
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader 
reader, long length) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader, 
length);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x) 
throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x) 
throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x);
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader 
reader) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader);
+                }
+
+                public void setNCharacterStream(int parameterIndex, Reader 
value) throws SQLException {
+                    delegate.setNCharacterStream(parameterIndex, value);
+                }
+
+                public void setClob(int parameterIndex, Reader reader) throws 
SQLException {
+                    delegate.setClob(parameterIndex, reader);
+                }
+
+                public void setBlob(int parameterIndex, InputStream 
inputStream) throws SQLException {
+                    delegate.setBlob(parameterIndex, inputStream);
+                }
+
+                public void setNClob(int parameterIndex, Reader reader) throws 
SQLException {
+                    delegate.setNClob(parameterIndex, reader);
+                }
+/*
+                public void setObject(int parameterIndex,
+                                      Object x,
+                                      SQLType targetSqlType,
+                                      int scaleOrLength) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType, 
scaleOrLength);
+                }
+
+                public void setObject(int parameterIndex, Object x, SQLType 
targetSqlType) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType);
+                }
+
+                public long executeLargeUpdate() throws SQLException {
+                    return delegate.executeLargeUpdate();
+                }
+*/
+                public ResultSet executeQuery(String sql) throws SQLException {
+                    return delegate.executeQuery(sql);
+                }
+
+                public int executeUpdate(String sql) throws SQLException {
+                    return delegate.executeUpdate(sql);
+                }
+
+                public void close() throws SQLException {
+                    delegate.close();
+                }
+
+                public int getMaxFieldSize() throws SQLException {
+                    return delegate.getMaxFieldSize();
+                }
+
+                public void setMaxFieldSize(int max) throws SQLException {
+                    delegate.setMaxFieldSize(max);
+                }
+
+                public int getMaxRows() throws SQLException {
+                    return delegate.getMaxRows();
+                }
+
+                public void setMaxRows(int max) throws SQLException {
+                    delegate.setMaxRows(max);
+                }
+
+                public void setEscapeProcessing(boolean enable) throws 
SQLException {
+                    delegate.setEscapeProcessing(enable);
+                }
+
+                public int getQueryTimeout() throws SQLException {
+                    return delegate.getQueryTimeout();
+                }
+
+                public void setQueryTimeout(int seconds) throws SQLException {
+                    delegate.setQueryTimeout(seconds);
+                }
+
+                public void cancel() throws SQLException {
+                    delegate.cancel();
+                }
+
+                public SQLWarning getWarnings() throws SQLException {
+                    return delegate.getWarnings();
+                }
+
+                public void clearWarnings() throws SQLException {
+                    delegate.clearWarnings();
+                }
+
+                public void setCursorName(String name) throws SQLException {
+                    delegate.setCursorName(name);
+                }
+
+                public boolean execute(String sql) throws SQLException {
+                    return delegate.execute(sql);
+                }
+
+                public ResultSet getResultSet() throws SQLException {
+                    return delegate.getResultSet();
+                }
+
+                public int getUpdateCount() throws SQLException {
+                    return delegate.getUpdateCount();
+                }
+
+                public boolean getMoreResults() throws SQLException {
+                    return delegate.getMoreResults();
+                }
+
+                public void setFetchDirection(int direction) throws 
SQLException {
+                    delegate.setFetchDirection(direction);
+                }
+
+                public int getFetchDirection() throws SQLException {
+                    return delegate.getFetchDirection();
+                }
+
+                public void setFetchSize(int rows) throws SQLException {
+                    delegate.setFetchSize(rows);
+                }
+
+                public int getFetchSize() throws SQLException {
+                    return delegate.getFetchSize();
+                }
+
+                public int getResultSetConcurrency() throws SQLException {
+                    return delegate.getResultSetConcurrency();
+                }
+
+                public int getResultSetType() throws SQLException {
+                    return delegate.getResultSetType();
+                }
+
+                public void addBatch(String sql) throws SQLException {
+                    delegate.addBatch(sql);
+                }
+
+                public void clearBatch() throws SQLException {
+                    delegate.clearBatch();
+                }
+
+                public int[] executeBatch() throws SQLException {
+                    if 
(executeBatchErrorOps.contains(executeBatchErrorOpsCount.incrementAndGet())) {
+                        throw new SQLRecoverableException("SOME executeBatch 
ERROR[" + executeBatchErrorOpsCount.get() +"]");
+                    }
+                    return delegate.executeBatch();
+                }
+
+                public Connection getConnection() throws SQLException {
+                    return delegate.getConnection();
+                }
+
+                public boolean getMoreResults(int current) throws SQLException 
{
+                    return delegate.getMoreResults(current);
+                }
+
+                public ResultSet getGeneratedKeys() throws SQLException {
+                    return delegate.getGeneratedKeys();
+                }
+
+                public int executeUpdate(String sql, int autoGeneratedKeys) 
throws SQLException {
+                    return delegate.executeUpdate(sql, autoGeneratedKeys);
+                }
+
+                public int executeUpdate(String sql, int[] columnIndexes) 
throws SQLException {
+                    return delegate.executeUpdate(sql, columnIndexes);
+                }
+
+                public int executeUpdate(String sql, String[] columnNames) 
throws SQLException {
+                    return delegate.executeUpdate(sql, columnNames);
+                }
+
+                public boolean execute(String sql, int autoGeneratedKeys) 
throws SQLException {
+                    return delegate.execute(sql, autoGeneratedKeys);
+                }
+
+                public boolean execute(String sql, int[] columnIndexes) throws 
SQLException {
+                    return delegate.execute(sql, columnIndexes);
+                }
+
+                public boolean execute(String sql, String[] columnNames) 
throws SQLException {
+                    return delegate.execute(sql, columnNames);
+                }
+
+                public int getResultSetHoldability() throws SQLException {
+                    return delegate.getResultSetHoldability();
+                }
+
+                public boolean isClosed() throws SQLException {
+                    return delegate.isClosed();
+                }
+
+                public void setPoolable(boolean poolable) throws SQLException {
+                    delegate.setPoolable(poolable);
+                }
+
+                public boolean isPoolable() throws SQLException {
+                    return delegate.isPoolable();
+                }
+
+                public void closeOnCompletion() throws SQLException {
+                    delegate.closeOnCompletion();
+                }
+
+                public boolean isCloseOnCompletion() throws SQLException {
+                    return delegate.isCloseOnCompletion();
+                }
+/*
+                public long getLargeUpdateCount() throws SQLException {
+                    return delegate.getLargeUpdateCount();
+                }
+
+                public void setLargeMaxRows(long max) throws SQLException {
+                    delegate.setLargeMaxRows(max);
+                }
+
+                public long getLargeMaxRows() throws SQLException {
+                    return delegate.getLargeMaxRows();
+                }
+
+                public long[] executeLargeBatch() throws SQLException {
+                    return delegate.executeLargeBatch();
+                }
+
+                public long executeLargeUpdate(String sql) throws SQLException 
{
+                    return delegate.executeLargeUpdate(sql);
+                }
+
+                public long executeLargeUpdate(String sql, int 
autoGeneratedKeys) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
+                }
+
+                public long executeLargeUpdate(String sql, int[] 
columnIndexes) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, columnIndexes);
+                }
+
+                public long executeLargeUpdate(String sql, String[] 
columnNames) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, columnNames);
+                }
+*/
+                public <T> T unwrap(Class<T> iface) throws SQLException {
+                    return delegate.unwrap(iface);
+                }
+
+                public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
+                    return delegate.isWrapperFor(iface);
+                }
+            };
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql) throws SQLException {
+            return realConnection.prepareCall(sql);
+        }
+
+        @Override
+        public String nativeSQL(String sql) throws SQLException {
+            return realConnection.nativeSQL(sql);
+        }
+
+        @Override
+        public void setAutoCommit(boolean autoCommit) throws SQLException {
+            realConnection.setAutoCommit(autoCommit);
+        }
+
+        @Override
+        public boolean getAutoCommit() throws SQLException {
+            if 
(getAutoCommitErrors.contains(getAutoCommitCount.incrementAndGet())) {
+                throw new SQLRecoverableException("AutoCommit[" + 
getAutoCommitCount.get() +"]");
+            }
+            return realConnection.getAutoCommit();
+        }
+
+        @Override
+        public void rollback() throws SQLException {
+            realConnection.rollback();
+        }
+
+        @Override
+        public void close() throws SQLException {
+            realConnection.close();
+        }
+
+        @Override
+        public boolean isClosed() throws SQLException {
+            return realConnection.isClosed();
+        }
+
+        @Override
+        public DatabaseMetaData getMetaData() throws SQLException {
+            return realConnection.getMetaData();
+        }
+
+        @Override
+        public void setReadOnly(boolean readOnly) throws SQLException {
+            realConnection.setReadOnly(readOnly);
+        }
+
+        @Override
+        public boolean isReadOnly() throws SQLException {
+            return realConnection.isReadOnly();
+        }
+
+        @Override
+        public void setCatalog(String catalog) throws SQLException {
+            realConnection.setCatalog(catalog);
+        }
+
+        @Override
+        public String getCatalog() throws SQLException {
+            return realConnection.getCatalog();
+        }
+
+        @Override
+        public void setTransactionIsolation(int level) throws SQLException {
+            realConnection.setTransactionIsolation(level);
+        }
+
+        @Override
+        public int getTransactionIsolation() throws SQLException {
+            return realConnection.getTransactionIsolation();
+        }
+
+        @Override
+        public SQLWarning getWarnings() throws SQLException {
+            return realConnection.getWarnings();
+        }
+
+        @Override
+        public void clearWarnings() throws SQLException {
+            realConnection.clearWarnings();
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int 
resultSetConcurrency) throws SQLException {
+            return realConnection.createStatement(resultSetType, 
resultSetConcurrency);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency) throws SQLException {
+            return realConnection.prepareStatement(sql, resultSetType, 
resultSetConcurrency);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, 
int resultSetConcurrency) throws SQLException {
+            return realConnection.prepareCall(sql, resultSetType, 
resultSetConcurrency);
+        }
+
+        @Override
+        public Map<String, Class<?>> getTypeMap() throws SQLException {
+            return realConnection.getTypeMap();
+        }
+
+        @Override
+        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+            realConnection.setTypeMap(map);
+        }
+
+        @Override
+        public void setHoldability(int holdability) throws SQLException {
+            realConnection.setHoldability(holdability);
+        }
+
+        @Override
+        public int getHoldability() throws SQLException {
+            return realConnection.getHoldability();
+        }
+
+        @Override
+        public Savepoint setSavepoint() throws SQLException {
+            return realConnection.setSavepoint();
+        }
+
+        @Override
+        public Savepoint setSavepoint(String name) throws SQLException {
+            return realConnection.setSavepoint(name);
+        }
+
+        @Override
+        public void rollback(Savepoint savepoint) throws SQLException {
+            realConnection.rollback();
+        }
+
+        @Override
+        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+            realConnection.releaseSavepoint(savepoint);
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int 
resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return realConnection.createStatement(resultSetType, 
resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int 
resultSetType, int resultSetConcurrency, int resultSetHoldability) throws 
SQLException {
+            return realConnection.prepareStatement(sql, resultSetType, 
resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, 
int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return realConnection.prepareCall(sql, resultSetType, 
resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int 
autoGeneratedKeys) throws SQLException {
+            return realConnection.prepareStatement(sql, autoGeneratedKeys);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int[] 
columnIndexes) throws SQLException {
+            return realConnection.prepareStatement(sql, columnIndexes);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, String[] 
columnNames) throws SQLException {
+            return realConnection.prepareStatement(sql, columnNames);
+        }
+
+        @Override
+        public Clob createClob() throws SQLException {
+            return realConnection.createClob();
+        }
+
+        @Override
+        public Blob createBlob() throws SQLException {
+            return realConnection.createBlob();
+        }
+
+        @Override
+        public NClob createNClob() throws SQLException {
+            return realConnection.createNClob();
+        }
+
+        @Override
+        public SQLXML createSQLXML() throws SQLException {
+            return realConnection.createSQLXML();
+        }
+
+        @Override
+        public boolean isValid(int timeout) throws SQLException {
+            return realConnection.isValid(timeout);
+        }
+
+        @Override
+        public void setClientInfo(String name, String value) throws 
SQLClientInfoException {
+            realConnection.setClientInfo(name, value);
+        }
+
+        @Override
+        public void setClientInfo(Properties properties) throws 
SQLClientInfoException {
+            realConnection.setClientInfo(properties);
+        }
+
+        @Override
+        public String getClientInfo(String name) throws SQLException {
+            return realConnection.getClientInfo(name);
+        }
+
+        @Override
+        public Properties getClientInfo() throws SQLException {
+            return realConnection.getClientInfo();
+        }
+
+        @Override
+        public Array createArrayOf(String typeName, Object[] elements) throws 
SQLException {
+            return realConnection.createArrayOf(typeName, elements);
+        }
+
+        @Override
+        public Struct createStruct(String typeName, Object[] attributes) 
throws SQLException {
+            return realConnection.createStruct(typeName, attributes);
+        }
+
+        @Override
+        public void setSchema(String schema) throws SQLException {
+            realConnection.setSchema(schema);
+        }
+
+        @Override
+        public String getSchema() throws SQLException {
+            return realConnection.getSchema();
+        }
+
+        @Override
+        public void abort(Executor executor) throws SQLException {
+            realConnection.abort(executor);
+        }
+
+        @Override
+        public void setNetworkTimeout(Executor executor, int milliseconds) 
throws SQLException {
+            realConnection.setNetworkTimeout(executor, milliseconds);
+        }
+
+        @Override
+        public int getNetworkTimeout() throws SQLException {
+            return realConnection.getNetworkTimeout();
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return realConnection.unwrap(iface);
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return realConnection.isWrapperFor(iface);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 8da0ff6..ee3a88c 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -318,6 +318,7 @@ public class XACompletionTest extends TestSupport {
         resource.recover(XAResource.TMSTARTRSCAN);
         resource.recover(XAResource.TMNOFLAGS);
 
+        dumpMessages();
         Xid tid = createXid();
 
         resource.start(tid, XAResource.TMNOFLAGS);
@@ -342,6 +343,9 @@ public class XACompletionTest extends TestSupport {
 
         consumer.close();
 
+        LOG.info("after close");
+        dumpMessages();
+
         assertEquals("drain", 5, drainUnack(5, "TEST"));
 
         dumpMessages();

Reply via email to