[ https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13217443#comment-13217443 ]
Phillip Henry commented on AMQ-3732:
------------------------------------
OK, it's not quite a unit test - it's a stress test that uses JUnit - but it
illustrates the point.
This code fails on my Mac (2.66GHz Intel Core 2 Duo) after a non-deterministic
number of iterations.
Most of the code is just simple stub implementations of ActiveMQ classes and
interfaces with one or two methods implemented or overridden. (I tried using
JMock but it didn't seem to be multi-thread friendly.) Basically, it starts two
threads - one to pull a message, one to acknowledge. The pulling thread should
increase prefetchExtension in pullMessage() thus:
{code}
synchronized(this) {
    prefetchExtension++;
    dispatchCounterBeforePull = dispatchCounter;
}
{code}
The other thread calls acknowledge() with the stubbed objects, and with the
PrefetchSubscription object itself in such a state that this line is executed:
{code}
prefetchExtension = Math.max(prefetchExtension, index + 1);
{code}
where index is the index into the collection of dispatched MessageReference
objects. This should never be more than 0 since only one message is ever
dispatch()-ed by the test. All objects are instantiated anew on each iteration.
Given this, prefetchExtension should only ever be 0 (neither thread has hit its
target code) or 1 (from prefetchExtension++ or Math.max(prefetchExtension, 1)
because index is always 0).
However, the test demonstrates that occasionally prefetchExtension is 2. I
posit that this is because the threads can sometimes execute their lines at the
same time; prefetchExtension++ is not atomic (it's actually prefetchExtension =
prefetchExtension + 1); and changing it is not guarded by the same mutex.
For example, the order might look like:
Step 1. PULL THREAD: add 1 to prefetchExtension. To do this, it must first read
the current value of prefetchExtension. But before this thread can do so, there
is a context switch to the thread executing acknowledge() in step 2,
immediately below.
Step 2. ACK THREAD: set prefetchExtension to the maximum of its current value
and 1. Its current value appears to be 0, so prefetchExtension is set to 1.
Step 3. PULL THREAD: finds that prefetchExtension is 1 and adds one to it,
giving a total of 2.
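The three steps above can be replayed deterministically in a single thread. The sketch below is only an illustration: InterleavingReplay and replay() are hypothetical names, the int field merely stands in for PrefetchSubscription.prefetchExtension, and the increment is written out as its separate read and write.

```java
// Hypothetical single-threaded replay of steps 1-3; this is not ActiveMQ code.
final class InterleavingReplay {
    // Stand-in for PrefetchSubscription.prefetchExtension.
    private int prefetchExtension = 0;

    int replay() {
        final int index = 0; // only one message is dispatched, so index is 0

        // Step 1: the pull thread is about to run prefetchExtension++,
        // but is descheduled before it has read the field.

        // Step 2: the ack thread reads 0 and stores max(0, 0 + 1) = 1.
        prefetchExtension = Math.max(prefetchExtension, index + 1);

        // Step 3: the pull thread resumes; ++ is a read followed by a write.
        int read = prefetchExtension;  // reads 1
        prefetchExtension = read + 1;  // writes 2
        return prefetchExtension;
    }

    public static void main(String[] args) {
        System.out.println(new InterleavingReplay().replay()); // prints 2
    }
}
```

Because the replay fixes the order of the steps by hand, it always ends at 2, whereas the stress test only reaches that value on some iterations.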
Test case:
{code}
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import junit.framework.TestCase;
public class PrefetchSubscriptionStressTest extends TestCase {
    private PrefetchSubscriptionStub toTest;
    private final MessageId messageId = new MessageId();
    private MessageAck ack;
    private MessageReference node;

    protected void setUp() throws Exception {
        super.setUp();
    }

    public void testMultiThreadedAccessToPrefetchExtension()
            throws InterruptedException, InvalidSelectorException {
        final int callsPerThread = 1;
        final int numIterations = 1000;
        for (int i = 0 ; i < numIterations ; i++) {
            final MessagePull pull = initializeStubs();
            Thread messagePuller = new Thread(
                    new MessagePuller(callsPerThread, pull));
            Thread dispatchAndAcknowledger = new Thread(
                    new DispatchAndAcknowledger(ack, node, callsPerThread));
            startThreads(messagePuller, dispatchAndAcknowledger);
            waitForThreadsToStop(messagePuller, dispatchAndAcknowledger);
            int actualPrefetchExtensions = toTest.getPrefetchExtension();
            assertEquals("failed on iteration: " + i, 1,
                    actualPrefetchExtensions);
        }
    }

    private void waitForThreadsToStop(Thread messagePuller,
            Thread dispatchAndAcknowledger) throws InterruptedException {
        dispatchAndAcknowledger.join();
        messagePuller.join();
    }

    private void startThreads(Thread messagePuller,
            Thread dispatchAndAcknowledger) {
        messagePuller.start();
        dispatchAndAcknowledger.start();
    }

    private MessagePull initializeStubs() throws InvalidSelectorException {
        initialise();
        primeMessagesAndDestinations();
        final MessagePull pull = primeForPullMessage();
        return pull;
    }

    private void dispatchAndAcknowledge(final MessageAck ack,
            final MessageReference node, final int numToDispatch)
            throws IOException, Exception {
        ConnectionContext context = null;
        for (int i = 0 ; i < numToDispatch ; i++) {
            toTest.dispatch(node);
        }
        toTest.acknowledge(context , ack);
    }

    public class MessagePuller implements Runnable {
        private final int totalRuns;
        private final MessagePull pull;

        public MessagePuller(int totalRuns, MessagePull pull) {
            super();
            this.totalRuns = totalRuns;
            this.pull = pull;
        }

        @Override
        public void run() {
            for (int i = 0 ; i < totalRuns ; i++) {
                try {
                    pullMessage(pull);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void pullMessage(final MessagePull pull) throws Exception {
        ConnectionContext context = null;
        toTest.pullMessage(context, pull);
    }

    private MessagePull primeForPullMessage() {
        final MessagePull pull = new MessagePull() {
            @Override
            public long getTimeout() {
                return -1;
            }
        };
        return pull;
    }

    public class DispatchAndAcknowledger implements Runnable {
        private final MessageAck ack;
        private final MessageReference node;
        private final int totalRuns;

        public DispatchAndAcknowledger(MessageAck ack,
                MessageReference node, int totalRuns) {
            super();
            this.ack = ack;
            this.node = node;
            this.totalRuns = totalRuns;
        }

        @Override
        public void run() {
            for (int i = 0 ; i < totalRuns ; i++) {
                try {
                    dispatchAndAcknowledge(ack, node, 1);
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

private void initialise() throws InvalidSelectorException {
SystemUsage usageManager = null;
final Connection connection = new Connection() {
@Override
public void stop() throws Exception {
}
@Override
public void start() throws Exception {
}
@Override
public void updateClient(ConnectionControl control) {
}
@Override
public void serviceExceptionAsync(IOException e) {
}
@Override
public void serviceException(Throwable error) {
}
@Override
public Response service(Command command) { return null;
}
@Override
public boolean isSlow() { return false; }
@Override
public boolean isNetworkConnection() { return false; }
@Override
public boolean isManageable() { return false; }
@Override
public boolean isFaultTolerantConnection() { return
false; }
@Override
public boolean isConnected() { return false; }
@Override
public boolean isBlocked() { return false; }
@Override
public boolean isActive() { return false; }
@Override
public ConnectionStatistics getStatistics() { return
null; }
@Override
public String getRemoteAddress() { return null; }
@Override
public int getDispatchQueueSize() { return 0; }
@Override
public Connector getConnector() { return null; }
@Override
public String getConnectionId() { return null; }
@Override
public void dispatchSync(Command message) { }
@Override
public void dispatchAsync(Command command) { }
};
final ConnectionContext context = new ConnectionContext() {
@Override
public Connection getConnection() {
return connection;
}
};
final Broker broker = new Broker() {
@Override
public void stop() throws Exception { }
@Override
public void start() throws Exception { }
@Override
public void send(ProducerBrokerExchange
producerExchange, Message message)
throws Exception { }
@Override
public void removeSubscription(ConnectionContext
context,
RemoveSubscriptionInfo info) throws
Exception { }
@Override
public void removeDestination(ConnectionContext context,
ActiveMQDestination destination, long
timeout) throws Exception { }
@Override
public void removeConsumer(ConnectionContext context,
ConsumerInfo info)
throws Exception { }
@Override
public void processDispatchNotification(
MessageDispatchNotification
messageDispatchNotification)
throws Exception { }
@Override
public void
processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) { }
@Override
public Response messagePull(ConnectionContext context,
MessagePull pull)
throws Exception { return null; }
@Override
public Set<Destination>
getDestinations(ActiveMQDestination destination) { return null; }
@Override
public Map<ActiveMQDestination, Destination>
getDestinationMap() { return null; }
@Override
public void gc() { }
@Override
public Destination addDestination(ConnectionContext
context,
ActiveMQDestination destination,
boolean createIfTemporary)
throws Exception { return null; }
@Override
public Subscription addConsumer(ConnectionContext
context, ConsumerInfo info)
throws Exception { return null; }
@Override
public void acknowledge(ConsumerBrokerExchange
consumerExchange,
MessageAck ack) throws Exception { }
@Override
public void slowConsumer(ConnectionContext context,
Destination destination, Subscription
subs) { }
@Override
public void setAdminConnectionContext(
ConnectionContext
adminConnectionContext) { }
@Override
public void sendToDeadLetterQueue(ConnectionContext
context,
MessageReference messageReference,
Subscription subscription) { }
@Override
public void rollbackTransaction(ConnectionContext
context, TransactionId xid)
throws Exception { }
@Override
public void removeSession(ConnectionContext context,
SessionInfo info)
throws Exception { }
@Override
public void removeProducer(ConnectionContext context,
ProducerInfo info)
throws Exception { }
@Override
public void removeDestinationInfo(ConnectionContext
context,
DestinationInfo info) throws Exception
{ }
@Override
public void removeConnection(ConnectionContext context,
ConnectionInfo info, Throwable error)
throws Exception { }
@Override
public void removeBroker(Connection connection,
BrokerInfo info) { }
@Override
public int prepareTransaction(ConnectionContext
context, TransactionId xid)
throws Exception { return 0; }
@Override
public void preProcessDispatch(MessageDispatch
messageDispatch) { }
@Override
public void postProcessDispatch(MessageDispatch
messageDispatch) { }
@Override
public void nowMasterBroker() { }
@Override
public void networkBridgeStopped(BrokerInfo brokerInfo)
{ }
@Override
public void networkBridgeStarted(BrokerInfo brokerInfo,
boolean createdByDuplex, String
remoteIp) { }
@Override
public void messageExpired(ConnectionContext context,
MessageReference messageReference,
Subscription subscription) { }
@Override
public void messageDiscarded(ConnectionContext context,
Subscription sub,
MessageReference messageReference) { }
@Override
public void messageDelivered(ConnectionContext context,
MessageReference messageReference) { }
@Override
public void messageConsumed(ConnectionContext context,
MessageReference messageReference) { }
@Override
public boolean isStopped() { return false; }
@Override
public void isFull(ConnectionContext context,
Destination destination,
Usage usage) { }
@Override
public boolean isFaultTolerantConfiguration() { return
false; }
@Override
public boolean isExpired(MessageReference
messageReference) { return false; }
@Override
public URI getVmConnectorURI() { return null; }
@Override
public PListStore getTempDataStore() { return null; }
@Override
public Scheduler getScheduler() { return null; }
@Override
public Broker getRoot() { return null; }
@Override
public TransactionId[]
getPreparedTransactions(ConnectionContext context)
throws Exception { return null; }
@Override
public BrokerInfo[] getPeerBrokerInfos() { return null;
}
@Override
public ThreadPoolExecutor getExecutor() { return null; }
@Override
public Set<ActiveMQDestination>
getDurableDestinations() { return null; }
@Override
public ActiveMQDestination[] getDestinations() throws
Exception { return null; }
@Override
public Connection[] getClients() throws Exception {
return null; }
@Override
public BrokerService getBrokerService() { return null; }
@Override
public long getBrokerSequenceId() { return 0; }
@Override
public String getBrokerName() { return null; }
@Override
public BrokerId getBrokerId() { return null; }
@Override
public ConnectionContext getAdminConnectionContext() {
return null; }
@Override
public Broker getAdaptor(Class type) { return null; }
@Override
public void forgetTransaction(ConnectionContext context,
TransactionId transactionId) throws
Exception { }
@Override
public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) { }
@Override
public void commitTransaction(ConnectionContext
context, TransactionId xid,
boolean onePhase) throws Exception { }
@Override
public void brokerServiceStarted() { }
@Override
public void beginTransaction(ConnectionContext context,
TransactionId xid)
throws Exception { }
@Override
public void addSession(ConnectionContext context,
SessionInfo info)
throws Exception { }
@Override
public void addProducer(ConnectionContext context,
ProducerInfo info)
throws Exception { }
@Override
public void addDestinationInfo(ConnectionContext
context,
DestinationInfo info) throws Exception
{ }
@Override
public void addConnection(ConnectionContext context,
ConnectionInfo info)
throws Exception { }
@Override
public void addBroker(Connection connection, BrokerInfo
info) { }
};
final ActiveMQDestination activeMQDestination = new
ActiveMQDestination() {
@Override
public byte getDataStructureType() {
return 0;
}
@Override
protected String getQualifiedPrefix() {
return null;
}
@Override
public byte getDestinationType() {
return 0;
}
@Override
public String getPhysicalName() {
return "test";
}
@Override
public boolean isComposite() {
return false;
}
};
final ConsumerInfo info = new ConsumerInfo() {
@Override
public ActiveMQDestination getDestination() {
return activeMQDestination;
}
};
toTest = new PrefetchSubscriptionStub(broker, usageManager,
context, info);
}
private class PrefetchSubscriptionStub extends PrefetchSubscription {
public PrefetchSubscriptionStub(Broker broker,
SystemUsage usageManager, ConnectionContext
context,
ConsumerInfo info) throws
InvalidSelectorException {
super(broker, usageManager, context, info);
}
@Override
public void destroy() {
// TODO Auto-generated method stub
}
@Override
protected boolean isDropped(MessageReference node) {
// TODO Auto-generated method stub
return false;
}
@Override
protected boolean canDispatch(MessageReference node)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
protected void acknowledge(ConnectionContext context,
MessageAck ack, MessageReference node) throws
IOException {
// TODO Auto-generated method stub
}
@Override
public boolean isSlave() {
return false;
}
@Override
public int getPrefetchSize() {
return 0;
}
@Override
protected void onDispatch(final MessageReference node, final
Message message) {
}
@Override
protected void dispatchPending() throws IOException {
}
public int getPrefetchExtension() {
return prefetchExtension;
}
}
private void primeMessagesAndDestinations() {
final Message message = new Message() {
@Override
public byte getDataStructureType() { return 0; }
@Override
public Response visit(CommandVisitor visitor) throws
Exception { return null; }
@Override
public Message copy() { return null; }
@Override
public void clearBody() throws JMSException { }
};
final Destination destination = new Destination() {
@Override
public boolean iterate() { return false; }
@Override
public void stop() throws Exception { }
@Override
public void start() throws Exception { }
@Override
public void wakeup() { }
@Override
public void slowConsumer(ConnectionContext context,
Subscription subs) { }
@Override
public void setUseCache(boolean useCache) { }
@Override
public void setProducerFlowControl(boolean value) { }
@Override
public void setMinimumMessageSize(int
minimumMessageSize) { }
@Override
public void setMaxProducersToAudit(int
maxProducersToAudit) { }
@Override
public void setMaxPageSize(int maxPageSize) { }
@Override
public void setMaxBrowsePageSize(int maxPageSize) { }
@Override
public void setMaxAuditDepth(int maxAuditDepth) { }
@Override
public void setLazyDispatch(boolean value) { }
@Override
public void setEnableAudit(boolean enableAudit) { }
@Override
public void setCursorMemoryHighWaterMark(int
cursorMemoryHighWaterMark) { }
@Override
public void setBlockedProducerWarningInterval(
long blockedProducerWarningInterval) { }
@Override
public void setAlwaysRetroactive(boolean value) { }
@Override
public void send(ProducerBrokerExchange
producerExchange,
Message messageSend) throws Exception {
}
@Override
public void removeSubscription(ConnectionContext
context, Subscription sub,
long lastDeliveredSequenceId) throws
Exception { }
@Override
public void removeProducer(ConnectionContext context,
ProducerInfo info)
throws Exception { }
@Override
public void processDispatchNotification(
MessageDispatchNotification
messageDispatchNotification)
throws Exception { }
@Override
public void messageExpired(ConnectionContext context,
Subscription subs,
MessageReference node) { }
@Override
public void messageDiscarded(ConnectionContext context,
Subscription sub,
MessageReference messageReference) { }
@Override
public void messageDelivered(ConnectionContext context,
MessageReference messageReference) { }
@Override
public void messageConsumed(ConnectionContext context,
MessageReference messageReference) {
}
@Override
public void markForGC(long timeStamp) { }
@Override
public boolean isUseCache() { return false; }
@Override
public boolean isProducerFlowControl() { return false; }
@Override
public boolean isPrioritizedMessages() { return false; }
@Override
public boolean isLazyDispatch() { return false; }
@Override
public void isFull(ConnectionContext context, Usage<?>
usage) { }
@Override
public boolean isEnableAudit() { return false; }
@Override
public boolean isDisposed() { return false; }
@Override
public boolean isAlwaysRetroactive() { return false; }
@Override
public boolean isActive() { return false; }
@Override
public SlowConsumerStrategy getSlowConsumerStrategy() {
return null; }
@Override
public String getName() { return null; }
@Override
public int getMinimumMessageSize() { return 0; }
@Override
public MessageStore getMessageStore() { return null; }
@Override
public MemoryUsage getMemoryUsage() { return null; }
@Override
public int getMaxProducersToAudit() { return 0; }
@Override
public int getMaxPageSize() { return 0; }
@Override
public int getMaxBrowsePageSize() { return 0; }
@Override
public int getMaxAuditDepth() { return 0; }
@Override
public long getInactiveTimoutBeforeGC() { return 0; }
@Override
public DestinationStatistics getDestinationStatistics()
{ return null; }
@Override
public DeadLetterStrategy getDeadLetterStrategy() {
return null; }
@Override
public int getCursorMemoryHighWaterMark() { return 0; }
@Override
public List<Subscription> getConsumers() { return null;
}
@Override
public long getBlockedProducerWarningInterval() {
return 0; }
@Override
public ActiveMQDestination getActiveMQDestination() {
return null; }
@Override
public void gc() { }
@Override
public void fastProducer(ConnectionContext context,
ProducerInfo producerInfo) { }
@Override
public void dispose(ConnectionContext context) throws
IOException { }
@Override
public boolean canGC() { return false; }
@Override
public Message[] browse() { return null; }
@Override
public void addSubscription(ConnectionContext context,
Subscription sub)
throws Exception { }
@Override
public void addProducer(ConnectionContext context,
ProducerInfo info)
throws Exception { }
@Override
public void acknowledge(ConnectionContext context,
Subscription sub,
MessageAck ack, MessageReference node)
throws IOException { }
};
ack = new MessageAck() {
@Override
public boolean isDeliveredAck() {
return true;
}
@Override
public MessageId getLastMessageId() {
return messageId;
}
};
node = new MessageReference() {
@Override
public boolean isPersistent() {return false; }
@Override
public boolean isExpired() {
return false;
}
@Override
public boolean isDropped() { return false; }
@Override
public boolean isAdvisory() { return false; }
@Override
public int incrementReferenceCount() { return 0; }
@Override
public void incrementRedeliveryCounter() { }
@Override
public ConsumerId getTargetConsumerId() { return null; }
@Override
public int getSize() { return 0; }
@Override
public Destination getRegionDestination() { return
destination; }
@Override
public int getReferenceCount() { return 0; }
@Override
public int getRedeliveryCounter() { return 0; }
@Override
public MessageId getMessageId() { return messageId; }
@Override
public Message getMessageHardRef() { return null; }
@Override
public Message getMessage() { return message; }
@Override
public int getGroupSequence() { return 0; }
@Override
public String getGroupID() { return null; }
@Override
public long getExpiration() { return 0; }
@Override
public int decrementReferenceCount() { return 0; }
};
}
}
{code}
> Different methods synchronizing on different mutexes when changing the same
> field
> ---------------------------------------------------------------------------------
>
> Key: AMQ-3732
> URL: https://issues.apache.org/jira/browse/AMQ-3732
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.5.1
> Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0:
> Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
> Reporter: Phillip Henry
> Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is
> changed while guarded by a mutex on this (PrefetchSubscription) in
> PrefetchSubscription.pullMessage(...) but by PrefetchSubscription.dispatchLock
> in PrefetchSubscription.acknowledge(...).
> This can lead to the corruption of the prefetchExtension variable (e.g.,
> prefetchExtension++ in pullMessage() is not an atomic operation so
> prefetchExtension may change in acknowledge() mid-way through this operation).
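
One way to keep a read-modify-write on a shared field from being split by another writer is to route every mutation through a single mutex. The following is a minimal sketch of that idea only, not the change made in ActiveMQ: GuardedExtension, increment(), raiseTo(), and runBoth() are hypothetical names, and the class uses lambdas, so it assumes Java 8 or later.

```java
// Hypothetical sketch: every mutation of the counter takes the same lock.
final class GuardedExtension {
    private final Object lock = new Object(); // the single mutex for all writers
    private int value;

    // Analogue of prefetchExtension++ in pullMessage().
    void increment() {
        synchronized (lock) { value++; }
    }

    // Analogue of prefetchExtension = Math.max(prefetchExtension, index + 1).
    void raiseTo(int floor) {
        synchronized (lock) { value = Math.max(value, floor); }
    }

    int get() {
        synchronized (lock) { return value; }
    }

    // Runs n increments against n raiseTo(1) calls on two threads.
    static int runBoth(int n) {
        final GuardedExtension ext = new GuardedExtension();
        Thread puller = new Thread(() -> { for (int i = 0; i < n; i++) ext.increment(); });
        Thread acker = new Thread(() -> { for (int i = 0; i < n; i++) ext.raiseTo(1); });
        puller.start();
        acker.start();
        try {
            puller.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ext.get();
    }

    public static void main(String[] args) {
        System.out.println(runBoth(100_000)); // n or n + 1, never less
    }
}
```

With one lock no increment can be lost, so runBoth(n) returns n or n + 1 depending on whether the first raiseTo(1) ran before the first increment. The lock makes each update indivisible; it does not impose an order between the two calls.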