Repository: activemq Updated Branches: refs/heads/trunk 2360fb859 -> e8818fafe
https://issues.apache.org/jira/browse/AMQ-5080 - fix up rar transacted delivery and redelivery processing, xarecovery and transaction completion afer failover - using failover maxReconnectAttempts=0 to avoid blocking periodic recovery and negate replay of aborted transaction state Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e8818faf Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e8818faf Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e8818faf Branch: refs/heads/trunk Commit: e8818fafea0c46fb9fc3029b27f5740f55616eef Parents: 2360fb8 Author: gtully <[email protected]> Authored: Mon Mar 3 13:22:30 2014 +0000 Committer: gtully <[email protected]> Committed: Mon Mar 3 13:22:30 2014 +0000 ---------------------------------------------------------------------- .../activemq/broker/ProducerBrokerExchange.java | 15 +- .../activemq/broker/TransportConnection.java | 2 + .../org/apache/activemq/ActiveMQConnection.java | 6 + .../org/apache/activemq/ActiveMQSession.java | 49 ++++++- .../org/apache/activemq/TransactionContext.java | 38 +++-- .../activemq/command/XATransactionId.java | 45 +++++- .../activemq/ra/ActiveMQConnectionFactory.java | 4 +- .../activemq/ra/ActiveMQEndpointWorker.java | 4 +- .../activemq/ra/ActiveMQManagedConnection.java | 20 +-- .../activemq/ra/ActiveMQResourceAdapter.java | 146 ++++++++++++++++--- .../activemq/ra/LocalAndXATransaction.java | 6 +- .../apache/activemq/ra/ServerSessionImpl.java | 9 +- .../activemq/ra/ServerSessionPoolImpl.java | 19 ++- .../ra/ActiveMQConnectionFactoryTest.java | 6 + 14 files changed, 306 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index b3b383e..bf1d21e 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -143,15 +143,22 @@ public class ProducerBrokerExchange { long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId()); if (producerSequenceId <= lastStoredForMessageProducer) { canDispatch = false; - LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ + LOG.warn("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{ (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastStoredForMessageProducer }); } } else if (producerSequenceId <= lastSendSequenceNumber.get()) { canDispatch = false; - LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] less than last stored: {}", new Object[]{ - (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber - }); + if (messageSend.isInTransaction()) { + LOG.warn("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber + }); + } else { + LOG.debug("suppressing duplicated message send [{}] with producerSequenceId [{}] <= last stored: {}", new Object[]{ + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()), producerSequenceId, lastSendSequenceNumber + }); + + } } else { // track current so we can suppress duplicates later in the stream lastSendSequenceNumber.set(producerSequenceId); http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 65d044b..557b88c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -503,6 +503,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor { ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); if (consumerExchange != null) { broker.acknowledge(consumerExchange, ack); + } else if (ack.isInTransaction()) { + LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); } return null; } http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index d5f1c17..3f17c1b 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1849,6 +1849,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon dispatchers.remove(consumerId); } + public boolean hasDispatcher(ConsumerId consumerId) { + return dispatchers.containsKey(consumerId); + } + /** * @param o - the command to consume */ @@ -1878,6 +1882,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon md.setMessage(msg); } dispatcher.dispatch(md); + } else { + LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); } return null; } http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 47ed980..1f9ef32 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -649,7 +649,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } } + final AtomicInteger clearRequestsCounter = new AtomicInteger(0); void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { + clearRequestsCounter.incrementAndGet(); executor.clearMessagesInProgress(); // we are called from inside the transport reconnection logic which involves us // clearing all the connections' consumers dispatch and delivered lists. So rather @@ -860,9 +862,25 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta while ((messageDispatch = executor.dequeueNoWait()) != null) { final MessageDispatch md = messageDispatch; ActiveMQMessage message = (ActiveMQMessage)md.getMessage(); - if (message.isExpired() || connection.isDuplicate(ActiveMQSession.this, message)) { - // TODO: Ack it without delivery to client - continue; + + MessageAck earlyAck = null; + if (message.isExpired()) { + earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1); + } else if (connection.isDuplicate(ActiveMQSession.this, message)) { + LOG.debug("{} got duplicate: {}", this, message.getMessageId()); + earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); + earlyAck.setFirstMessageId(md.getMessage().getMessageId()); + earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); + } + if (earlyAck != null) { + try { + asyncSendPacket(earlyAck); + } catch (Throwable t) { + LOG.error("error dispatching ack: {} ", earlyAck, t); + connection.onClientInternalException(t); + } finally { + continue; + } } if (isClientAcknowledge()||isIndividualAcknowledge()) { @@ -886,16 +904,36 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if (ack.getTransactionId() != null) { getTransactionContext().addSynchronization(new Synchronization() { - @Override + final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); public void beforeEnd() throws Exception { - asyncSendPacket(ack); + // validate our consumer so we don't push stale acks that get ignored + if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { + LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); + throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); + } + LOG.trace("beforeEnd ack {}", ack); + sendAck(ack); } @Override public void afterRollback() throws Exception { + LOG.trace("rollback {}", ack, new Throwable("here")); md.getMessage().onMessageRolledBack(); // ensure we don't filter this as a duplicate connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); + + // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect + if (clearRequestsCounter.get() > clearRequestCount) { + LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); + return; + } + + // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched + if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { + LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); + return; + } + RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES @@ -932,6 +970,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta }); } + LOG.trace("{} onMessage({})", this, message.getMessageId()); messageListener.onMessage(message); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index 43aacf7..e780783 100755 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -40,6 +40,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.transaction.Synchronization; +import org.apache.activemq.transport.failover.FailoverTransport; import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.slf4j.Logger; @@ -71,9 +72,8 @@ public class TransactionContext implements XAResource { private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = new HashMap<TransactionId, List<TransactionContext>>(); - private final ActiveMQConnection connection; + private ActiveMQConnection connection; private final LongSequenceGenerator localTransactionIdGenerator; - private final ConnectionId connectionId; private List<Synchronization> synchronizations; // To track XA transactions. @@ -82,10 +82,14 @@ public class TransactionContext implements XAResource { private LocalTransactionEventListener localTransactionEventListener; private int beforeEndIndex; + // for RAR recovery + public TransactionContext() { + localTransactionIdGenerator = null; + } + public TransactionContext(ActiveMQConnection connection) { this.connection = connection; this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); - this.connectionId = connection.getConnectionInfo().getConnectionId(); } public boolean isInXATransaction() { @@ -231,7 +235,7 @@ public class TransactionContext implements XAResource { if (transactionId == null) { synchronizations = null; beforeEndIndex = 0; - this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId()); + this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); this.connection.ensureConnectionInfoSent(); this.connection.asyncSendPacket(info); @@ -646,6 +650,13 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); try { this.connection.checkClosedOrFailed(); + final FailoverTransport failoverTransport = this.connection.getTransport().narrow(FailoverTransport.class); + if (failoverTransport != null && !failoverTransport.isConnected()) { + // otherwise call will block on reconnect forfeting any app level periodic check + XAException xaException = new XAException("Failover transport not connected: " + this.getConnection().getTransport()); + xaException.errorCode = XAException.XAER_RMERR; + throw xaException; + } this.connection.ensureConnectionInfoSent(); DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); @@ -657,6 +668,7 @@ public class TransactionContext implements XAResource { answer = new XATransactionId[data.length]; System.arraycopy(data, 0, answer, 0, data.length); } + LOG.trace("recover({})={}", flag, answer); return answer; } catch (JMSException e) { throw toXAException(e); @@ -676,7 +688,7 @@ public class TransactionContext implements XAResource { // Helper methods. // // /////////////////////////////////////////////////////////// - private String getResourceManagerId() throws JMSException { + protected String getResourceManagerId() throws JMSException { return this.connection.getResourceManagerId(); } @@ -695,11 +707,11 @@ public class TransactionContext implements XAResource { associatedXid = xid; transactionId = new XATransactionId(xid); - TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN); + TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); try { this.connection.asyncSendPacket(info); if (LOG.isDebugEnabled()) { - LOG.debug("Started XA transaction: " + transactionId); + LOG.debug("{} started XA transaction {} ", this, transactionId); } } catch (JMSException e) { disassociate(); @@ -709,11 +721,11 @@ public class TransactionContext implements XAResource { } else { if (transactionId != null) { - TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END); + TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); try { syncSendPacketWithInterruptionHandling(info); if (LOG.isDebugEnabled()) { - LOG.debug("Ended XA transaction: " + transactionId); + LOG.debug("{} ended XA transaction {}", this, transactionId); } } catch (JMSException e) { disassociate(); @@ -800,6 +812,14 @@ public class TransactionContext implements XAResource { return connection; } + + // for RAR xa recovery where xaresource connection is per request + public ActiveMQConnection setConnection(ActiveMQConnection connection) { + ActiveMQConnection existing = this.connection; + this.connection = connection; + return existing; + } + public void cleanup() { associatedXid = null; transactionId = null; http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java index 5f786e5..84fea7a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/XATransactionId.java @@ -102,19 +102,52 @@ public class XATransactionId extends TransactionId implements Xid, Comparable { if (transactionKey == null) { StringBuffer s = new StringBuffer(); s.append("XID:[" + formatId + ",globalId="); - for (int i = 0; i < globalTransactionId.length; i++) { - s.append(Integer.toHexString(globalTransactionId[i])); - } + s.append(stringForm(formatId, globalTransactionId)); s.append(",branchId="); - for (int i = 0; i < branchQualifier.length; i++) { - s.append(Integer.toHexString(branchQualifier[i])); - } + s.append(stringForm(formatId, branchQualifier)); s.append("]"); transactionKey = s.toString(); } return transactionKey; } + private String stringForm(int format, byte[] uid) { + StringBuffer s = new StringBuffer(); + switch (format) { + case 131077: // arjuna + stringFormArj(s, uid); + break; + default: // aries + stringFormDefault(s, uid); + } + return s.toString(); + } + + private void stringFormDefault(StringBuffer s, byte[] uid) { + for (int i = 0; i < uid.length; i++) { + s.append(Integer.toHexString(uid[i])); + } + } + + private void stringFormArj(StringBuffer s, byte[] uid) { + try { + DataByteArrayInputStream byteArrayInputStream = new DataByteArrayInputStream(uid); + s.append(Long.toString(byteArrayInputStream.readLong(), 16)); + s.append(':'); + s.append(Long.toString(byteArrayInputStream.readLong(), 16)); + s.append(':'); + + s.append(Integer.toString(byteArrayInputStream.readInt(), 16)); + s.append(':'); + s.append(Integer.toString(byteArrayInputStream.readInt(), 16)); + s.append(':'); + s.append(Integer.toString(byteArrayInputStream.readInt(), 16)); + + } catch (Exception ignored) { + stringFormDefault(s, uid); + } + } + public String toString() { return getTransactionKey(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java index 4ef3684..234fc30 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQConnectionFactory.java @@ -98,7 +98,9 @@ public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnec throw (JMSException)e.getCause(); } LOG.debug("Connection could not be created:", e); - throw new JMSException(e.getMessage()); + JMSException jmsException = new JMSException(e.getMessage()); + jmsException.setLinkedException(e); + throw jmsException; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index 1e12751..ac35959 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -235,7 +235,7 @@ public class ActiveMQEndpointWorker { c.close(); } } catch (JMSException e) { - // + LOG.trace("failed to close c {}", c, e); } } @@ -249,7 +249,7 @@ public class ActiveMQEndpointWorker { cc.close(); } } catch (JMSException e) { - // + LOG.trace("failed to close cc {}", cc, e); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java index f8caf09..a694c12 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQManagedConnection.java @@ -201,13 +201,15 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi return; } - cleanup(); - try { - physicalConnection.close(); - destroyed = true; - } catch (JMSException e) { - LOG.info("Error occurred during close of a JMS connection.", e); + cleanup(); + } finally { + try { + physicalConnection.close(); + destroyed = true; + } catch (JMSException e) { + LOG.trace("Error occurred during close of a JMS connection.", e); + } } } @@ -233,10 +235,10 @@ public class ActiveMQManagedConnection implements ManagedConnection, ExceptionLi physicalConnection.cleanup(); } catch (JMSException e) { throw new ResourceException("Could cleanup the ActiveMQ connection: " + e, e); + } finally { + // defer transaction cleanup till after close so that close is aware of the current tx + localAndXATransaction.cleanup(); } - // defer transaction cleanup till after close so that close is aware of the current tx - localAndXATransaction.cleanup(); - } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index 68b2178..32fdb13 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -16,22 +16,20 @@ */ package org.apache.activemq.ra; -import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; -import javax.jms.Connection; import javax.jms.JMSException; -import javax.jms.XAConnection; -import javax.jms.XASession; import javax.resource.NotSupportedException; import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.BootstrapContext; import javax.resource.spi.ResourceAdapterInternalException; import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; @@ -39,6 +37,8 @@ import org.apache.activemq.TransactionContext; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.util.ServiceSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Knows how to connect to one ActiveMQ server. It can then activate endpoints @@ -50,7 +50,7 @@ import org.apache.activemq.util.ServiceSupport; * */ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implements MessageResourceAdapter { - + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQResourceAdapter.class); private final HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker> endpointWorkers = new HashMap<ActiveMQEndpointActivationKey, ActiveMQEndpointWorker>(); private BootstrapContext bootstrapContext; @@ -233,21 +233,129 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement */ public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException { try { - return new XAResource[]{(XAResource) - java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{XAResource.class}, - new java.lang.reflect.InvocationHandler () { - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - ActiveMQConnection connection = makeConnection(); - try { - return method.invoke(new TransactionContext(connection), args); - } finally { - try { - connection.close(); - } catch (Throwable ignore) {} - } + return new XAResource[]{ + new TransactionContext() { + + @Override + public boolean isSameRM(XAResource xaresource) throws XAException { + ActiveMQConnection original = null; + try { + original = setConnection(newConnection()); + boolean result = super.isSameRM(xaresource); + LOG.trace("{}.recover({})={}", getConnection(), xaresource, result); + return result; + + } catch (JMSException e) { + LOG.trace("isSameRM({}) failed", xaresource, e); + XAException xaException = new XAException(e.getMessage()); + throw xaException; + } finally { + closeConnection(original); + } + } + + @Override + protected String getResourceManagerId() throws JMSException { + ActiveMQConnection original = null; + try { + original = setConnection(newConnection()); + return super.getResourceManagerId(); + } finally { + closeConnection(original); + } + } + + @Override + public void commit(Xid xid, boolean onePhase) throws XAException { + ActiveMQConnection original = null; + try { + setConnection(newConnection()); + super.commit(xid, onePhase); + LOG.trace("{}.commit({},{})", getConnection(), xid); + + } catch (JMSException e) { + LOG.trace("{}.commit({},{}) failed", getConnection(), xid, onePhase, e); + throwXAException(e); + } finally { + closeConnection(original); + } + } + + @Override + public void rollback(Xid xid) throws XAException { + ActiveMQConnection original = null; + try { + original = setConnection(newConnection()); + super.rollback(xid); + LOG.trace("{}.rollback({})", getConnection(), xid); + + } catch (JMSException e) { + LOG.trace("{}.rollback({}) failed", getConnection(), xid, e); + throwXAException(e); + } finally { + closeConnection(original); + } + } + + @Override + public Xid[] recover(int flags) throws XAException { + Xid[] result = new Xid[]{}; + ActiveMQConnection original = null; + try { + original = setConnection(newConnection()); + result = super.recover(flags); + LOG.trace("{}.recover({})={}", getConnection(), flags, result); + + } catch (JMSException e) { + LOG.trace("{}.recover({}) failed", getConnection(), flags, e); + throwXAException(e); + } finally { + closeConnection(original); + } + return result; + } + + @Override + public void forget(Xid xid) throws XAException { + ActiveMQConnection original = null; + try { + original = setConnection(newConnection()); + super.forget(xid); + LOG.trace("{}.forget({})", getConnection(), xid); + + } catch (JMSException e) { + LOG.trace("{}.forget({}) failed", getConnection(), xid, e); + throwXAException(e); + } finally { + closeConnection(original); + } + } + + private void throwXAException(JMSException e) throws XAException { + XAException xaException = new XAException(e.getMessage()); + xaException.errorCode = XAException.XAER_RMFAIL; + throw xaException; + } + + private ActiveMQConnection newConnection() throws JMSException { + ActiveMQConnection connection = makeConnection(); + connection.start(); + return connection; + } + + private void closeConnection(ActiveMQConnection original) { + ActiveMQConnection connection = getConnection(); + if (connection != null) { + try { + connection.close(); + } catch (JMSException ignored) { + + } finally { + setConnection(original); } - })}; + } + } + }}; } catch (Exception e) { throw new ResourceException(e); http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java index c6d91ef..f93ee0f 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java @@ -33,13 +33,17 @@ import org.slf4j.LoggerFactory; public class LocalAndXATransaction implements XAResource, LocalTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalAndXATransaction.class); - private final TransactionContext transactionContext; + private TransactionContext transactionContext; private boolean inManagedTx; public LocalAndXATransaction(TransactionContext transactionContext) { this.transactionContext = transactionContext; } + public void setTransactionContext(TransactionContext transactionContext) { + this.transactionContext = transactionContext; + } + public void setInManagedTx(boolean inManagedTx) throws JMSException { this.inManagedTx = inManagedTx; if (!inManagedTx) { http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java index 27c75b1..a4382ee 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java @@ -168,15 +168,15 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D if ( session.isRunning() ) { session.run(); } else { - log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale"); + log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size()); stale = true; } } catch (Throwable e) { stale = true; if ( log.isDebugEnabled() ) { - log.debug("Endpoint failed to process message.", e); + log.debug("Endpoint {} failed to process message.", session, e); } else if ( log.isInfoEnabled() ) { - log.info("Endpoint failed to process message. Reason: " + e.getMessage()); + log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session); } } finally { InboundContextSupport.unregister(this); @@ -190,6 +190,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D } if (!session.hasUncomsumedMessages()) { runningFlag = false; + log.debug("Session has no unconsumed message, returning to pool"); pool.returnToPool(this); break; } @@ -255,7 +256,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D */ @Override public String toString() { - return "ServerSessionImpl:" + serverSessionId; + return "ServerSessionImpl:" + serverSessionId + "{" + session +"}"; } public void close() { http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index 25de03d..393ed35 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -227,11 +227,24 @@ public class ServerSessionPoolImpl implements ServerSessionPool { try { ActiveMQSession session = (ActiveMQSession)ss.getSession(); List l = session.getUnconsumedMessages(); - for (Iterator i = l.iterator(); i.hasNext();) { - dispatchToSession((MessageDispatch)i.next()); + if (!l.isEmpty()) { + ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); + if (connection != null) { + for (Iterator i = l.iterator(); i.hasNext();) { + MessageDispatch md = (MessageDispatch)i.next(); + if (connection.hasDispatcher(md.getConsumerId())) { + dispatchToSession(md); + LOG.trace("on remove of {} redispatch of {}", session, md); + } else { + LOG.trace("on remove not redispatching {}, dispatcher no longer present on {}", md, session.getConnection()); + } + } + } else { + LOG.trace("on remove of {} not redispatching while disconnected", session); + } } } catch (Throwable t) { - LOG.error("Error redispatching unconsumed messages from stale session", t); + LOG.error("Error redispatching unconsumed messages from stale server session {}", ss, t); } ss.close(); synchronized (closing) { http://git-wip-us.apache.org/repos/asf/activemq/blob/e8818faf/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java index 2191148..e511a12 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionFactoryTest.java @@ -117,5 +117,11 @@ public class ActiveMQConnectionFactoryTest extends TestCase { assertEquals("one resource", 1, resources.length); assertEquals("no pending transactions", 0, resources[0].recover(100).length); + + // validate equality + XAResource[] resource2 = ra.getXAResources(null); + assertEquals("one resource", 1, resource2.length); + assertTrue("isSameRM true", resources[0].isSameRM(resource2[0])); + assertFalse("no tthe same instance", resources[0].equals(resource2[0])); } }
