[ 
https://issues.apache.org/activemq/browse/AMQ-1585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Tully updated AMQ-1585:
----------------------------

    Attachment: AMQ-1585.patch

I find that org.apache.activemq.broker.ft.QueueMasterSlaveTest can 
reproduce this problem if the number of messages produced is increased to 200, 
which is greater than the default maxPageSize of 100 for a destination.

There appear to be three problems:
1) Decrementing the inflight statistics causes messages in the slave to be 
lost once the maxPageSize for a destination is reached. This is fixed by not 
decrementing in error when the queue is on the slave.
2) The dispatch notification can be late getting to the slave, so, as with 
send processing, the notification is now sent in advance.
3) Even with early sending, a send can arrive later than a 
dispatchNotification, so sending synchronously fixes that.
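
Points 2 and 3 come down to ordering on the slave: a dispatch notification can 
only be matched if the corresponding send has already been recorded there. The 
sketch below is a toy model of that idea in plain Java. It is not the broker 
code and not the contents of AMQ-1585.patch; the class SlavePendingSketch and 
its methods are hypothetical names chosen for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of a slave's pending list (hypothetical, not ActiveMQ code). */
class SlavePendingSketch {
    // Message ids the slave has been told about but not yet seen dispatched.
    private final Set<String> pending = new LinkedHashSet<>();

    /** The master forwarded a message send to the slave. */
    void onSend(String messageId) {
        pending.add(messageId);
    }

    /**
     * The master reported that it dispatched a message.
     * Returns false when the slave has not recorded the send yet,
     * which is the "out of sync" case in this model.
     */
    boolean onDispatchNotification(String messageId) {
        return pending.remove(messageId);
    }

    public static void main(String[] args) {
        // Send arrives first: the notification finds the message.
        SlavePendingSketch inOrder = new SlavePendingSketch();
        inOrder.onSend("msg-1");
        System.out.println("send first: matched = "
                + inOrder.onDispatchNotification("msg-1"));

        // Notification overtakes the send: nothing to match against.
        SlavePendingSketch reordered = new SlavePendingSketch();
        boolean matched = reordered.onDispatchNotification("msg-1");
        reordered.onSend("msg-1");
        System.out.println("notification first: matched = " + matched);
    }
}
```

In this model the second case corresponds to the "was not in the pending list" 
error quoted in the report below. Making sure the send reaches the slave before 
its notification avoids it.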



> Problems with pure master/slave configuration
> ---------------------------------------------
>
>                 Key: AMQ-1585
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1585
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.1.1, 5.0.0, 5.1.0
>         Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x
>            Reporter: Thomas Buckel
>            Assignee: Gary Tully
>         Attachments: AMQ-1585.patch
>
>
> As posted in the AMQ user forum:
> http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769
> -------------------
> Hi all,
> I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology.
> Initially I tried v4.1.1, which failed with an exception. I found an AMQ 
> JIRA ticket which said that Pure Master/Slave didn't work in v4.1.1.
> OK, so I switched to AMQ 5.0.0, created 2 configs (master/slave, see end of 
> message) and ran two AMQ instances (on the same box). Most of the time my 
> test (see below) worked, but quite often I get various error messages like:
> - On the slave:
> ERROR Service                        - Async error occurred: 
> javax.jms.JMSException: Slave broker out of sync with master: Dispatched 
> message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the 
> pending list
> javax.jms.JMSException: Slave broker out of sync with master: Dispatched 
> message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the 
> pending list
>         at 
> org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160)
>         at 
> org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381)
>         at 
> org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550)
>         at 
> org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at 
> org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at 
> org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201)
>         at 
> org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211)
>         at 
> org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450)
>         at 
> org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77)
>         at 
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
>         at 
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
>         at 
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
>         at 
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>         at 
> org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
>         at 
> org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
>         at 
> org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
> - After having killed the master, stopped the slave, and copied the slave's 
> data into the master's data directory (as described in the Master/Slave 
> recovery section), various error messages came up, e.g. (internal) ActiveMQ 
> topics were not available, the admin webApp showed exceptions and errors on 
> the client.
> The test I've created uses Spring 2.0.x and pumps 1000 MapMessages in a queue 
> through Spring's JmsTemplate, each message is created within its own 
> transaction, using JmsTransactionManager and TransactionTemplate.
> The created messages are consumed by an initially instantiated transactional 
> DefaultMessageListenerContainer. The AMQ JARs in the test's classpath are 
> activemq-core-5.0.0.jar, geronimo-jms_1.1_spec-1.0.jar, 
> geronimo-jta_1.0.1B_spec-1.0.jar as I've noticed a really bad performance 
> when only using the activemq-all-5.0.0.jar (maybe this is the problem?).
> The test code works without problems with OpenMQ, but I'd prefer using the 
> nice Pure Master/Slave ActiveMQ if I can get it running in a *stable* config 
> ;)
> I would highly appreciate any help or suggestions. Maybe my config is wrong 
> or I am missing something essential. I've also tried a recent AMQ 5.1 
> SNAPSHOT, which wasn't better...
> See below for the small program I used to test (no unit test, behaviour 
> appeared to be non-deterministic to me and it's not so nice as I've changed 
> it quite often)
> Thanks in advance,
> Thomas
> <!-- MASTER config -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.org/config/1.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.org/config/1.0 
> http://activemq.apache.org/schema/activemq-core.xsd
>   http://activemq.apache.org/camel/schema/spring 
> http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>   <bean 
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>   <broker xmlns="http://activemq.org/config/1.0" brokerName="master" 
> dataDirectory="${activemq.base}/data">
>     <destinationPolicy>
>       <policyMap>
>         <policyEntries>
>           <policyEntry topic="FOO.>" producerFlowControl="false" 
> memoryLimit="1mb">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>         </policyEntries>
>       </policyMap> 
>     </destinationPolicy>
>     <transportConnectors>
>        <transportConnector name="openwire" uri="tcp://tbuckel-desktop:7778" />
>     </transportConnectors>
>     <networkConnectors/>
>     <managementContext>
>        <managementContext connectorPort="1100" 
> jmxDomainName="org.apache.activemq"/>
>     </managementContext>
>   </broker>
>   <commandAgent xmlns="http://activemq.org/config/1.0"/>
>   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>     <connectors>
>       <nioConnector port="8161" />
>     </connectors>
>     <handlers>
>       <webAppContext contextPath="/admin" 
> resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
>     </handlers>
>   </jetty>
> </beans>
> <!-- SLAVE config -->
> <beans
>   xmlns="http://www.springframework.org/schema/beans"
>   xmlns:amq="http://activemq.org/config/1.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.org/config/1.0 
> http://activemq.apache.org/schema/activemq-core.xsd
>   http://activemq.apache.org/camel/schema/spring 
> http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
>   <bean 
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
>   
>   <broker xmlns="http://activemq.org/config/1.0" brokerName="slave" 
> dataDirectory="${activemq.base}/data-slave"
>           masterConnectorURI="tcp://tbuckel-desktop:7778">
>   
>     <destinationPolicy>
>       <policyMap>
>         <policyEntries>
>           <policyEntry topic="FOO.>" producerFlowControl="false" 
> memoryLimit="1mb">
>             <dispatchPolicy>
>               <strictOrderDispatchPolicy/>
>             </dispatchPolicy>
>             <subscriptionRecoveryPolicy>
>               <lastImageSubscriptionRecoveryPolicy/>
>             </subscriptionRecoveryPolicy>
>           </policyEntry>
>         </policyEntries>
>       </policyMap>
>     </destinationPolicy>
>     <transportConnectors>
>        <transportConnector name="openwire" uri="tcp://localhost:7779"/>
>     </transportConnectors>
>     <networkConnectors/>
>     <managementContext>
>        <managementContext connectorPort="1101" 
> jmxDomainName="org.apache.activemq"/>
>     </managementContext>
>   </broker>
>   <commandAgent xmlns="http://activemq.org/config/1.0"/>
>   <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
>     <connectors>
>       <nioConnector port="8162" />
>     </connectors>
>     <handlers>
>       <webAppContext contextPath="/admin" 
> resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
>     </handlers>
>   </jetty>
> </beans>
> ------------
> Test code:
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.springframework.jms.connection.JmsTransactionManager;
> import 
> org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import org.springframework.jms.listener.DefaultMessageListenerContainer;
> import org.springframework.transaction.TransactionStatus;
> import 
> org.springframework.transaction.support.TransactionCallbackWithoutResult;
> import org.springframework.transaction.support.TransactionTemplate;
> import javax.jms.*;
> import java.math.BigInteger;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> public class AnotherFailoverTest {
>     public static final int MESSAGES = 1000;
>     private final static List<BigInteger> notConsumedMessages = new 
> ArrayList<BigInteger>(MESSAGES);
>     private static ConnectionFactory createCF() throws Exception {
>         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
>         
> cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false");
>         return new TransactionAwareConnectionFactoryProxy(cf);
>     }
>     private static void send() throws Exception {
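>         // Connection factory shared by the transaction manager and the 
>         // JmsTemplate below.
>         final ConnectionFactory pcf = createCF();
>         JmsTransactionManager transactionManager = new 
> JmsTransactionManager();
>         transactionManager.setConnectionFactory(pcf);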
>         transactionManager.afterPropertiesSet();
>         int i=0;
>         do {
>             i++;
>             final int number = i;
>             try {
>                 final BigInteger v = new BigInteger(Integer.toString(number));
>                 TransactionTemplate tt = new 
> TransactionTemplate(transactionManager);
>                 tt.execute(new TransactionCallbackWithoutResult() {
>                     protected void 
> doInTransactionWithoutResult(TransactionStatus status) {
>                         final JmsTemplate template = new JmsTemplate(pcf);
>                         template.setSessionTransacted(true);
>                         template.afterPropertiesSet();
>                         template.send("testqueue", new MessageCreator() {
>                             public Message createMessage(Session session) 
> throws JMSException {
>                                 ObjectMessage dummyMessage = 
> session.createObjectMessage();
>                                 dummyMessage.setObject(v);
>                                 synchronized (notConsumedMessages) {
>                                     notConsumedMessages.add(v);
>                                 }
> //                                System.out.println("Created message " + number + "(" + notConsumedMessages.size() + ")");
>                                 return dummyMessage;
>                             }
>                         });
>                     }
>                 });
>             } catch (Exception e) {
>                 e.printStackTrace();
>                 System.out.println("Error creating message " + number);
>             }
>         } while (i < MESSAGES);
>     }
>     private static void setupReceiver() throws Exception {
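>         // Connection factory shared by the transaction manager and the 
>         // listener container below.
>         final ConnectionFactory pcf = createCF();
>         JmsTransactionManager transactionManager = new 
> JmsTransactionManager();
>         transactionManager.setConnectionFactory(pcf);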
>         transactionManager.afterPropertiesSet();
>         final DefaultMessageListenerContainer container = new 
> DefaultMessageListenerContainer();
>         container.setConnectionFactory(pcf);
>         container.setTransactionManager(transactionManager);
>         container.setMessageListener(new MessageListener() {
>             public void onMessage(Message message) {
>                 try {
>                     ObjectMessage msg = (ObjectMessage) message;
>                     BigInteger number = (BigInteger) msg.getObject();
>                     synchronized (notConsumedMessages) {
>                         if (!notConsumedMessages.remove(number)) {
>                            System.err.println("Message " + number + " not found in list!");
>                         } else {
>    //                        System.out.println("Consumed message " + number);
>                        }
>                    }
>                 } catch (JMSException e) {
> //                    e.printStackTrace();
>                     System.out.println("Error consuming message!");
>                 }
>             }
>         });
>         container.setSessionTransacted(true);
>         container.setDestinationName("testqueue");
>         container.setExceptionListener(new ExceptionListener() {
>             public void onException(JMSException jmsException) {
>                 System.err.println(jmsException);
>             }
>         });
>         container.afterPropertiesSet();
>         container.initialize();
>         TimeUnit.SECONDS.sleep(1);
>     }
>     public static void main(String[] args) throws Exception {
>         long start = System.currentTimeMillis();
>         setupReceiver();
>         send();
>         int remainingSize = 0;
>         do {
>             Thread.sleep(500);
>             synchronized (notConsumedMessages) {
>                 remainingSize = notConsumedMessages.size();
>             }
>             System.out.println("Unconsumed " + remainingSize);
>         } while (remainingSize > 0);
>         System.out.println("All messages consumed.");
>         long end = System.currentTimeMillis();
>         System.out.println((end-start));
>         System.exit(0);
>     }
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
