Repository: activemq Updated Branches: refs/heads/master 314d5a516 -> bd45d931b
[AMQ-6906] tidy up cleanup on jdbc error and combine updates in single completion to avoid prepared sequence update on non transacted add with error. More jdbc error related tests Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd45d931 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd45d931 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd45d931 Branch: refs/heads/master Commit: bd45d931ba273be4d94bf213c6befd116f99dcc8 Parents: 314d5a5 Author: gtully <[email protected]> Authored: Thu May 3 11:32:21 2018 +0100 Committer: gtully <[email protected]> Committed: Thu May 3 11:32:21 2018 +0100 ---------------------------------------------------------------------- .../store/jdbc/JDBCPersistenceAdapter.java | 6 +- .../store/jdbc/JdbcMemoryTransactionStore.java | 16 +- .../activemq/store/jdbc/TransactionContext.java | 73 +- .../activemq/broker/XARecoveryBrokerTest.java | 4 +- .../store/jdbc/JDBCCommitExceptionTest.java | 12 +- .../store/jdbc/JDBCXACommitExceptionTest.java | 1151 ++++++++++++++++++ .../activemq/store/jdbc/XACompletionTest.java | 4 + 7 files changed, 1213 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 102dec5..95b1446 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -785,12 +785,10 @@ public class JDBCPersistenceAdapter 
extends DataSourceServiceSupport implements } } - public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException { + public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException { TransactionContext c = getTransactionContext(context); try { - long sequence = (Long)messageId.getEntryLocator(); - getAdapter().doCommitAddOp(c, preparedSequenceId, sequence); - messageId.setEntryLocator(preparedSequenceId); + getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ", e); throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e); http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java index 6df0860..4bbe43d 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java @@ -78,11 +78,12 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { cmd.run(ctx); } + persistenceAdapter.commitTransaction(ctx); + } catch ( IOException e ) { persistenceAdapter.rollbackTransaction(ctx); throw e; } - persistenceAdapter.commitTransaction(ctx); ctx.setXid(null); // setup for commit outcome @@ -126,13 +127,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator(); TransactionContext c = 
jdbcPersistenceAdapter.getTransactionContext(context); + long newSequence; synchronized (jdbcMessageStore.pendingAdditions) { - message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId()); - + newSequence = jdbcPersistenceAdapter.getNextSequenceId(); + final long sequenceToSet = newSequence; c.onCompletion(new Runnable() { @Override public void run() { - message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator()); + message.getMessageId().setEntryLocator(sequenceToSet); + message.getMessageId().setFutureOrSequenceLong(sequenceToSet); } }); @@ -141,7 +144,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { } } - jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence); + jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence); jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority()); } @@ -175,8 +178,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore { ((LastAckCommand)removeMessageCommand).rollback(ctx); } else { MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId(); + long sequence = (Long)messageId.getEntryLocator(); // need to unset the txid flag on the existing row - ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator()); + ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence); if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) { ((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand) removeMessageCommand).getMessage()); http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java ---------------------------------------------------------------------- diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java index ab3bef8..db2aace 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java @@ -84,7 +84,7 @@ public class TransactionContext { } catch (IllegalMonitorStateException oops) { LOG.error("Thread does not hold the context lock on close of:" + connection, oops); } - close(); + silentClose(); IOException ioe = IOExceptionSupport.create(e); if (persistenceAdapter.getBrokerService() != null) { persistenceAdapter.getBrokerService().handleIOException(ioe); @@ -137,45 +137,39 @@ public class TransactionContext { } finally { try { p.close(); - } catch (Throwable e) { - } + } catch (Throwable ignored) {} } } + private void silentClose() { + silentClosePreparedStatements(); + if (connection != null) { + try { + connection.close(); + } catch (Throwable ignored) {} + connection = null; + } + } + + public void close() throws IOException { if (!inTx) { try { - - /** - * we are not in a transaction so should not be committing ?? - * This was previously commented out - but had adverse affects - * on testing - so it's back! 
- * - */ - try { + // can be null for topic ops that bypass the store via existing cursor state + if (connection != null) { + final boolean needsCommit = !connection.getAutoCommit(); executeBatch(); - } finally { - if (connection != null && !connection.getAutoCommit()) { + if (needsCommit) { connection.commit(); } } - } catch (SQLException e) { JDBCPersistenceAdapter.log("Error while closing connection: ", e); IOException ioe = IOExceptionSupport.create(e); persistenceAdapter.getBrokerService().handleIOException(ioe); throw ioe; } finally { - try { - if (connection != null) { - connection.close(); - } - } catch (Throwable e) { - // ignore - LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e); - } finally { - connection = null; - } + silentClose(); for (Runnable completion: completions) { completion.run(); } @@ -197,8 +191,9 @@ public class TransactionContext { throw new IOException("Not started."); } try { + final boolean needsCommit = !connection.getAutoCommit(); executeBatch(); - if (!connection.getAutoCommit()) { + if (needsCommit) { connection.commit(); } } catch (SQLException e) { @@ -230,19 +225,23 @@ public class TransactionContext { } } - private void doRollback() throws SQLException { - if (addMessageStatement != null) { - addMessageStatement.close(); - addMessageStatement = null; - } - if (removedMessageStatement != null) { - removedMessageStatement.close(); - removedMessageStatement = null; - } - if (updateLastAckStatement != null) { - updateLastAckStatement.close(); - updateLastAckStatement = null; + private PreparedStatement silentClosePreparedStatement(PreparedStatement preparedStatement) { + if (preparedStatement != null) { + try { + preparedStatement.close(); + } catch (Throwable ignored) {} } + return null; + } + + private void silentClosePreparedStatements() { + addMessageStatement = silentClosePreparedStatement(addMessageStatement); + removedMessageStatement = 
silentClosePreparedStatement(removedMessageStatement); + updateLastAckStatement = silentClosePreparedStatement(updateLastAckStatement); + } + + private void doRollback() throws SQLException { + silentClosePreparedStatements(); completions.clear(); connection.rollback(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java index 387e77f..6a8b3f4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; */ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class); - public boolean prioritySupport = false; + public boolean prioritySupport = true; public void testPreparedJmxView() throws Exception { @@ -712,7 +712,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport { } - public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { + public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); } http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java index 6972a14..2b498a3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java @@ -76,19 +76,23 @@ public class JDBCCommitExceptionTest extends TestCase { broker.stop(); } - protected void dumpMessages() throws Exception { + protected int dumpMessages() throws Exception { + int count = 0; WireFormat wireFormat = new OpenWireFormat(); java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection(); - PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS"); + PreparedStatement statement = conn.prepareStatement("SELECT ID, XID, MSG FROM ACTIVEMQ_MSGS"); ResultSet result = statement.executeQuery(); LOG.info("Messages left in broker after test"); while(result.next()) { long id = result.getLong(1); - org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2))); - LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); + String xid = result.getString(2); + org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(3))); + LOG.info("id: " + id + ", xid: " + xid + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message); + count++; } statement.close(); conn.close(); + return count; } protected int receiveMessages(int messagesExpected) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java ---------------------------------------------------------------------- diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java index 046ab81..1529515 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java @@ -21,16 +21,72 @@ import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Session; import javax.jms.XAConnection; import javax.jms.XASession; +import javax.management.ObjectName; +import javax.sql.DataSource; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.io.InputStream; +import java.io.PrintWriter; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLRecoverableException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,4 +222,1099 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest { assertEquals("one enque", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); } + + final AtomicInteger getAutoCommitCount = new AtomicInteger(); + private ArrayList<Integer> getAutoCommitErrors = new ArrayList<Integer>(); + private ArrayList<Integer> executeUpdateErrorOps = new ArrayList<Integer>(); + final AtomicInteger executeUpdateErrorOpsCount = new AtomicInteger(); + private ArrayList<Integer> executeBatchErrorOps = new ArrayList<Integer>(); + final AtomicInteger executeBatchErrorOpsCount = new AtomicInteger(); + + public void testXAEnqueueErrors() throws Exception { + getAutoCommitCount.set(0); + getAutoCommitErrors.clear(); + executeUpdateErrorOpsCount.set(0); + executeUpdateErrorOps.clear(); + + broker.stop(); + broker = new 
BrokerService(); + broker.setAdvisorySupport(false); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + + //broker.setDeleteAllMessagesOnStartup(true); + + JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + DataSource realDataSource = jdbc.getDataSource(); + jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource)); + jdbcPersistenceAdapter.setUseLock(false); + broker.setPersistenceAdapter(jdbcPersistenceAdapter); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.start(); + + + // inject error + executeUpdateErrorOps.add(5); + executeUpdateErrorOps.add(9); + executeUpdateErrorOps.add(12); + + getAutoCommitErrors.add(59); + getAutoCommitErrors.add(60); + + + factory = new ActiveMQXAConnectionFactory(connectionUri); + + XAConnection c = factory.createXAConnection(); + c.start(); + XASession s = c.createXASession(); + final XAResource recoveryResource = s.getXAResource(); + + for (int i = 0; i < 10; i++) { + XAConnection connection = factory.createXAConnection(); + connection.start(); + XASession session = connection.createXASession(); + + Destination destination = session.createQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + XAResource resource = session.getXAResource(); + + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + ActiveMQMessage message = (ActiveMQMessage) session.createMessage(); + message.setTransactionId(new XATransactionId(tid)); + producer.send(message); + + resource.end(tid, XAResource.TMSUCCESS); + resource.prepare(tid); + + try { + resource.commit(tid, false); + } catch (Exception expected) { + expected.printStackTrace(); + + dumpMessages(); + + boolean done = false; + while (!done) { + // recover + Xid[] recovered = 
recoveryResource.recover(XAResource.TMSTARTRSCAN); + recoveryResource.recover(XAResource.TMNOFLAGS); + + try { + recoveryResource.commit(recovered[0], false); + done = true; + } catch (XAException ok) { + ok.printStackTrace(); + } + } + } + } + + dumpMessages(); + + assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + + + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + + assertEquals("qs", 10, proxy.getQueueSize()); + assertEquals("enq", 10, proxy.getEnqueueCount()); + assertEquals("curs", 10, proxy.cursorSize()); + } + + public void testNonTxEnqueueErrors() throws Exception { + getAutoCommitCount.set(0); + getAutoCommitErrors.clear(); + executeUpdateErrorOpsCount.set(0); + executeUpdateErrorOps.clear(); + executeBatchErrorOps.clear(); + executeBatchErrorOpsCount.set(0); + + broker.stop(); + broker = new BrokerService(); + broker.setAdvisorySupport(false); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + + + JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + DataSource realDataSource = jdbc.getDataSource(); + jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource)); + jdbcPersistenceAdapter.setUseLock(false); + jdbcPersistenceAdapter.setCleanupPeriod(0); + broker.setPersistenceAdapter(jdbcPersistenceAdapter); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.start(); + + + 
executeBatchErrorOps.add(2); + executeBatchErrorOps.add(3); + getAutoCommitCount.set(0); + getAutoCommitErrors.add(10); + + + factory = new ActiveMQXAConnectionFactory(connectionUri); + + for (int i = 0; i < 10; i++) { + XAConnection connection = factory.createXAConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination = session.createQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + ActiveMQMessage message = (ActiveMQMessage) session.createMessage(); + + try { + producer.send(message); + } catch (Exception expected) { + expected.printStackTrace(); + + dumpMessages(); + + boolean done = false; + while (!done) { + try { + producer.send(message); + done = true; + } catch (Exception ok) { + ok.printStackTrace(); + } + } + } + } + + assertEquals("messages in db", 10, dumpMessages()); + + + assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + + + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + + assertEquals("qs", 10, proxy.getQueueSize()); + assertEquals("enq", 10, proxy.getEnqueueCount()); + assertEquals("curs", 10, proxy.cursorSize()); + } + + public void testNonTxEnqueueOverNetworkErrorsRestart() throws Exception { + getAutoCommitCount.set(0); + getAutoCommitErrors.clear(); + executeUpdateErrorOpsCount.set(0); + executeUpdateErrorOps.clear(); + executeBatchErrorOps.clear(); + executeBatchErrorOpsCount.set(0); + + broker.stop(); + + final AtomicBoolean done = new AtomicBoolean(false); + Thread thread = new Thread() { + 
@Override + public void run() { + + while (!done.get()) { + try { + + broker = new BrokerService(); + broker.setAdvisorySupport(false); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(false); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setDeadLetterStrategy(new DiscardingDeadLetterStrategy()); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + + JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + DataSource realDataSource = jdbc.getDataSource(); + jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource)); + jdbcPersistenceAdapter.setUseLock(false); + jdbcPersistenceAdapter.setCleanupPeriod(0); + broker.setPersistenceAdapter(jdbcPersistenceAdapter); + TransportConnector transportConnector = broker.addConnector("tcp://localhost:61616"); + //transportConnector.setAuditNetworkProducers(true); + connectionUri = transportConnector.getPublishableConnectString(); + DefaultIOExceptionHandler stopOnIOEx = new DefaultIOExceptionHandler(); + stopOnIOEx.setIgnoreSQLExceptions(false); + stopOnIOEx.setStopStartConnectors(false); + broker.setIoExceptionHandler(stopOnIOEx); + broker.start(); + + broker.waitUntilStopped(); + + } catch (Exception oops) { + oops.printStackTrace(); + done.set(true); + } + } + } + }; + thread.start(); + + //executeBatchErrorOps.add(5); + //executeBatchErrorOps.add(3); + getAutoCommitCount.set(0); + getAutoCommitErrors.add(39); + + + // network broker to push messages + final BrokerService other = new BrokerService(); + other.setBrokerName("other"); + other.setAdvisorySupport(false); + other.setUseJmx(false); + other.setPersistent(false); + NetworkConnector netwokConnector = other.addNetworkConnector("static://tcp://localhost:61616"); + netwokConnector.setStaticBridge(true); + netwokConnector.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("TEST")})); + 
other.start(); + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://other"); + ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); + activeMQConnection.setWatchTopicAdvisories(false); + Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + activeMQConnection.start(); + Destination destination = session.createQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + ActiveMQMessage message = (ActiveMQMessage) session.createMessage(); + + for (int i = 0; i < 10; i++) { + producer.send(message); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("MESSAGES DRAINED :" + ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + return 0 == ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount(); + } + }); + activeMQConnection.close(); + + + assertEquals("db", 10, dumpMessages()); + assertEquals("messages count", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount()); + + + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + + assertEquals("qs", 10, proxy.getQueueSize()); + assertEquals("curs", 10, proxy.cursorSize()); + + done.set(true); + other.stop(); + } + + + private class TestDataSource implements javax.sql.DataSource { + + private final javax.sql.DataSource realDataSource; + + public TestDataSource(javax.sql.DataSource dataSource) { + realDataSource = dataSource; + } + + @Override + public Connection getConnection() throws SQLException { + Connection autoCommitCheckConnection = new 
AutoCommitCheckConnection(realDataSource.getConnection()); + return autoCommitCheckConnection; + } + + @Override + public Connection getConnection(String username, String password) throws SQLException { + Connection autoCommitCheckConnection = new AutoCommitCheckConnection(realDataSource.getConnection(username, password)); + + return autoCommitCheckConnection; + } + + @Override + public PrintWriter getLogWriter() throws SQLException { + return realDataSource.getLogWriter(); + } + + @Override + public void setLogWriter(PrintWriter out) throws SQLException { + realDataSource.setLogWriter(out); + } + + @Override + public void setLoginTimeout(int seconds) throws SQLException { + realDataSource.setLoginTimeout(seconds); + } + + @Override + public int getLoginTimeout() throws SQLException { + return realDataSource.getLoginTimeout(); + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return realDataSource.getParentLogger(); + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return realDataSource.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return realDataSource.isWrapperFor(iface); + } + } + + private class AutoCommitCheckConnection implements Connection { + + private final Connection realConnection; + + public AutoCommitCheckConnection(Connection connection) { + this.realConnection = connection; + } + + @Override + public void commit() throws SQLException { + realConnection.commit(); + } + + // Just plumbing for wrapper. Might have been better to do a Dynamic Proxy here. 
+ + @Override + public Statement createStatement() throws SQLException { + return realConnection.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + //final AtomicInteger executeCount = new AtomicInteger(); + + final PreparedStatement delegate = realConnection.prepareStatement(sql); + return new PreparedStatement() { + public ResultSet executeQuery() throws SQLException { + return delegate.executeQuery(); + } + + final + public int executeUpdate() throws SQLException { + int ret = delegate.executeUpdate(); + if (executeUpdateErrorOps.contains(executeUpdateErrorOpsCount.incrementAndGet())) { + throw new SQLRecoverableException("SOME executeUpdate ERROR[" + executeUpdateErrorOpsCount.get() +"]"); + } + return ret; + } + + public void setNull(int parameterIndex, int sqlType) throws SQLException { + delegate.setNull(parameterIndex, sqlType); + } + + public void setBoolean(int parameterIndex, boolean x) throws SQLException { + delegate.setBoolean(parameterIndex, x); + } + + public void setByte(int parameterIndex, byte x) throws SQLException { + delegate.setByte(parameterIndex, x); + } + + public void setShort(int parameterIndex, short x) throws SQLException { + delegate.setShort(parameterIndex, x); + } + + public void setInt(int parameterIndex, int x) throws SQLException { + delegate.setInt(parameterIndex, x); + } + + public void setLong(int parameterIndex, long x) throws SQLException { + delegate.setLong(parameterIndex, x); + } + + public void setFloat(int parameterIndex, float x) throws SQLException { + delegate.setFloat(parameterIndex, x); + } + + public void setDouble(int parameterIndex, double x) throws SQLException { + delegate.setDouble(parameterIndex, x); + } + + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + delegate.setBigDecimal(parameterIndex, x); + } + + public void setString(int parameterIndex, String x) throws SQLException { + 
delegate.setString(parameterIndex, x); + } + + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + delegate.setBytes(parameterIndex, x); + } + + public void setDate(int parameterIndex, Date x) throws SQLException { + delegate.setDate(parameterIndex, x); + } + + public void setTime(int parameterIndex, Time x) throws SQLException { + delegate.setTime(parameterIndex, x); + } + + public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + delegate.setTimestamp(parameterIndex, x); + } + + public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setAsciiStream(parameterIndex, x, length); + } + + @Deprecated + public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setUnicodeStream(parameterIndex, x, length); + } + + public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + delegate.setBinaryStream(parameterIndex, x, length); + } + + public void clearParameters() throws SQLException { + delegate.clearParameters(); + } + + public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType); + } + + public void setObject(int parameterIndex, Object x) throws SQLException { + delegate.setObject(parameterIndex, x); + } + + public boolean execute() throws SQLException { + return delegate.execute(); + } + + public void addBatch() throws SQLException { + delegate.addBatch(); + } + + public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader, length); + } + + public void setRef(int parameterIndex, Ref x) throws SQLException { + delegate.setRef(parameterIndex, x); + } + + public void setBlob(int parameterIndex, Blob x) throws SQLException { + delegate.setBlob(parameterIndex, x); + } + + public void setClob(int parameterIndex, 
Clob x) throws SQLException { + delegate.setClob(parameterIndex, x); + } + + /* Pass-through section: each method on this line calls the same-named method on the wrapped delegate with the same arguments and returns whatever it returns. */ public void setArray(int parameterIndex, Array x) throws SQLException { + delegate.setArray(parameterIndex, x); + } + + public ResultSetMetaData getMetaData() throws SQLException { + return delegate.getMetaData(); + } + + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + delegate.setDate(parameterIndex, x, cal); + } + + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + delegate.setTime(parameterIndex, x, cal); + } + + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + delegate.setTimestamp(parameterIndex, x, cal); + } + + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + delegate.setNull(parameterIndex, sqlType, typeName); + } + + public void setURL(int parameterIndex, URL x) throws SQLException { + delegate.setURL(parameterIndex, x); + } + + public ParameterMetaData getParameterMetaData() throws SQLException { + return delegate.getParameterMetaData(); + } + + public void setRowId(int parameterIndex, RowId x) throws SQLException { + delegate.setRowId(parameterIndex, x); + } + + public void setNString(int parameterIndex, String value) throws SQLException { + delegate.setNString(parameterIndex, value); + } + + public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + delegate.setNCharacterStream(parameterIndex, value, length); + } + + public void setNClob(int parameterIndex, NClob value) throws SQLException { + delegate.setNClob(parameterIndex, value); + } + + public void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setClob(parameterIndex, reader, length); + } + + public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + delegate.setBlob(parameterIndex, inputStream, length); + } + + public void 
setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setNClob(parameterIndex, reader, length); + } + + /* The live methods on this line forward unchanged to the wrapped delegate; the region opened by the comment marker at the end of the line is disabled code, not part of the class. */ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + delegate.setSQLXML(parameterIndex, xmlObject); + } + + public void setObject(int parameterIndex, + Object x, + int targetSqlType, + int scaleOrLength) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength); + } + + public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + delegate.setAsciiStream(parameterIndex, x, length); + } + + public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + delegate.setBinaryStream(parameterIndex, x, length); + } + + public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader, length); + } + + public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + delegate.setAsciiStream(parameterIndex, x); + } + + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + delegate.setBinaryStream(parameterIndex, x); + } + + public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + delegate.setCharacterStream(parameterIndex, reader); + } + + public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + delegate.setNCharacterStream(parameterIndex, value); + } + + public void setClob(int parameterIndex, Reader reader) throws SQLException { + delegate.setClob(parameterIndex, reader); + } + + public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + delegate.setBlob(parameterIndex, inputStream); + } + + public void setNClob(int parameterIndex, Reader reader) throws SQLException { + delegate.setNClob(parameterIndex, reader); + } +/* + public void setObject(int parameterIndex, + 
Object x, + SQLType targetSqlType, + int scaleOrLength) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength); + } + + public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException { + delegate.setObject(parameterIndex, x, targetSqlType); + } + + public long executeLargeUpdate() throws SQLException { + return delegate.executeLargeUpdate(); + } +*/ + /* Everything up to the comment terminator just before this point is disabled code (SQLType overloads and executeLargeUpdate) - presumably left out for older JDBC levels, TODO confirm. The live methods that follow on this line forward unchanged to the wrapped delegate. */ public ResultSet executeQuery(String sql) throws SQLException { + return delegate.executeQuery(sql); + } + + public int executeUpdate(String sql) throws SQLException { + return delegate.executeUpdate(sql); + } + + public void close() throws SQLException { + delegate.close(); + } + + public int getMaxFieldSize() throws SQLException { + return delegate.getMaxFieldSize(); + } + + public void setMaxFieldSize(int max) throws SQLException { + delegate.setMaxFieldSize(max); + } + + public int getMaxRows() throws SQLException { + return delegate.getMaxRows(); + } + + public void setMaxRows(int max) throws SQLException { + delegate.setMaxRows(max); + } + + public void setEscapeProcessing(boolean enable) throws SQLException { + delegate.setEscapeProcessing(enable); + } + + public int getQueryTimeout() throws SQLException { + return delegate.getQueryTimeout(); + } + + public void setQueryTimeout(int seconds) throws SQLException { + delegate.setQueryTimeout(seconds); + } + + public void cancel() throws SQLException { + delegate.cancel(); + } + + public SQLWarning getWarnings() throws SQLException { + return delegate.getWarnings(); + } + + public void clearWarnings() throws SQLException { + delegate.clearWarnings(); + } + + public void setCursorName(String name) throws SQLException { + delegate.setCursorName(name); + } + + public boolean execute(String sql) throws SQLException { + return delegate.execute(sql); + } + + public ResultSet getResultSet() throws SQLException { + return delegate.getResultSet(); + } + + public int getUpdateCount() throws SQLException { + 
return delegate.getUpdateCount(); + } + + public boolean getMoreResults() throws SQLException { + return delegate.getMoreResults(); + } + + public void setFetchDirection(int direction) throws SQLException { + delegate.setFetchDirection(direction); + } + + public int getFetchDirection() throws SQLException { + return delegate.getFetchDirection(); + } + + public void setFetchSize(int rows) throws SQLException { + delegate.setFetchSize(rows); + } + + public int getFetchSize() throws SQLException { + return delegate.getFetchSize(); + } + + public int getResultSetConcurrency() throws SQLException { + return delegate.getResultSetConcurrency(); + } + + public int getResultSetType() throws SQLException { + return delegate.getResultSetType(); + } + + public void addBatch(String sql) throws SQLException { + delegate.addBatch(sql); + } + + public void clearBatch() throws SQLException { + delegate.clearBatch(); + } + + /* Fault-injection point: every call increments executeBatchErrorOpsCount; if the new count is contained in executeBatchErrorOps the call throws SQLRecoverableException and never reaches the delegate, otherwise the batch is executed on the delegate. NOTE(review): the message re-reads the counter, so with concurrent callers it may report a later count than the one that triggered the error - confirm whether that matters. */ public int[] executeBatch() throws SQLException { + if (executeBatchErrorOps.contains(executeBatchErrorOpsCount.incrementAndGet())) { + throw new SQLRecoverableException("SOME executeBatch ERROR[" + executeBatchErrorOpsCount.get() +"]"); + } + return delegate.executeBatch(); + } + + public Connection getConnection() throws SQLException { + return delegate.getConnection(); + } + + public boolean getMoreResults(int current) throws SQLException { + return delegate.getMoreResults(current); + } + + public ResultSet getGeneratedKeys() throws SQLException { + return delegate.getGeneratedKeys(); + } + + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return delegate.executeUpdate(sql, autoGeneratedKeys); + } + + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return delegate.executeUpdate(sql, columnIndexes); + } + + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return delegate.executeUpdate(sql, columnNames); + } + + public boolean execute(String sql, int 
autoGeneratedKeys) throws SQLException { + return delegate.execute(sql, autoGeneratedKeys); + } + + /* The live methods on this line forward unchanged to the wrapped delegate; the commented-out region further along (the getLarge and executeLarge family) is disabled code. */ public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return delegate.execute(sql, columnIndexes); + } + + public boolean execute(String sql, String[] columnNames) throws SQLException { + return delegate.execute(sql, columnNames); + } + + public int getResultSetHoldability() throws SQLException { + return delegate.getResultSetHoldability(); + } + + public boolean isClosed() throws SQLException { + return delegate.isClosed(); + } + + public void setPoolable(boolean poolable) throws SQLException { + delegate.setPoolable(poolable); + } + + public boolean isPoolable() throws SQLException { + return delegate.isPoolable(); + } + + public void closeOnCompletion() throws SQLException { + delegate.closeOnCompletion(); + } + + public boolean isCloseOnCompletion() throws SQLException { + return delegate.isCloseOnCompletion(); + } +/* + public long getLargeUpdateCount() throws SQLException { + return delegate.getLargeUpdateCount(); + } + + public void setLargeMaxRows(long max) throws SQLException { + delegate.setLargeMaxRows(max); + } + + public long getLargeMaxRows() throws SQLException { + return delegate.getLargeMaxRows(); + } + + public long[] executeLargeBatch() throws SQLException { + return delegate.executeLargeBatch(); + } + + public long executeLargeUpdate(String sql) throws SQLException { + return delegate.executeLargeUpdate(sql); + } + + public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return delegate.executeLargeUpdate(sql, autoGeneratedKeys); + } + + public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException { + return delegate.executeLargeUpdate(sql, columnIndexes); + } + + public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException { + return delegate.executeLargeUpdate(sql, columnNames); + } +*/ + public <T> T unwrap(Class<T> iface) throws 
SQLException { + return delegate.unwrap(iface); + } + + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return delegate.isWrapperFor(iface); + } + }; + } + + /* From here the methods belong to the connection wrapper: unless noted, each forwards unchanged to realConnection. */ @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return realConnection.prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return realConnection.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + realConnection.setAutoCommit(autoCommit); + } + + /* Fault-injection point: every call increments getAutoCommitCount; if the new count is contained in getAutoCommitErrors the call throws SQLRecoverableException instead of asking realConnection. NOTE(review): the message re-reads the counter, so with concurrent callers it may report a later count than the one that triggered the error. */ @Override + public boolean getAutoCommit() throws SQLException { + if (getAutoCommitErrors.contains(getAutoCommitCount.incrementAndGet())) { + throw new SQLRecoverableException("AutoCommit[" + getAutoCommitCount.get() +"]"); + } + return realConnection.getAutoCommit(); + } + + @Override + public void rollback() throws SQLException { + realConnection.rollback(); + } + + @Override + public void close() throws SQLException { + realConnection.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return realConnection.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return realConnection.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + realConnection.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return realConnection.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + realConnection.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return realConnection.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + realConnection.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return realConnection.getTransactionIsolation(); + } + + @Override + public SQLWarning 
getWarnings() throws SQLException { + return realConnection.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + realConnection.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return realConnection.getTypeMap(); + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + realConnection.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + realConnection.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return realConnection.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return realConnection.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return realConnection.setSavepoint(name); + } + + /* Partial rollback: the savepoint must be handed on to realConnection so that only the work done after it is undone; calling the no-argument rollback() here would discard the whole transaction. */ @Override + public void rollback(Savepoint savepoint) throws SQLException { + realConnection.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + realConnection.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return 
realConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + /* The methods on this line forward unchanged to realConnection and return its objects as-is. NOTE(review): statements obtained through these prepareStatement overloads are therefore the real connection's own statements, not the delegating statement whose anonymous class closes earlier in this file - confirm that is intended. */ @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return realConnection.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return realConnection.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return realConnection.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return realConnection.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return realConnection.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return realConnection.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return realConnection.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return realConnection.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + realConnection.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + 
realConnection.setClientInfo(properties); + } + + /* Remaining connection methods: each forwards unchanged to realConnection; the two closing braces after isWrapperFor end the enclosing classes. */ @Override + public String getClientInfo(String name) throws SQLException { + return realConnection.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return realConnection.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return realConnection.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return realConnection.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + realConnection.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return realConnection.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + realConnection.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + realConnection.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return realConnection.getNetworkTimeout(); + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return realConnection.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return realConnection.isWrapperFor(iface); + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java index 8da0ff6..ee3a88c 100644 --- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java @@ -318,6 +318,7 @@ public class XACompletionTest extends TestSupport { resource.recover(XAResource.TMSTARTRSCAN); resource.recover(XAResource.TMNOFLAGS); + dumpMessages(); Xid tid = createXid(); resource.start(tid, XAResource.TMNOFLAGS); @@ -342,6 +343,9 @@ public class XACompletionTest extends TestSupport { consumer.close(); + LOG.info("after close"); + dumpMessages(); + assertEquals("drain", 5, drainUnack(5, "TEST")); dumpMessages();
