[
https://issues.apache.org/jira/browse/AMQ-5150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timothy Bish closed AMQ-5150.
-----------------------------
Resolution: Not a Problem
This is expected behaviour with failover. You need to ensure your application
keeps its main thread open.
> ActiveMQ failover seems not to work in 5.9.1 on MacOSX
> ------------------------------------------------------
>
> Key: AMQ-5150
> URL: https://issues.apache.org/jira/browse/AMQ-5150
> Project: ActiveMQ
> Issue Type: Bug
> Components: Connector
> Affects Versions: 5.9.1
> Environment: MacOSx
> Reporter: Ihor Mochurad
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> I have a super simple scenario: one broker and one consumer with a durable
> subscription.
> This is the code of my consumer app:
> package test;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.jms.Topic;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> import pojo.Event;
> import pojo.StockUpdate;
>
> public class Consumer
> {
>
> private static transient ConnectionFactory factory;
> private transient Connection connection;
> private transient Session session;
> public static int counter = 0;
>
> public Consumer(String brokerURL) throws JMSException
> {
> factory = new ActiveMQConnectionFactory(brokerURL);
> connection = factory.createConnection();
> connection.setClientID("CLUSTER_CLIENT_1");
> connection.start();
> session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> }
>
> public void close() throws JMSException
> {
> if (connection != null)
> {
> connection.close();
> }
> }
>
> public static void main(String[] args) throws JMSException
> {
>
> try
> {
> // extract topics from the rest of arguments
> String[] topics = new String[2];
> topics[0] = "CSCO";
> topics[1] = "ORCL";
>
> // define connection URI
> Consumer consumer = new
> Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true");
>
> for (String stock : topics)
> {
> try
> {
> Destination destination =
> consumer.getSession().createTopic("STOCKS." + stock);
> // consumer.getSession().
> MessageConsumer messageConsumer =
> consumer.getSession().createDurableSubscriber((Topic) destination,
> "STOCKS_DURABLE_CONSUMER_" + stock);
> messageConsumer.setMessageListener(new Listener());
> }
> catch (JMSException e)
> {
> e.printStackTrace();
> }
> }
> }
> catch (Throwable t)
> {
> t.printStackTrace();
> }
>
> }
>
> public Session getSession()
> {
> return session;
> }
>
> }
>
> class Listener implements MessageListener
> {
>
> public void onMessage(Message message)
> {
> try
> {
> TextMessage textMessage = (TextMessage) message;
> String json = textMessage.getText();
> Event event = StockUpdate.fromJSON(json, StockUpdate.class);
> System.out.println("Consumed message #:" + ++Consumer.counter
> + "\n" + event);
> }
> catch (Exception e)
> {
> e.printStackTrace();
> }
> }
>
> }
> Here is my activemq.xml
> <beans
> xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>
> <!-- Allows us to use system properties as variables in this
> configuration file -->
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> <property name="locations">
> <value>file:${activemq.conf}/credentials.properties</value>
> </property>
> </bean>
>
> <!--
> The <broker> element is used to configure the ActiveMQ broker.
> -->
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="R6_cluster_broker1" persistent="true">
>
> <networkConnectors>
> <networkConnector
> uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/>
> </networkConnectors>
>
> <destinationPolicy>
> <policyMap>
> <policyEntries>
> <policyEntry topic=">" >
> <!-- The constantPendingMessageLimitStrategy is used
> to prevent
> slow topic consumers to block producers and
> affect other consumers
> by limiting the number of messages that are
> retained
> For more information, see:
>
>
> http://activemq.apache.org/slow-consumer-handling.html
>
> -->
> <pendingMessageLimitStrategy>
> <constantPendingMessageLimitStrategy limit="1000"/>
> </pendingMessageLimitStrategy>
> </policyEntry>
> </policyEntries>
> </policyMap>
> </destinationPolicy>
>
>
> <!--
> The managementContext is used to configure how ActiveMQ is
> exposed in
> JMX. By default, ActiveMQ uses the MBean server that is
> started by
> the JVM. For more information, see:
>
> http://activemq.apache.org/jmx.html
> -->
> <managementContext>
> <managementContext createConnector="false"/>
> </managementContext>
>
> <!--
> Configure message persistence for the broker. The default
> persistence
> mechanism is the KahaDB store (identified by the kahaDB tag).
> For more information, see:
>
> http://activemq.apache.org/persistence.html
> -->
> <persistenceAdapter>
> <kahaDB directory="/work/temp/kahadb"/>
> </persistenceAdapter>
>
>
> <!--
> The systemUsage controls the maximum amount of space the
> broker will
> use before disabling caching and/or slowing down producers.
> For more information, see:
> http://activemq.apache.org/producer-flow-control.html
> -->
> <systemUsage>
> <systemUsage>
> <memoryUsage>
> <memoryUsage percentOfJvmHeap="70" />
> </memoryUsage>
> <storeUsage>
> <storeUsage limit="100 gb"/>
> </storeUsage>
> <tempUsage>
> <tempUsage limit="50 gb"/>
> </tempUsage>
> </systemUsage>
> </systemUsage>
>
> <!--
> The transport connectors expose ActiveMQ over a given
> protocol to
> clients and other brokers. For more information, see:
>
> http://activemq.apache.org/configuring-transports.html
> -->
> <transportConnectors>
> <!-- DOS protection, limit concurrent connections to 1000 and
> frame size to 100MB -->
> <transportConnector name="openwire"
> uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
> <!-- <transportConnector name="amqp"
> uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
> <transportConnector name="stomp"
> uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
> <transportConnector name="mqtt"
> uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
> <transportConnector name="ws"
> uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
> -->
> </transportConnectors>
>
> <!-- destroy the spring context on shutdown to stop jetty -->
> <shutdownHooks>
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook" />
> </shutdownHooks>
>
> </broker>
>
> <!--
> Enable web consoles, REST and Ajax APIs and demos
> The web consoles requires by default login, you can disable this
> in the jetty.xml file
>
> Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
> -->
> <import resource="jetty.xml"/>
>
> </beans>
> When I have both the broker and the consumer running and then stop the broker,
> my consumer exits a few moments later. As far as I understand, it should attempt
> to reconnect, but that is not the case. What am I doing wrong? Please advise.
> !NOTE! I launch my consumer in Eclipse; I do not build a standalone jar for
> this task.
> I have updated my broker to the latest 5.9.1 and did the same for my consumer.
> The result is the same - after I stop the broker, my consumer dies a few seconds
> later. It works fine while the broker is up and running.
--
This message was sent by Atlassian JIRA
(v6.2#6252)