Updated Branches: refs/heads/trunk 6509b1f81 -> f69cbd8ec
AMQ-4950: java.lang.ClassCastException: org.apache.activemq.command.ExceptionResponse cannot be cast to org.apache.activemq.command.IntegerResponse, attempting to automatically reconnect Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f69cbd8e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f69cbd8e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f69cbd8e Branch: refs/heads/trunk Commit: f69cbd8ec6ec7cfa78a83892d32c7bb3bbd3a7d1 Parents: 6509b1f Author: Torsten Mielke <[email protected]> Authored: Fri Dec 20 14:41:02 2013 +0100 Committer: Torsten Mielke <[email protected]> Committed: Mon Jan 6 13:43:26 2014 +0100 ---------------------------------------------------------------------- .../activemq/state/ConnectionStateTracker.java | 10 +- .../org/apache/activemq/bugs/AMQ4950Test.java | 182 +++++++++++++++++++ 2 files changed, 188 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f69cbd8e/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 8fbf81b..5e05a48 100755 --- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -111,10 +111,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } public void onResponse(Command command) { - IntegerResponse response = (IntegerResponse) command; - if (XAResource.XA_RDONLY == response.getResult()) { - // all done, no commit or rollback from TM - super.onResponse(command); + if (command instanceof IntegerResponse) { + IntegerResponse response = (IntegerResponse) command; + if (XAResource.XA_RDONLY == response.getResult()) { + // all done, no commit or rollback from TM + super.onResponse(command); + } } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f69cbd8e/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java new file mode 100644 index 0000000..d7d14bd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java @@ -0,0 +1,182 @@ +/** + * + */ +package org.apache.activemq.bugs; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.XASession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.ActiveMQXAConnection; +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerRestartTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.transport.failover.FailoverTransport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test for AMQ-4950. + * Simulates an error during XA prepare call. + */ +public class AMQ4950Test extends BrokerRestartTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class); + protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare()."; + public boolean prioritySupport = false; + protected String connectionUri = null; + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + broker.setDestinationPolicy(policyMap); + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(false); + connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); + broker.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + + @Override + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { + getNext().prepareTransaction(context, xid); + LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception."); + throw new XAException(simulatedExceptionMessage); + } + + @Override + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + LOG.debug("BrokerPlugin.commitTransaction()."); + super.commitTransaction(context, xid, onePhase); + } + } + }); + } + + /** + * Creates XA transaction and invokes XA prepare(). + * Due to registered BrokerFilter prepare will be handled by broker + * but then throw an exception. + * Prior to fixing AMQ-4950, this resulted in a ClassCastException + * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse() + * causing the failover transport to reconnect and replay the XA prepare(). + */ + public void testXAPrepareFailure() throws Exception { + + assertNotNull(connectionUri); + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")"); + ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf.createConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + + MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName())); + Message message = session.createTextMessage("Sample Message"); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + try { + LOG.debug("Calling XA prepare(), expecting an exception"); + int ret = resource.prepare(tid); + if (XAResource.XA_OK == ret) + resource.commit(tid, false); + } catch (XAException xae) { + LOG.info("Received excpected XAException: {}", xae.getMessage()); + LOG.info("Rolling back transaction {}", tid); + + // with bug AMQ-4950 the thrown error reads "Cannot call prepare now" + // we check that we receive the original exception message as + // thrown by the BrokerPlugin + assertEquals(simulatedExceptionMessage, xae.getMessage()); + resource.rollback(tid); + } + // couple of assertions + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + //cleanup + producer.close(); + session.close(); + xaConnection.close(); + LOG.debug("testXAPrepareFailure() finished."); + } + + + public Xid createXid() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + } + + + private void assertTransactionGoneFromFailoverState( + ActiveMQXAConnection connection1, Xid tid) throws Exception { + + FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class); + TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE); + assertNull("transaction should not exist in the state tracker", + transport.getStateTracker().processCommitTransactionOnePhase(info)); + } + + + private void assertTransactionGoneFromBroker(Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + try { + transactionBroker.getTransaction(null, new XATransactionId(tid), false); + fail("expected exception on tx not found"); + } catch (XAException expectedOnNotFound) { + } + } + + + private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections(); + for (TransportConnection connection: connections) { + if (connection.getConnectionId().equals(clientId)) { + try { + connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE)); + fail("did not get expected excepton on missing transaction, it must be still there in error!"); + } catch (IllegalStateException expectedOnNoTransaction) { + } + } + } + } +}
