[ 
https://issues.apache.org/jira/browse/AMQ-3732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13217443#comment-13217443
 ] 

Phillip Henry commented on AMQ-3732:
------------------------------------

OK, it's not quite a unit test - it's a stress test that uses JUnit - but it 
illustrates the point.

This code fails on my Mac (2.66GHz Intel Core 2 Duo) after a non-deterministic 
number of iterations. 

Most of the code is just simple stub implementations of ActiveMQ classes and 
interfaces with one or two methods implemented or overridden. (I tried using 
JMock but it didn't seem friendly to multi-threaded tests.) Basically, it 
starts two threads - one to pull a message, one to acknowledge. The pulling 
thread should increase prefetchExtension in pullMessage() thus:
{code}
                synchronized(this) {
                        prefetchExtension++;
                        dispatchCounterBeforePull = dispatchCounter;
                }
{code}
The other thread calls acknowledge() with the stubbed objects and the 
PrefetchSubscription object itself in such a state that this line is executed:
{code}
                prefetchExtension = Math.max(prefetchExtension, index + 1);
{code}
where index is an index into the collection of dispatched MessageReference 
objects. This should never be more than 0 since only one message is ever 
dispatch()-ed by the test on each iteration. All objects are instantiated anew 
on each iteration.
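
The shape of the experiment can be shown without any ActiveMQ classes. What 
follows is only a sketch under stated assumptions: ExtensionHolder and its 
methods are hypothetical stand-ins for PrefetchSubscription, pullMessage() and 
acknowledge(); the second lock object is an assumption that mimics two updates 
which do not share a mutex; and the CountDownLatch, which is not in the test 
case below, is only there to release both threads at about the same moment.

```java
import java.util.concurrent.CountDownLatch;

class TwoThreadSketch {

    /** Hypothetical stand-in for the subscription; not ActiveMQ code. */
    static class ExtensionHolder {
        // Assumption: acknowledge() holds some lock other than 'this'.
        private final Object otherLock = new Object();
        private int prefetchExtension;

        void pull() {
            synchronized (this) {
                prefetchExtension++;
            }
        }

        void acknowledge(int index) {
            synchronized (otherLock) {
                prefetchExtension = Math.max(prefetchExtension, index + 1);
            }
        }

        int get() {
            return prefetchExtension;
        }
    }

    /** Runs one pull and one acknowledge on fresh state and returns the result. */
    static int runOnce() {
        final ExtensionHolder holder = new ExtensionHolder();
        final CountDownLatch start = new CountDownLatch(1);
        Thread puller = new Thread(() -> { awaitQuietly(start); holder.pull(); });
        Thread acker = new Thread(() -> { awaitQuietly(start); holder.acknowledge(0); });
        puller.start();
        acker.start();
        start.countDown(); // release both threads at roughly the same moment
        try {
            puller.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return holder.get(); // read only after both threads have been joined
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] seen = new int[3]; // slots for the final values 1 and 2
        for (int i = 0; i < 1000; i++) {
            seen[runOnce()]++;
        }
        System.out.println("value 1: " + seen[1] + " times, value 2: " + seen[2] + " times");
    }
}
```

The split between the two values varies from run to run and from machine to 
machine, so the sketch prints a tally rather than asserting a fixed outcome.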

Given this, prefetchExtension should only ever be 0 (neither thread has reached 
its target code yet) or 1 (from prefetchExtension++ or 
Math.max(prefetchExtension, 1), because index is always 0).

However, the test demonstrates that occasionally prefetchExtension is 2. I 
posit that this is because the threads can sometimes execute their lines at the 
same time; prefetchExtension++ is not atomic (it's actually prefetchExtension = 
prefetchExtension + 1); and the two updates are not guarded by the same mutex. 

For example, the order might look like:

Step 1. PULL THREAD: add 1 to prefetchExtension. To do this, it first has to 
find out what prefetchExtension is. But before this thread can do so, the 
scheduler switches to the thread executing acknowledge() in step 2, immediately 
below.

Step 2. ACK THREAD: prefetchExtension is the maximum of its current value and 
1. It looks like its current value is 0 so set prefetchExtension to 1.

Step 3. PULL THREAD: has found that prefetchExtension is 1 and adds one to it 
giving a total of 2.
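
The three steps can be replayed on a plain int to check the arithmetic. This is 
only a sketch: InterleavingReplay and its method names are hypothetical, and it 
models just the order of the reads and writes, not real threads or the locking 
in PrefetchSubscription.

```java
class InterleavingReplay {

    /** Steps 1-3 above: Math.max runs before the increment reads the field. */
    static int ackThenIncrement() {
        int prefetchExtension = 0;
        int index = 0; // only one message was dispatched

        // Step 1: the pull thread is about to read the field but is descheduled.
        // Step 2: the ack thread sees 0 and stores max(0, 0 + 1).
        prefetchExtension = Math.max(prefetchExtension, index + 1);

        // Step 3: the pull thread resumes, reads the new value and adds one.
        int seenByPullThread = prefetchExtension;
        prefetchExtension = seenByPullThread + 1;
        return prefetchExtension;
    }

    /** The opposite order, for comparison: increment first, then Math.max. */
    static int incrementThenAck() {
        int prefetchExtension = 0;
        int index = 0;
        prefetchExtension = prefetchExtension + 1;
        prefetchExtension = Math.max(prefetchExtension, index + 1);
        return prefetchExtension;
    }

    public static void main(String[] args) {
        System.out.println("ack then increment: " + ackThenIncrement()); // 2
        System.out.println("increment then ack: " + incrementThenAck()); // 1
    }
}
```

In this simplified model the final value depends on which update reads the 
field last: an increment that reads after Math.max has stored 1 yields 2, while 
the opposite order leaves the value at 1.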


Test case:
{code}
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;

import junit.framework.TestCase;

public class PrefetchSubscriptionStressTest extends TestCase {

        private PrefetchSubscriptionStub toTest;
        private final MessageId messageId = new MessageId();
        private MessageAck ack;
        private MessageReference node;

        protected void setUp() throws Exception {
                super.setUp();
        }
        
        public void testMultiThreadedAccessToPrefetchExtension()
                        throws InterruptedException, InvalidSelectorException {
                final int callsPerThread = 1;
                final int numIterations = 1000;
                
                for (int i = 0 ; i < numIterations ; i++) {
                        final MessagePull pull = initializeStubs();
                        
                        Thread messagePuller =
                                        new Thread(new MessagePuller(callsPerThread, pull));
                        Thread dispatchAndAcknowledger =
                                        new Thread(new DispatchAndAcknowledger(ack, node, callsPerThread));

                        startThreads(messagePuller, dispatchAndAcknowledger);

                        waitForThreadsToStop(messagePuller, dispatchAndAcknowledger);

                        int actualPrefetchExtensions = toTest.getPrefetchExtension();
                        assertEquals("failed on iteration: " + i, 1, actualPrefetchExtensions);
                }
        }
        
        private void waitForThreadsToStop(Thread messagePuller,
                        Thread dispatchAndAcknowledger) throws InterruptedException {
                dispatchAndAcknowledger.join();
                messagePuller.join();   
        }

        private void startThreads(Thread messagePuller, Thread dispatchAndAcknowledger) {
                messagePuller.start();
                dispatchAndAcknowledger.start();
        }

        private MessagePull initializeStubs() throws InvalidSelectorException {
                initialise();
                primeMessagesAndDestinations();
                final MessagePull pull = primeForPullMessage();
                return pull;
        }

        private void dispatchAndAcknowledge(final MessageAck ack,
                        final MessageReference node, final int numToDispatch)
                        throws IOException, Exception {
                ConnectionContext context = null;
                for (int i = 0 ; i < numToDispatch ; i++) {
                        toTest.dispatch(node);
                }
                toTest.acknowledge(context , ack);
        }
        
        public class MessagePuller implements Runnable {
                
                private final int totalRuns;
                private final MessagePull pull;
                
                public MessagePuller(int totalRuns, MessagePull pull) {
                        super();
                        this.totalRuns = totalRuns;
                        this.pull = pull;
                }
                
                @Override
                public void run() {
                        for (int i = 0 ; i < totalRuns ; i++) {
                                try {
                                        pullMessage(pull);
                                } catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                }
                
        }

        private void pullMessage(final MessagePull pull) throws Exception {
                ConnectionContext context = null;
                toTest.pullMessage(context, pull);
        }

        private MessagePull primeForPullMessage() {
                final MessagePull pull = new MessagePull() {
                        @Override
                        public long getTimeout() {
                                return -1;
                        }
                };
                return pull;
        }

        
        public class DispatchAndAcknowledger implements Runnable {
                
                private final MessageAck ack;
                private final MessageReference node;
                private final int totalRuns;
                
                public DispatchAndAcknowledger(MessageAck ack, MessageReference node,
                                int totalRuns) {
                        super();
                        this.ack = ack;
                        this.node = node;
                        this.totalRuns = totalRuns;
                }

                @Override
                public void run() {
                        for (int i = 0 ; i < totalRuns ; i++) {
                                try {
                                        dispatchAndAcknowledge(ack, node, 1);
                                } catch (Exception e) {
                                        // IOException is an Exception too, so one handler is enough
                                        e.printStackTrace();
                                }
                        }
                }
        }

        private void initialise() throws InvalidSelectorException {
                SystemUsage usageManager = null;
                
                final Connection connection = new Connection() {
                        
                        @Override
                        public void stop() throws Exception { }

                        @Override
                        public void start() throws Exception { }

                        @Override
                        public void updateClient(ConnectionControl control) { }

                        @Override
                        public void serviceExceptionAsync(IOException e) { }

                        @Override
                        public void serviceException(Throwable error) { }

                        @Override
                        public Response service(Command command) { return null; }
                        
                        @Override
                        public boolean isSlow() { return false; }
                        
                        @Override
                        public boolean isNetworkConnection() { return false; }
                        
                        @Override
                        public boolean isManageable() { return false; }
                        
                        @Override
                        public boolean isFaultTolerantConnection() { return false; }
                        
                        @Override
                        public boolean isConnected() { return false; }
                        
                        @Override
                        public boolean isBlocked() { return false; }
                        
                        @Override
                        public boolean isActive() { return false; }
                        
                        @Override
                        public ConnectionStatistics getStatistics() { return null; }
                        
                        @Override
                        public String getRemoteAddress() { return null; }
                        
                        @Override
                        public int getDispatchQueueSize() { return 0; }
                        
                        @Override
                        public Connector getConnector() { return null; }
                        
                        @Override
                        public String getConnectionId() { return null; }
                        
                        @Override
                        public void dispatchSync(Command message) { }
                        
                        @Override
                        public void dispatchAsync(Command command) { }
                };
                
                final ConnectionContext context = new ConnectionContext() {
                        @Override
                    public Connection getConnection() {
                        return connection;
                    }
                }; 
                
                final Broker broker = new Broker() {
                        
                        @Override
                        public void stop() throws Exception { }
                        
                        @Override
                        public void start() throws Exception { }
                        
                        @Override
                        public void send(ProducerBrokerExchange 
producerExchange, Message message)
                                        throws Exception { }
                        
                        @Override
                        public void removeSubscription(ConnectionContext 
context,
                                        RemoveSubscriptionInfo info) throws 
Exception { }
                        
                        @Override
                        public void removeDestination(ConnectionContext context,
                                        ActiveMQDestination destination, long 
timeout) throws Exception { }
                        
                        @Override
                        public void removeConsumer(ConnectionContext context, 
ConsumerInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void processDispatchNotification(
                                        MessageDispatchNotification 
messageDispatchNotification)
                                        throws Exception { }
                        
                        @Override
                        public void 
processConsumerControl(ConsumerBrokerExchange consumerExchange,
                                        ConsumerControl control) { }
                        
                        @Override
                        public Response messagePull(ConnectionContext context, 
MessagePull pull)
                                        throws Exception { return null; }
                        
                        @Override
                        public Set<Destination> 
getDestinations(ActiveMQDestination destination) { return null; }
                        
                        @Override
                        public Map<ActiveMQDestination, Destination> 
getDestinationMap() { return null; }
                        
                        @Override
                        public void gc() { }
                        
                        @Override
                        public Destination addDestination(ConnectionContext 
context,
                                        ActiveMQDestination destination, 
boolean createIfTemporary)
                                        throws Exception { return null; }
                        
                        @Override
                        public Subscription addConsumer(ConnectionContext 
context, ConsumerInfo info)
                                        throws Exception { return null; }
                        
                        @Override
                        public void acknowledge(ConsumerBrokerExchange 
consumerExchange,
                                        MessageAck ack) throws Exception { }
                        
                        @Override
                        public void slowConsumer(ConnectionContext context,
                                        Destination destination, Subscription 
subs) { }
                        
                        @Override
                        public void setAdminConnectionContext(
                                        ConnectionContext 
adminConnectionContext) { }
                        
                        @Override
                        public void sendToDeadLetterQueue(ConnectionContext 
context,
                                        MessageReference messageReference, 
Subscription subscription) { }
                        
                        @Override
                        public void rollbackTransaction(ConnectionContext 
context, TransactionId xid)
                                        throws Exception { }
                        
                        @Override
                        public void removeSession(ConnectionContext context, 
SessionInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void removeProducer(ConnectionContext context, 
ProducerInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void removeDestinationInfo(ConnectionContext 
context,
                                        DestinationInfo info) throws Exception 
{ }
                        
                        @Override
                        public void removeConnection(ConnectionContext context,
                                        ConnectionInfo info, Throwable error) 
throws Exception { }
                        
                        @Override
                        public void removeBroker(Connection connection, 
BrokerInfo info) { }
                        
                        @Override
                        public int prepareTransaction(ConnectionContext 
context, TransactionId xid)
                                        throws Exception { return 0; }
                        
                        @Override
                        public void preProcessDispatch(MessageDispatch 
messageDispatch) { }
                        
                        @Override
                        public void postProcessDispatch(MessageDispatch 
messageDispatch) { }
                        
                        @Override
                        public void nowMasterBroker() { }
                        
                        @Override
                        public void networkBridgeStopped(BrokerInfo brokerInfo) 
{ }
                        
                        @Override
                        public void networkBridgeStarted(BrokerInfo brokerInfo,
                                        boolean createdByDuplex, String 
remoteIp) { }
                        
                        @Override
                        public void messageExpired(ConnectionContext context,
                                        MessageReference messageReference, 
Subscription subscription) { }
                        
                        @Override
                        public void messageDiscarded(ConnectionContext context, 
Subscription sub,
                                        MessageReference messageReference) { }
                        
                        @Override
                        public void messageDelivered(ConnectionContext context,
                                        MessageReference messageReference) { }
                        
                        @Override
                        public void messageConsumed(ConnectionContext context,
                                        MessageReference messageReference) { }
                        
                        @Override
                        public boolean isStopped() { return false; }
                        
                        @Override
                        public void isFull(ConnectionContext context, 
Destination destination,
                                        Usage usage) { }
                        
                        @Override
                        public boolean isFaultTolerantConfiguration() { return 
false; }
                        
                        @Override
                        public boolean isExpired(MessageReference 
messageReference) { return false; }
                        
                        @Override
                        public URI getVmConnectorURI() { return null; }
                        
                        @Override
                        public PListStore getTempDataStore() { return null; }
                        
                        @Override
                        public Scheduler getScheduler() { return null; }
                        
                        @Override
                        public Broker getRoot() { return null; }
                        
                        @Override
                        public TransactionId[] 
getPreparedTransactions(ConnectionContext context)
                                        throws Exception { return null; }
                        
                        @Override
                        public BrokerInfo[] getPeerBrokerInfos() { return null; 
}
                        
                        @Override
                        public ThreadPoolExecutor getExecutor() { return null; }
                        
                        @Override
                        public Set<ActiveMQDestination> 
getDurableDestinations() { return null; }
                        
                        @Override
                        public ActiveMQDestination[] getDestinations() throws 
Exception { return null; }
                        
                        @Override
                        public Connection[] getClients() throws Exception { 
return null; }
                        
                        @Override
                        public BrokerService getBrokerService() { return null; }
                        
                        @Override
                        public long getBrokerSequenceId() { return 0; }
                        
                        @Override
                        public String getBrokerName() { return null; }
                        
                        @Override
                        public BrokerId getBrokerId() { return null; }
                        
                        @Override
                        public ConnectionContext getAdminConnectionContext() { 
return null; }
                        
                        @Override
                        public Broker getAdaptor(Class type) { return null; }
                        
                        @Override
                        public void forgetTransaction(ConnectionContext context,
                                        TransactionId transactionId) throws 
Exception { }
                        
                        @Override
                        public void fastProducer(ConnectionContext context,
                                        ProducerInfo producerInfo) { }
                        
                        @Override
                        public void commitTransaction(ConnectionContext 
context, TransactionId xid,
                                        boolean onePhase) throws Exception { }
                        
                        @Override
                        public void brokerServiceStarted() { }
                        
                        @Override
                        public void beginTransaction(ConnectionContext context, 
TransactionId xid)
                                        throws Exception { }
                        
                        @Override
                        public void addSession(ConnectionContext context, 
SessionInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void addProducer(ConnectionContext context, 
ProducerInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void addDestinationInfo(ConnectionContext 
context,
                                        DestinationInfo info) throws Exception 
{ }
                        
                        @Override
                        public void addConnection(ConnectionContext context, 
ConnectionInfo info)
                                        throws Exception { }
                        
                        @Override
                        public void addBroker(Connection connection, BrokerInfo 
info) { }
                };
                
                final ActiveMQDestination activeMQDestination = new ActiveMQDestination() {
                        
                        @Override
                        public byte getDataStructureType() {
                                return 0;
                        }
                        
                        @Override
                        protected String getQualifiedPrefix() {
                                return null;
                        }
                        
                        @Override
                        public byte getDestinationType() {
                                return 0;
                        }
                        
                        @Override
                        public String getPhysicalName() { 
                                return "test";
                        }
                        
                        @Override
                        public boolean isComposite() {
                                return false;
                        }
                };
                
                final ConsumerInfo info = new ConsumerInfo() {
                        @Override
                        public ActiveMQDestination getDestination() {
                                return activeMQDestination;
                        }
                };
                

                toTest = new PrefetchSubscriptionStub(broker, usageManager, context, info);
        }
        
        private class PrefetchSubscriptionStub extends PrefetchSubscription {

                public PrefetchSubscriptionStub(Broker broker,
                                SystemUsage usageManager, ConnectionContext context,
                                ConsumerInfo info) throws InvalidSelectorException {
                        super(broker, usageManager, context, info);
                }

                @Override
                public void destroy() {
                        // TODO Auto-generated method stub

                }

                @Override
                protected boolean isDropped(MessageReference node) {
                        // TODO Auto-generated method stub
                        return false;
                }

                @Override
                protected boolean canDispatch(MessageReference node)
                                throws IOException {
                        // TODO Auto-generated method stub
                        return false;
                }

                @Override
                protected void acknowledge(ConnectionContext context,
                                MessageAck ack, MessageReference node) throws IOException {
                        // TODO Auto-generated method stub

                }

                @Override
                public boolean isSlave() {
                        return false;
                }

                @Override
                public int getPrefetchSize() {
                        return 0;
                }

                @Override
                protected void onDispatch(final MessageReference node,
                                final Message message) {
                        
                }
                
                @Override
                protected void dispatchPending() throws IOException {
                        
                }
                
                public int getPrefetchExtension() {
                        return prefetchExtension;
                }
        }

        private void primeMessagesAndDestinations() {
                final Message message = new Message() {
                        
                        @Override
                        public byte getDataStructureType() { return 0; }
                        
                        @Override
                        public Response visit(CommandVisitor visitor) throws 
Exception { return null; }
                        
                        @Override
                        public Message copy() { return null; }
                        
                        @Override
                        public void clearBody() throws JMSException { }
                };
                final Destination destination = new Destination() {
                        
                        @Override
                        public boolean iterate() { return false; }
                        
                        @Override
                        public void stop() throws Exception { }
                        
                        @Override
                        public void start() throws Exception { }
                        
                        @Override
                        public void wakeup() { }
                        
                        @Override
                        public void slowConsumer(ConnectionContext context, 
Subscription subs) { }
                        
                        @Override
                        public void setUseCache(boolean useCache) { }
                        
                        @Override
                        public void setProducerFlowControl(boolean value) { }
                        
                        @Override
                        public void setMinimumMessageSize(int 
minimumMessageSize) { }
                        
                        @Override
                        public void setMaxProducersToAudit(int 
maxProducersToAudit) { }
                        
                        @Override
                        public void setMaxPageSize(int maxPageSize) { }
                        
                        @Override
                        public void setMaxBrowsePageSize(int maxPageSize) { }
                        
                        @Override
                        public void setMaxAuditDepth(int maxAuditDepth) { }
                        
                        @Override
                        public void setLazyDispatch(boolean value) { }
                        
                        @Override
                        public void setEnableAudit(boolean enableAudit) { }
                        
                        @Override
                        public void setCursorMemoryHighWaterMark(int 
cursorMemoryHighWaterMark) { }
                        
                        @Override
                        public void setBlockedProducerWarningInterval(
                                        long blockedProducerWarningInterval) { }
                        
                        @Override
                        public void setAlwaysRetroactive(boolean value) { }
                        
                        @Override
                        public void send(ProducerBrokerExchange producerExchange,
                                        Message messageSend) throws Exception { }

                        @Override
                        public void removeSubscription(ConnectionContext context, Subscription sub,
                                        long lastDeliveredSequenceId) throws Exception { }

                        @Override
                        public void removeProducer(ConnectionContext context, ProducerInfo info)
                                        throws Exception { }

                        @Override
                        public void processDispatchNotification(
                                        MessageDispatchNotification messageDispatchNotification)
                                        throws Exception { }

                        @Override
                        public void messageExpired(ConnectionContext context, Subscription subs,
                                        MessageReference node) { }

                        @Override
                        public void messageDiscarded(ConnectionContext context, Subscription sub,
                                        MessageReference messageReference) { }

                        @Override
                        public void messageDelivered(ConnectionContext context,
                                        MessageReference messageReference) { }

                        @Override
                        public void messageConsumed(ConnectionContext context,
                                        MessageReference messageReference) { }
                        
                        @Override
                        public void markForGC(long timeStamp) { }
                        
                        @Override
                        public boolean isUseCache() { return false; }
                        
                        @Override
                        public boolean isProducerFlowControl() { return false; }
                        
                        @Override
                        public boolean isPrioritizedMessages() { return false; }
                        
                        @Override
                        public boolean isLazyDispatch() { return false; }
                        
                        @Override
                        public void isFull(ConnectionContext context, Usage<?> usage) { }
                        
                        @Override
                        public boolean isEnableAudit() { return false; }
                        
                        @Override
                        public boolean isDisposed() { return false; }
                        
                        @Override
                        public boolean isAlwaysRetroactive() { return false; }
                        
                        @Override
                        public boolean isActive() { return false; }
                        
                        @Override
                        public SlowConsumerStrategy getSlowConsumerStrategy() { return null; }
                        
                        @Override
                        public String getName() { return null; }
                        
                        @Override
                        public int getMinimumMessageSize() { return 0; }
                        
                        @Override
                        public MessageStore getMessageStore() { return null; }
                        
                        @Override
                        public MemoryUsage getMemoryUsage() { return null; }
                        
                        @Override
                        public int getMaxProducersToAudit() { return 0; }
                        
                        @Override
                        public int getMaxPageSize() { return 0; }
                        
                        @Override
                        public int getMaxBrowsePageSize() { return 0; }
                        
                        @Override
                        public int getMaxAuditDepth() { return 0; }
                        
                        @Override
                        public long getInactiveTimoutBeforeGC() { return 0; }
                        
                        @Override
                        public DestinationStatistics getDestinationStatistics() { return null; }

                        @Override
                        public DeadLetterStrategy getDeadLetterStrategy() { return null; }

                        @Override
                        public int getCursorMemoryHighWaterMark() { return 0; }

                        @Override
                        public List<Subscription> getConsumers() { return null; }

                        @Override
                        public long getBlockedProducerWarningInterval() { return 0; }

                        @Override
                        public ActiveMQDestination getActiveMQDestination() { return null; }

                        @Override
                        public void gc() { }
                        
                        @Override
                        public void fastProducer(ConnectionContext context,
                                        ProducerInfo producerInfo) { }
                        
                        @Override
                        public void dispose(ConnectionContext context) throws IOException { }
                        
                        @Override
                        public boolean canGC() { return false; }
                        
                        @Override
                        public Message[] browse() { return null; }
                        
                        @Override
                        public void addSubscription(ConnectionContext context, Subscription sub)
                                        throws Exception { }

                        @Override
                        public void addProducer(ConnectionContext context, ProducerInfo info)
                                        throws Exception { }

                        @Override
                        public void acknowledge(ConnectionContext context, Subscription sub,
                                        MessageAck ack, MessageReference node) throws IOException { }
                };
                
                ack = new MessageAck() {

                        @Override
                        public boolean isDeliveredAck() {
                                return true;
                        }
                        @Override
                        public MessageId getLastMessageId() {
                                return messageId;
                        }
                        
                };
                node = new MessageReference() {
                        
                        @Override
                        public boolean isPersistent() { return false; }
                        
                        @Override
                        public boolean isExpired() { 
                                return false; 
                        }
                        
                        @Override
                        public boolean isDropped() { return false; }
                        
                        @Override
                        public boolean isAdvisory() { return false; }
                        
                        @Override
                        public int incrementReferenceCount() { return 0; }
                        
                        @Override
                        public void incrementRedeliveryCounter() { }
                        
                        @Override
                        public ConsumerId getTargetConsumerId() { return null; }
                        
                        @Override
                        public int getSize() { return 0; }
                        
                        @Override
                        public Destination getRegionDestination() { return destination; }
                        
                        @Override
                        public int getReferenceCount() { return 0; }
                        
                        @Override
                        public int getRedeliveryCounter() { return 0; }
                        
                        @Override
                        public MessageId getMessageId() { return messageId; }
                        
                        @Override
                        public Message getMessageHardRef() { return null; }
                        
                        @Override
                        public Message getMessage() { return message; }
                        
                        @Override
                        public int getGroupSequence() { return 0; }
                        
                        @Override
                        public String getGroupID() { return null; }
                        
                        @Override
                        public long getExpiration() { return 0; }
                        
                        @Override
                        public int decrementReferenceCount() { return 0; }
                };
        }
}
{code}
                
> Different methods synchronizing on different mutexes when changing the same field
> ---------------------------------------------------------------------------------
>
>                 Key: AMQ-3732
>                 URL: https://issues.apache.org/jira/browse/AMQ-3732
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.1
>         Environment: Darwin phillip.local 9.8.0 Darwin Kernel Version 9.8.0: 
> Wed Jul 15 16:55:01 PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
>            Reporter: Phillip Henry
>              Labels: concurrency
>
> org.apache.activemq.broker.region.PrefetchSubscription.prefetchExtension is 
> changed while guarded by a mutex on this (PrefetchSubscription) in 
> PrefetchSubscription.pullMessage(...), but while guarded by 
> PrefetchSubscription.dispatchLock in PrefetchSubscription.acknowledge(...). 
> Because the two methods hold different mutexes, they can run concurrently and 
> corrupt the prefetchExtension variable (e.g., prefetchExtension++ in 
> pullMessage() is not an atomic operation, so prefetchExtension may change in 
> acknowledge() midway through it).
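
The locking problem described above can be sketched in isolation. The class below is a hypothetical stand-in, not the ActiveMQ PrefetchSubscription: the names SharedLockSketch, SketchSubscription, pull() and run() are assumptions made for illustration. Only the field name, the dispatchLock name and the two update statements are taken from the report. With shareLock set to false, pull() locks on this while acknowledge() locks on dispatchLock, so an acknowledge() can write back a stale value and erase increments. With shareLock set to true, both methods use the same monitor and no increment should be lost.

```java
final class SharedLockSketch {

    /** Hypothetical stand-in for the subscription; not the real ActiveMQ class. */
    static final class SketchSubscription {
        private final Object dispatchLock = new Object();
        private final Object pullLock; // monitor used by pull()
        private int prefetchExtension;

        SketchSubscription(boolean shareLock) {
            // shareLock == false mirrors the reported layout: pull() locks on
            // "this", acknowledge() locks on dispatchLock.
            this.pullLock = shareLock ? dispatchLock : this;
        }

        void pull() {
            synchronized (pullLock) {
                prefetchExtension++; // read-modify-write, not atomic
            }
        }

        void acknowledge(int index) {
            synchronized (dispatchLock) {
                // Also a read-modify-write: the value read here may be stale
                // by the time it is written back if pull() holds another lock.
                prefetchExtension = Math.max(prefetchExtension, index + 1);
            }
        }

        int get() {
            synchronized (dispatchLock) {
                return prefetchExtension;
            }
        }
    }

    /** Runs one puller and one acker thread and returns the final counter. */
    static int run(boolean shareLock, int iterations) {
        SketchSubscription sub = new SketchSubscription(shareLock);
        sub.pull(); // start at 1 so acknowledge(0) is logically a no-op
        Thread puller = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sub.pull();
        });
        Thread acker = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sub.acknowledge(0);
        });
        puller.start();
        acker.start();
        try {
            puller.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sub.get();
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        System.out.println("expected:        " + (n + 1));
        System.out.println("shared lock:     " + run(true, n));
        // With different locks the result may be lower, on some runs only.
        System.out.println("different locks: " + run(false, n));
    }
}
```

Guarding every write to the field with one monitor is only one possible remedy. An AtomicInteger would be another, and which one suits the broker code is a decision for the maintainers.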

