Repository: activemq Updated Branches: refs/heads/trunk 33b88d34a -> 8188f7f88
https://issues.apache.org/jira/browse/AMQ-5090 - improve failover transaction tracking Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8188f7f8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8188f7f8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8188f7f8 Branch: refs/heads/trunk Commit: 8188f7f884c96fe2174315be784bea2e560a09ba Parents: 33b88d3 Author: Dejan Bosanac <[email protected]> Authored: Fri Mar 7 12:04:20 2014 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Fri Mar 7 12:04:49 2014 +0100 ---------------------------------------------------------------------- .../apache/activemq/state/ConnectionStateTracker.java | 10 +++++----- .../activemq/transport/failover/FailoverTransport.java | 11 ++++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8188f7f8/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index effbc83..2a0cbb9 100755 --- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -537,7 +537,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { @Override public Response processPrepareTransaction(TransactionInfo info) throws Exception { - if (trackTransactions && info != null) { + if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); if (connectionId != null) { ConnectionState cs = connectionStates.get(connectionId); @@ -555,7 +555,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { @Override public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { - if (trackTransactions && info != null) { + if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); if (connectionId != null) { ConnectionState cs = connectionStates.get(connectionId); @@ -573,7 +573,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { @Override public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { - if (trackTransactions && info != null) { + if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); if (connectionId != null) { ConnectionState cs = connectionStates.get(connectionId); @@ -591,7 +591,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { @Override public Response processRollbackTransaction(TransactionInfo info) throws Exception { - if (trackTransactions && info != null) { + if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); if (connectionId != null) { ConnectionState cs = connectionStates.get(connectionId); @@ -609,7 +609,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { @Override public Response processEndTransaction(TransactionInfo info) throws Exception { - if (trackTransactions && info != null) { + if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); if (connectionId != null) { ConnectionState cs = connectionStates.get(connectionId); http://git-wip-us.apache.org/repos/asf/activemq/blob/8188f7f8/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 2ea1b37..2829d41 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -257,8 +257,8 @@ public class FailoverTransport implements CompositeTransport { if (canReconnect()) { reconnectOk = true; } - LOG.warn("Transport (" + transport + ") failed, reason: " + e - + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect"); + LOG.warn("Transport (" + transport + ") failed, reason: " + + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); initialized = false; failedConnectTransportURI = connectedTransportURI; @@ -635,11 +635,16 @@ public class FailoverTransport implements CompositeTransport { break; } + Tracked tracked = null; + try { + tracked = stateTracker.track(command); + } catch (IOException ioe) { + LOG.debug("Cannot track the command " + command, ioe); + } // If it was a request and it was not being tracked by // the state tracker, // then hold it in the requestMap so that we can replay // it later. - Tracked tracked = stateTracker.track(command); synchronized (requestMap) { if (tracked != null && tracked.isWaitingForResponse()) { requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
