[ https://issues.apache.org/activemq/browse/AMQ-1585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=43831#action_43831 ]
Gary Tully commented on AMQ-1585: --------------------------------- I have been able to reproduce again by pushing more messages through org.apache.activemq.broker.ft.QueueMasterSlaveTest, have some more work to do! > Problems with pure master/slave configuration > --------------------------------------------- > > Key: AMQ-1585 > URL: https://issues.apache.org/activemq/browse/AMQ-1585 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 4.1.1, 5.0.0, 5.1.0 > Environment: Ubuntu 6.04, JDK 1.5.0_011, Spring 2.0.x > Reporter: Thomas Buckel > Assignee: Gary Tully > Attachments: AMQ-1585.patch > > > As posted in the AMQ user forum: > http://www.nabble.com/Problems-with-Pure-Master-Slave-in-AMQ-5.0.0-to15471491s2354.html#a15474769 > ------------------- > Hi all, > I am having trouble setting up a *stable* ActiveMQ Pure Master/Slave topology. > Initially I have tried v4.1.1 which failed with an exception. I found an AMQ > JIRA ticket which said that Pure/Master slave didn't work in v4.1.1. > Ok, so I switched to AMQ 5.0.0, created 2 configs (master/slave, see end of > message) and ran two AMQ instances (on the same box) and most of the times my > test (see below) worked, but more often I get various error messages like: > - On the slave: > ERROR Service - Async error occurred: > javax.jms.JMSException: Slave broker out of sync with master: Dispatched > message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the > pending list > javax.jms.JMSException: Slave broker out of sync with master: Dispatched > message (ID:tbuckel-desktop-41814-1202886136210-0:0:565:1:1) was not in the > pending list > at > org.apache.activemq.broker.region.PrefetchSubscription.processMessageDispatchNotification(PrefetchSubscription.java:160) > at > org.apache.activemq.broker.region.AbstractRegion.processDispatchNotification(AbstractRegion.java:381) > at > org.apache.activemq.broker.region.RegionBroker.processDispatchNotification(RegionBroker.java:550) > at > org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at > org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at > org.apache.activemq.broker.BrokerFilter.processDispatchNotification(BrokerFilter.java:201) > at > org.apache.activemq.broker.MutableBrokerFilter.processDispatchNotification(MutableBrokerFilter.java:211) > at > org.apache.activemq.broker.TransportConnection.processMessageDispatchNotification(TransportConnection.java:450) > at > org.apache.activemq.command.MessageDispatchNotification.visit(MessageDispatchNotification.java:77) > at > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67) > at > org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202) > at > org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98) > at > org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36) > - After having killed the master, stopped the slave, copied the slave's data > into the master's data directory various error message came up (as described > in the Master/Slave recovery section), e.g. (internal) ActiveMQ topics were > not available, the admin webApp showed exceptions and errors on the client. > The test I've created uses Spring 2.0.x and pumps 1000 MapMessages in a queue > through Spring's JmsTempate, each message is created within its own > transaction, using JmsTransactionManager and TransactionTemplate. > The created messages are consumed by an initially instantiated transactional > DefaultMessageListenerContainer. The AMQ JARs in the test's classpath are > activemq-core-5.0.0.jar, geronimo-jms_1.1_spec-1.0.jar, > geronimo-jta_1.0.1B_spec-1.0.jar as I've noticed a really bad performance > when only using the activemq-all-5.0.0.jar (maybe this is the problem?). > The test code work's without problems with OpenMQ, but I'd prefer using the > nice Pure Master/Active ActiveMQ if I can get it running in a *stable* config > ;) > I would highly appreciate any help or suggestions. Maybe my config is wrong > or I miss something essential. I've also tried a recent AMQ 5.1 SNAPSHOT > which wasn't better... > See below for the small program i used to test (no unit test, behaviour > appeared to be non deterministic to me and it's not so nice as i've changed > it quite often) > Thanks in advance, > Thomas > <!-- MASTER config --> > <beans > xmlns="http://www.springframework.org/schema/beans" > xmlns:amq="http://activemq.org/config/1.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.org/config/1.0 > http://activemq.apache.org/schema/activemq-core.xsd > http://activemq.apache.org/camel/schema/spring > http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> > <bean > class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> > <broker xmlns="http://activemq.org/config/1.0" brokerName="master" > dataDirectory="${activemq.base}/data"> > <destinationPolicy> > <policyMap> > <policyEntries> > <policyEntry topic="FOO.>" producerFlowControl="false" > memoryLimit="1mb"> > <dispatchPolicy> > <strictOrderDispatchPolicy/> > </dispatchPolicy> > <subscriptionRecoveryPolicy> > <lastImageSubscriptionRecoveryPolicy/> > </subscriptionRecoveryPolicy> > </policyEntry> > </policyEntries> > </policyMap> > </destinationPolicy> > <transportConnectors> > <transportConnector name="openwire" uri="tcp://tbuckel-desktop:7778" /> > </transportConnectors> > <networkConnectors/> > <managementContext> > <managementContext connectorPort="1100" > jmxDomainName="org.apache.activemq"/> > </managementContext> > </broker> > <commandAgent xmlns="http://activemq.org/config/1.0"/> > <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> > <connectors> > <nioConnector port="8161" /> > </connectors> > <handlers> > <webAppContext contextPath="/admin" > resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" /> > </handlers> > </jetty> > </beans> > <!-- SLAVE config --> > <beans > xmlns="http://www.springframework.org/schema/beans" > xmlns:amq="http://activemq.org/config/1.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.org/config/1.0 > http://activemq.apache.org/schema/activemq-core.xsd > http://activemq.apache.org/camel/schema/spring > http://activemq.apache.org/camel/schema/spring/camel-spring.xsd"> > <bean > class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/> > > <broker xmlns="http://activemq.org/config/1.0" brokerName="slave" > dataDirectory="${activemq.base}/data-slave" > masterConnectorURI="tcp://tbuckel-desktop:7778"> > > <destinationPolicy> > <policyMap> > <policyEntries> > <policyEntry topic="FOO.>" producerFlowControl="false" > memoryLimit="1mb"> > <dispatchPolicy> > <strictOrderDispatchPolicy/> > </dispatchPolicy> > <subscriptionRecoveryPolicy> > <lastImageSubscriptionRecoveryPolicy/> > </subscriptionRecoveryPolicy> > </policyEntry> > </policyEntries> > </policyMap> > </destinationPolicy> > <transportConnectors> > <transportConnector name="openwire" uri="tcp://localhost:7779"/> > </transportConnectors> > <networkConnectors/> > <managementContext> > <managementContext connectorPort="1101" > jmxDomainName="org.apache.activemq"/> > </managementContext> > </broker> > <commandAgent xmlns="http://activemq.org/config/1.0"/> > <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> > <connectors> > <nioConnector port="8162" /> > </connectors> > <handlers> > <webAppContext contextPath="/admin" > resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" /> > </handlers> > </jetty> > </beans> > ------------ > Test code: > import org.apache.activemq.ActiveMQConnectionFactory; > import org.springframework.jms.connection.JmsTransactionManager; > import > org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessageCreator; > import org.springframework.jms.listener.DefaultMessageListenerContainer; > import org.springframework.transaction.TransactionStatus; > import > org.springframework.transaction.support.TransactionCallbackWithoutResult; > import org.springframework.transaction.support.TransactionTemplate; > import javax.jms.*; > import java.math.BigInteger; > import java.util.ArrayList; > import java.util.List; > import java.util.concurrent.TimeUnit; > public class AnotherFailoverTest { > public static final int MESSAGES = 1000; > private final static List<BigInteger> notConsumedMessages = new > ArrayList<BigInteger>(MESSAGES); > private static ConnectionFactory createCF() throws Exception { > ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); > > cf.setBrokerURL("failover://(tcp://localhost:7778,tcp://localhost:7779)?randomize=false"); > return new TransactionAwareConnectionFactoryProxy(cf); > } > private static void send() throws Exception { > JmsTransactionManager transactionManager = new > JmsTransactionManager(); > transactionManager.setConnectionFactory(createCF()); > transactionManager.afterPropertiesSet(); > int i=0; > do { > i++; > final int number = i; > try { > final BigInteger v = new BigInteger(Integer.toString(number)); > TransactionTemplate tt = new > TransactionTemplate(transactionManager); > tt.execute(new TransactionCallbackWithoutResult() { > protected void > doInTransactionWithoutResult(TransactionStatus status) { > final JmsTemplate template = new JmsTemplate(pcf); > template.setSessionTransacted(true); > template.afterPropertiesSet(); > template.send("testqueue", new MessageCreator() { > public Message createMessage(Session session) > throws JMSException { > ObjectMessage dummyMessage = > session.createObjectMessage(); > dummyMessage.setObject(v); > synchronized (notConsumedMessages) { > notConsumedMessages.add(v); > } > // System.out.println("Created message " + > number + "(" + notConsumedMessages.size() + ")"); > return dummyMessage; > } > }); > } > }); > } catch (Exception e) { > e.printStackTrace(); > System.out.println("Error creating message " + number); > } > } while (i < MESSAGES); > } > private static void setupReceiver() throws Exception { > JmsTransactionManager transactionManager = new > JmsTransactionManager(); > transactionManager.setConnectionFactory(createCF()); > transactionManager.afterPropertiesSet(); > final DefaultMessageListenerContainer container = new > DefaultMessageListenerContainer(); > container.setConnectionFactory(pcf); > container.setTransactionManager(transactionManager); > container.setMessageListener(new MessageListener() { > public void onMessage(Message message) { > try { > ObjectMessage msg = (ObjectMessage) message; > BigInteger number = (BigInteger) msg.getObject(); > synchronized (notConsumedMessages) { > if (!notConsumedMessages.remove(number)) { > System.err.println("Message " + number + " not > found in list!"); > } else { > // System.out.println("Consumed message " + number); > } > } > } catch (JMSException e) { > // e.printStackTrace(); > System.out.println("Error consuming message!"); > } > } > }); > container.setSessionTransacted(true); > container.setDestinationName("testqueue"); > container.setExceptionListener(new ExceptionListener() { > public void onException(JMSException jmsException) { > System.err.println(jmsException); > } > }); > container.afterPropertiesSet(); > container.initialize(); > TimeUnit.SECONDS.sleep(1); > } > public static void main(String[] args) throws Exception { > long start = System.currentTimeMillis(); > setupReceiver(); > send(); > int remainingSize = 0; > do { > Thread.sleep(500); > synchronized (notConsumedMessages) { > remainingSize = notConsumedMessages.size(); > } > System.out.println("Unconsumed " + remainingSize + ": " + sb); > } while (remainingSize > 0); > System.out.println("All messages consumed."); > long end = System.currentTimeMillis(); > System.out.println((end-start)); > System.exit(0); > } > } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.