Author: rajdavies Date: Tue Feb 26 07:00:37 2008 New Revision: 631244 URL: http://svn.apache.org/viewvc?rev=631244&view=rev Log: optionally replay messages on the fault tolerant transport
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=631244&r1=631243&r2=631244&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Tue Feb 26 07:00:37 2008 @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.command.Command; @@ -28,6 +30,7 @@ import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; @@ -48,14 +51,24 @@ private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); - + private boolean trackTransactions; private boolean restoreSessions = true; private boolean restoreConsumers = true; private boolean restoreProducers = true; private boolean restoreTransaction = true; - - + private boolean trackMessages = true; + private int maxCacheSize = 128 * 1024; + private int currentCacheSize; + private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){ + protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) { + boolean result = currentCacheSize > maxCacheSize; + currentCacheSize -= eldest.getValue().getSize(); + return result; + } + }; + + private class RemoveTransactionAction implements Runnable { private final TransactionInfo info; @@ -86,6 +99,15 @@ throw IOExceptionSupport.create(e); } } + + public void trackBack(Command command) { + if (trackMessages && command != null && command.isMessage()) { + Message message = (Message) command; + if (message.getTransactionId()==null) { + currentCacheSize+=message.getSize(); + } + } + } public void restore(Transport transport) throws IOException { // Restore the connections. @@ -102,6 +124,10 @@ restoreTransactions(transport, connectionState); } } + //now flush messages + for (Message msg:messageCache.values()) { + transport.oneway(msg); + } } private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { @@ -311,18 +337,22 @@ } public Response processMessage(Message send) throws Exception { - if (trackTransactions && send != null && send.getTransactionId() != null) { - ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); - if (connectionId != null) { - ConnectionState cs = connectionStates.get(connectionId); - if (cs != null) { - TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); - if (transactionState != null) { - transactionState.addCommand(send); + if (send != null) { + if (trackTransactions && send.getTransactionId() != null) { + ConnectionId connectionId = send.getProducerId().getParentId().getParentId(); + if (connectionId != null) { + ConnectionState cs = connectionStates.get(connectionId); + if (cs != null) { + TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); + if (transactionState != null) { + transactionState.addCommand(send); + } } } + return TRACKED_RESPONSE_MARKER; + }else if (trackMessages) { + messageCache.put(send.getMessageId(), send.copy()); } - return TRACKED_RESPONSE_MARKER; } return null; } @@ -481,6 +511,22 @@ public void setRestoreTransaction(boolean restoreTransaction) { this.restoreTransaction = restoreTransaction; + } + + public boolean isTrackMessages() { + return trackMessages; + } + + public void setTrackMessages(boolean trackMessages) { + this.trackMessages = trackMessages; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=631244&r1=631243&r2=631244&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Tue Feb 26 07:00:37 2008 @@ -93,7 +93,7 @@ future.set(response); } else { if (debug) { - LOG.debug("Received unexpected response for command id: " + response.getCorrelationId()); + LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId()); } } } else { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=631244&r1=631243&r2=631244&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Feb 26 07:00:37 2008 @@ -32,6 +32,7 @@ import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.state.ConnectionStateTracker; @@ -92,6 +93,8 @@ private boolean backup=false; private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>(); private int backupPoolSize=1; + private boolean trackMessages = true; + private int maxCacheSize = 128 * 1024; private final TransportListener myTransportListener = createTransportListener(); @@ -223,6 +226,8 @@ return; } started = true; + stateTracker.setMaxCacheSize(getMaxCacheSize()); + stateTracker.setTrackMessages(isTrackMessages()); if (connectedTransport != null) { stateTracker.restore(connectedTransport); } else { @@ -336,6 +341,22 @@ this.backupPoolSize = backupPoolSize; } + public boolean isTrackMessages() { + return trackMessages; + } + + public void setTrackMessages(boolean trackMessages) { + this.trackMessages = trackMessages; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } + /** * @return Returns true if the command is one sent when a connection * is being closed. @@ -407,6 +428,7 @@ // Send the message. try { connectedTransport.oneway(command); + stateTracker.trackBack(command); } catch (IOException e) { // If the command was not tracked.. we will retry in @@ -548,6 +570,10 @@ protected void restoreTransport(Transport t) throws Exception, IOException { t.start(); + //send information to the broker - informing it we are an ft client + ConnectionControl cc = new ConnectionControl(); + cc.setFaultTolerant(true); + t.oneway(cc); stateTracker.restore(t); for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) { Command command = iter2.next(); @@ -753,5 +779,4 @@ public void reconnect(URI uri) throws IOException { add(new URI[] {uri}); } - }