Repository: activemq Updated Branches: refs/heads/trunk ec249f4da -> f42d56c1f
avoid ugly npe post endpoint disassociate and add additional trace to recovery Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f42d56c1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f42d56c1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f42d56c1 Branch: refs/heads/trunk Commit: f42d56c1f8601d96fa53ff59ce69f9a97dd31e9c Parents: ec249f4 Author: gtully <[email protected]> Authored: Mon Feb 17 14:36:40 2014 +0000 Committer: gtully <[email protected]> Committed: Mon Feb 17 14:37:48 2014 +0000 ---------------------------------------------------------------------- .../activemq/ra/ActiveMQEndpointWorker.java | 18 +++++++++--------- .../apache/activemq/ra/LocalAndXATransaction.java | 5 ++++- .../apache/activemq/ra/ServerSessionPoolImpl.java | 8 +++++++- 3 files changed, 20 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f42d56c1/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index b18ef29..1e12751 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -294,18 +294,18 @@ public class ActiveMQEndpointWorker { private void connect() { synchronized ( connectWork ) { - if (!running) { - return; - } + if (!running) { + return; + } - try { - workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null); - } catch (WorkException e) { - running = false; - LOG.error("Work Manager did not accept work: ", e); + try { + workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null); + } catch (WorkException e) { + running = false; + LOG.error("Work Manager did not accept work: ", e); + } } } - } /** * http://git-wip-us.apache.org/repos/asf/activemq/blob/f42d56c1/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java index 0f27393..c6d91ef 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/LocalAndXATransaction.java @@ -127,7 +127,10 @@ public class LocalAndXATransaction implements XAResource, LocalTransaction { } public Xid[] recover(int arg0) throws XAException { - return transactionContext.recover(arg0); + Xid[] answer = null; + answer = transactionContext.recover(arg0); + LOG.trace("{} recover({}) = {}", new Object[]{this, arg0, answer}); + return answer; } public void rollback(Xid arg0) throws XAException { http://git-wip-us.apache.org/repos/asf/activemq/blob/f42d56c1/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index c0c3320..25de03d 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -30,6 +30,7 @@ import javax.jms.Session; import javax.resource.spi.UnavailableException; import javax.resource.spi.endpoint.MessageEndpoint; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQQueueSession; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.ActiveMQTopicSession; @@ -60,7 +61,12 @@ public class ServerSessionPoolImpl implements ServerSessionPool { private ServerSessionImpl createServerSessionImpl() throws JMSException { MessageActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec(); int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession(); - final ActiveMQSession session = (ActiveMQSession)activeMQAsfEndpointWorker.getConnection().createSession(activeMQAsfEndpointWorker.transacted, acknowledge); + final ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection(); + if (connection == null) { + // redispatch of pending prefetched messages after disconnect can have a null connection + return null; + } + final ActiveMQSession session = (ActiveMQSession)connection.createSession(activeMQAsfEndpointWorker.transacted, acknowledge); MessageEndpoint endpoint; try { int batchSize = 0;
