Updated Branches: refs/heads/trunk 0918430dc -> e7703f70e
https://issues.apache.org/jira/browse/AMQ-4977 Don't increase the cache size for repeated pull commands for the same destination + consumer combo since we only keep one instance in the map at any given time. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e7703f70 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e7703f70 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e7703f70 Branch: refs/heads/trunk Commit: e7703f70e0f679d0534379be26aa3de612747f93 Parents: 0918430 Author: Timothy Bish <[email protected]> Authored: Fri Jan 17 12:29:55 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Jan 17 12:29:55 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/command/MessagePull.java | 18 +++- .../activemq/state/ConnectionStateTracker.java | 106 ++++++++++++++----- .../state/ConnectionStateTrackerTest.java | 100 +++++++++++++++++ 3 files changed, 192 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java index 0ae58c4..e39aeae 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java @@ -20,10 +20,10 @@ import org.apache.activemq.state.CommandVisitor; /** * Used to pull messages on demand. - * + * * @openwire:marshaller code="20" - * - * + * + * */ public class MessagePull extends BaseCommand { @@ -35,10 +35,14 @@ public class MessagePull extends BaseCommand { private MessageId messageId; private String correlationId; + private transient boolean tracked = false; + + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public Response visit(CommandVisitor visitor) throws Exception { return visitor.processMessagePull(this); } @@ -112,4 +116,12 @@ public class MessagePull extends BaseCommand { public void setMessageId(MessageId messageId) { this.messageId = messageId; } + + public void setTracked(boolean tracked) { + this.tracked = tracked; + } + + public boolean isTracked() { + return this.tracked; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java index 5e05a48..41b4577 100755 --- a/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java +++ b/activemq-client/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Vector; import java.util.Map.Entry; +import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import javax.jms.TransactionRolledBackException; @@ -52,15 +52,15 @@ import org.slf4j.LoggerFactory; /** * Tracks the state of a connection so a newly established transport can be * re-initialized to the state that was tracked. - * - * + * + * */ public class ConnectionStateTracker extends CommandVisitorAdapter { private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class); private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); private static final int MESSAGE_PULL_SIZE = 400; - protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); + protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); private boolean trackTransactions; private boolean restoreSessions = true; @@ -70,8 +70,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { private boolean trackMessages = true; private boolean trackTransactionProducers = true; private int maxCacheSize = 128 * 1024; - private int currentCacheSize; - private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){ + private long currentCacheSize; // use long to prevent overflow for folks who set high max. + + private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){ + @Override protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) { boolean result = currentCacheSize > maxCacheSize; if (result) { @@ -87,7 +89,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return result; } }; - + private class RemoveTransactionAction implements ResponseHandler { private final TransactionInfo info; @@ -95,6 +97,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { this.info = info; } + @Override public void onResponse(Command response) { ConnectionId connectionId = info.getConnectionId(); ConnectionState cs = connectionStates.get(connectionId); @@ -103,13 +106,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } } } - - private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { + private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { public PrepareReadonlyTransactionAction(TransactionInfo info) { super(info); } + @Override public void onResponse(Command command) { if (command instanceof IntegerResponse) { IntegerResponse response = (IntegerResponse) command; @@ -122,11 +125,16 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } /** - * - * + * Entry point for all tracked commands in the tracker. Commands should be tracked before + * there is an attempt to send them on the wire. Upon a successful send of a command it is + * necessary to call the trackBack method to complete the tracking of the given command. + * * @param command + * The command that is to be tracked by this tracker. + * * @return null if the command is not state tracked. - * @throws IOException + * + * @throws IOException if an error occurs during setup of the tracking operation. */ public Tracked track(Command command) throws IOException { try { @@ -137,7 +145,15 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { throw IOExceptionSupport.create(e); } } - + + /** + * Completes the two phase tracking operation for a command that is sent on the wire. Once + * the command is sent successfully to complete the tracking operation or otherwise update + * the state of the tracker. + * + * @param command + * The command that was previously provided to the track method. + */ public void trackBack(Command command) { if (command != null) { if (trackMessages && command.isMessage()) { @@ -146,8 +162,12 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { currentCacheSize = currentCacheSize + message.getSize(); } } else if (command instanceof MessagePull) { - // just needs to be a rough estimate of size, ~4 identifiers - currentCacheSize += MESSAGE_PULL_SIZE; + // We only track one MessagePull per consumer so only add to cache size + // when the command has been marked as tracked. + if (((MessagePull)command).isTracked()) { + // just needs to be a rough estimate of size, ~4 identifiers + currentCacheSize += MESSAGE_PULL_SIZE; + } } } } @@ -171,8 +191,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { restoreTransactions(transport, connectionState); } } - //now flush messages - for (Command msg:messageCache.values()) { + + // now flush messages and MessagePull commands. + for (Command msg : messageCache.values()) { if (LOG.isDebugEnabled()) { LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg)); } @@ -186,7 +207,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { if (LOG.isDebugEnabled()) { LOG.debug("tx: " + transactionState.getId()); } - + // rollback any completed transactions - no way to know if commit got there // or if reply went missing // @@ -203,7 +224,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } } } - + // replay short lived producers that may have been involved in the transaction for (ProducerState producerState : transactionState.getProducerStates().values()) { if (LOG.isDebugEnabled()) { @@ -211,14 +232,14 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } transport.oneway(producerState.getInfo()); } - + for (Command command : transactionState.getCommands()) { if (LOG.isDebugEnabled()) { LOG.debug("tx replay: " + command); } transport.oneway(command); } - + for (ProducerState producerState : transactionState.getProducerStates().values()) { if (LOG.isDebugEnabled()) { LOG.debug("tx remove replayed producer :" + producerState.getInfo()); @@ -226,7 +247,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { transport.oneway(producerState.getInfo().createRemoveCommand()); } } - + for (TransactionInfo command: toRollback) { // respond to the outstanding commit ExceptionResponse response = new ExceptionResponse(); @@ -269,7 +290,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId()); final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete(); - for (ConsumerState consumerState : sessionState.getConsumerStates()) { + for (ConsumerState consumerState : sessionState.getConsumerStates()) { ConsumerInfo infoToSend = consumerState.getInfo(); if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) { infoToSend = consumerState.getInfo().copy(); @@ -319,6 +340,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { } } + @Override public Response processAddDestination(DestinationInfo info) { if (info != null) { ConnectionState cs = connectionStates.get(info.getConnectionId()); @@ -329,6 +351,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processRemoveDestination(DestinationInfo info) { if (info != null) { ConnectionState cs = connectionStates.get(info.getConnectionId()); @@ -339,6 +362,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processAddProducer(ProducerInfo info) { if (info != null && info.getProducerId() != null) { SessionId sessionId = info.getProducerId().getParentId(); @@ -358,6 +382,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processRemoveProducer(ProducerId id) { if (id != null) { SessionId sessionId = id.getParentId(); @@ -377,6 +402,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processAddConsumer(ConsumerInfo info) { if (info != null) { SessionId sessionId = info.getConsumerId().getParentId(); @@ -396,6 +422,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { if (id != null) { SessionId sessionId = id.getParentId(); @@ -416,6 +443,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processAddSession(SessionInfo info) { if (info != null) { ConnectionId connectionId = info.getSessionId().getParentId(); @@ -429,6 +457,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { if (id != null) { ConnectionId connectionId = id.getParentId(); @@ -442,6 +471,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processAddConnection(ConnectionInfo info) { if (info != null) { connectionStates.put(info.getConnectionId(), new ConnectionState(info)); @@ -449,6 +479,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { if (id != null) { connectionStates.remove(id); @@ -456,6 +487,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return TRACKED_RESPONSE_MARKER; } + @Override public Response processMessage(Message send) throws Exception { if (send != null) { if (trackTransactions && send.getTransactionId() != null) { @@ -467,13 +499,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); if (transactionState != null) { transactionState.addCommand(send); - + if (trackTransactionProducers) { // for jmstemplate, track the producer in case it is closed before commit // and needs to be replayed SessionState ss = cs.getSessionState(producerId.getParentId()); ProducerState producerState = ss.getProducerState(producerId); - producerState.setTransactionState(transactionState); + producerState.setTransactionState(transactionState); } } } @@ -486,6 +518,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processBeginTransaction(TransactionInfo info) { if (trackTransactions && info != null && info.getTransactionId() != null) { ConnectionId connectionId = info.getConnectionId(); @@ -502,6 +535,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processPrepareTransaction(TransactionInfo info) throws Exception { if (trackTransactions && info != null) { ConnectionId connectionId = info.getConnectionId(); @@ -519,6 +553,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { if (trackTransactions && info != null) { ConnectionId connectionId = info.getConnectionId(); @@ -536,6 +571,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { if (trackTransactions && info != null) { ConnectionId connectionId = info.getConnectionId(); @@ -553,6 +589,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processRollbackTransaction(TransactionInfo info) throws Exception { if (trackTransactions && info != null) { ConnectionId connectionId = info.getConnectionId(); @@ -570,6 +607,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { return null; } + @Override public Response processEndTransaction(TransactionInfo info) throws Exception { if (trackTransactions && info != null) { ConnectionId connectionId = info.getConnectionId(); @@ -592,7 +630,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { if (pull != null) { // leave a single instance in the cache final String id = pull.getDestination() + "::" + pull.getConsumerId(); - messageCache.put(id.intern(), pull); + if (messageCache.put(id.intern(), pull) == null) { + // Only marked as tracked if this is the first request we've seen. + pull.setTracked(true); + } } return null; } @@ -628,7 +669,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { public void setTrackTransactions(boolean trackTransactions) { this.trackTransactions = trackTransactions; } - + public boolean isTrackTransactionProducers() { return this.trackTransactionProducers; } @@ -636,7 +677,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { public void setTrackTransactionProducers(boolean trackTransactionProducers) { this.trackTransactionProducers = trackTransactionProducers; } - + public boolean isRestoreTransaction() { return restoreTransaction; } @@ -661,6 +702,13 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { this.maxCacheSize = maxCacheSize; } + /** + * @return the current cache size for the Message and MessagePull Command cache. + */ + public long getCurrentCacheSize() { + return this.currentCacheSize; + } + public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) { ConnectionState connectionState = connectionStates.get(connectionId); if (connectionState != null) { @@ -675,7 +723,7 @@ public class ConnectionStateTracker extends CommandVisitorAdapter { if (LOG.isDebugEnabled()) { LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch()); } - transport.oneway(control); + transport.oneway(control); } catch (Exception ex) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to submit control for consumer: " + control.getConsumerId() http://git-wip-us.apache.org/repos/asf/activemq/blob/e7703f70/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java new file mode 100644 index 0000000..1a46757 --- /dev/null +++ b/activemq-client/src/test/java/org/apache/activemq/state/ConnectionStateTrackerTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.state; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.SessionId; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ConnectionStateTrackerTest { + + private final ActiveMQQueue queue = new ActiveMQQueue("Test"); + private ConnectionId testConnectionId; + private SessionId testSessionId; + + private int connectionId = 0; + private int sessionId = 0; + private int consumerId = 0; + + @Before + public void setUp() throws Exception { + testConnectionId = createConnectionId(); + testSessionId = createSessionId(testConnectionId); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testCacheSizeWithMessagePulls() throws IOException { + + final ConsumerId consumer1 = createConsumerId(testSessionId); + + ConnectionStateTracker tracker = new ConnectionStateTracker(); + + assertEquals(0, tracker.getCurrentCacheSize()); + + MessagePull pullCommand = createPullCommand(consumer1); + tracker.track(pullCommand); + + assertEquals(0, tracker.getCurrentCacheSize()); + + tracker.trackBack(pullCommand); + long currentSize = tracker.getCurrentCacheSize(); + + assertTrue(currentSize > 0); + + pullCommand = createPullCommand(consumer1); + tracker.track(pullCommand); + tracker.trackBack(pullCommand); + + assertEquals(currentSize, tracker.getCurrentCacheSize()); + } + + private MessagePull createPullCommand(ConsumerId id) { + MessagePull pullCommand = new MessagePull(); + pullCommand.setDestination(queue); + pullCommand.setConsumerId(id); + return pullCommand; + } + + private ConnectionId createConnectionId() { + ConnectionId id = new ConnectionId(); + id.setValue(UUID.randomUUID() + ":" + connectionId++); + return id; + } + + private SessionId createSessionId(ConnectionId connectionId) { + return new SessionId(connectionId, sessionId++); + } + + private ConsumerId createConsumerId(SessionId sessionId) { + return new ConsumerId(sessionId, consumerId++); + } +}
