failover doesn't reconnect after broker restart
-----------------------------------------------

                 Key: AMQ-3213
                 URL: https://issues.apache.org/jira/browse/AMQ-3213
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.4.2
         Environment: Windows 7 x64, jdk 1.6.0_21
            Reporter: Ivan Shcheklein


I have a network of three brokers: two embedded (publisher and subscriber) and 
one standalone remote broker. The embedded brokers connect to the remote broker 
with duplex static failover connections. Everything works fine as long as the 
remote broker is not restarted. After the remote broker is restarted, the 
embedded brokers fail to re-establish their duplex bridges. The subscriber 
doesn't receive any messages, and in the logs I have:

*Subscriber worker:*

{code}
Received 14-th message.
Received 15-th message.
Received 16-th message.
2011:03:09 18:19:08,276 [WARN ] 
org.apache.activemq.transport.failover.FailoverTransport - Transport 
(localhost/127.0.0.1:61616) failed to tcp://localhost:61616 , attempting to 
automatically reconnect due to: java.net.SocketException: Connection reset
2011:03:09 18:19:08,276 [INFO ] 
org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport 
to remote interrupted.
2011:03:09 18:19:22,426 [INFO ] 
org.apache.activemq.network.DemandForwardingBridgeSupport - Network connection 
between vm://local1#0 and tcp://localhost:61616(remote) has been established.
2011:03:09 18:19:22,429 [INFO ] 
org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport 
to remote resumed
2011:03:09 18:19:22,429 [INFO ] 
org.apache.activemq.transport.failover.FailoverTransport - Successfully 
reconnected to tcp://localhost:61616
2011:03:09 18:19:22,455 [WARN ] org.apache.activemq.broker.TransportConnection 
- Unexpected extra broker info command received: BrokerInfo {commandId = 1929, 
responseRequired = false, brokerId = ID:Air-17262-1299683959982-0:1, brokerURL 
= tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = 
false, faultTolerantConfiguration = false, networkConnection = false, 
duplexConnection = false, peerBrokerInfos = [], brokerName = remote, 
connectionId = 0, brokerUploadUrl = null, networkProperties = null}
{code}


*Publisher worker:*

{code}
Sending 28-th message
Sending 29-th message
Sending 30-th message
2011:03:09 18:19:22,430 [INFO ] 
org.apache.activemq.network.DemandForwardingBridgeSupport - Network connection 
between vm://local#0 and tcp://localhost:61616(remote) has been established.
2011:03:09 18:19:22,435 [INFO ] 
org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound transport 
to remote resumed
2011:03:09 18:19:22,435 [INFO ] 
org.apache.activemq.transport.failover.FailoverTransport - Successfully 
reconnected to tcp://localhost:61616
2011:03:09 18:19:22,469 [WARN ] org.apache.activemq.broker.TransportConnection 
- Unexpected extra broker info command received: BrokerInfo {commandId = 1911, 
responseRequired = false, brokerId = ID:Air-17262-1299683959982-0:1, brokerURL 
= tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = 
false, faultTolerantConfiguration = false, networkConnection = false, 
duplexConnection = false, peerBrokerInfos = [], brokerName = remote, 
connectionId = 0, brokerUploadUrl = null, networkProperties = null}
Sending 31-th message
Sending 32-th message
Sending 33-th message
{code}

*Sample code to reproduce this issue:*

{code}
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;

import javax.jms.*;
import java.net.URI;


/**
 * Reproduction harness for the duplex failover bridge scenario in this report.
 *
 * <p>The role of the process is selected with the {@code jms.mode} system property:
 * <ul>
 *   <li>{@code publisher} - embedded broker "local" that publishes to a topic,</li>
 *   <li>{@code broker} - stand-alone broker "remote" listening on tcp://localhost:61616,</li>
 *   <li>anything else (or unset) - embedded broker "local1" with a durable subscriber.</li>
 * </ul>
 * An instance of this class is also the {@link MessageListener} used by the subscriber.
 */
public class JmsTester implements MessageListener {

    /** Size, in bytes, of the body written into every published message. */
    private static final int size = 256;

    /** Message body: the lowercase alphabet repeated until {@code size} bytes are filled. */
    private static final byte[] payload;

    static {
        char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
        payload = new byte[size];
        for (int i = 0; i < size; i++) {
            payload[i] = (byte) alphabet[i % alphabet.length];
        }
    }

    /**
     * Starts the role named by the {@code jms.mode} system property.
     *
     * @param args ignored
     * @throws Exception if the selected role fails to start or run
     */
    public static void main(String[] args) throws Exception {
        // Constant-first equals() is null-safe, so an unset property falls through to the
        // subscriber branch instead of throwing a NullPointerException.
        String mode = System.getProperty("jms.mode");
        if ("publisher".equals(mode)) {
            publisher();
        } else if ("broker".equals(mode)) {
            broker();
        } else {
            subscriber();
        }
    }

    /**
     * Runs embedded broker "local" with a duplex network connector to
     * {@code static:(failover:(tcp://localhost:61616))} and publishes 1000 persistent
     * bytes messages, one per second, to topic {@code topictest.messages}.
     *
     * @throws Exception if the broker or the JMS client fails
     */
    private static void publisher() throws Exception {
        System.out.println("Starting publisher ...");
        BrokerService broker = new BrokerService();
        broker.setBrokerName("local");
        broker.setUseJmx(true);
        broker.setPersistent(true);
        NetworkConnector nc =
                broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
        nc.setDuplex(true);
        nc.setNetworkTTL(4);
        broker.start();

        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://local");
            Connection connection = factory.createConnection();
            try {
                Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
                Topic topic = session.createTopic("topictest.messages");

                MessageProducer publisher = session.createProducer(topic);
                publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

                for (int i = 0; i < 1000; i++) {
                    Thread.sleep(1000);
                    System.out.println("Sending " + i + "-th message");
                    BytesMessage msg = session.createBytesMessage();
                    // The listener on the subscriber side reads this property back.
                    msg.setIntProperty("count", i);
                    msg.writeBytes(payload);
                    publisher.send(msg);
                }
            } finally {
                // Release the client connection before the embedded broker goes away.
                connection.close();
            }
        } finally {
            // Stop the broker even when publishing fails part-way through.
            broker.stop();
        }
    }

    /**
     * Runs the stand-alone broker "remote" with a TCP connector on
     * {@code tcp://localhost:61616} and then keeps the process alive indefinitely.
     *
     * @throws Exception if the broker fails to start or the waiting thread is interrupted
     */
    private static void broker() throws Exception {
        System.out.println("Starting broker ...");
        BrokerService broker = new BrokerService();
        broker.setPersistent(true);
        broker.setBrokerName("remote");
        broker.setUseJmx(true);
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("tcp://localhost:61616"));

        broker.addConnector(connector);
        broker.start();
        // Park the main thread with a sleep loop (as subscriber() does) rather than an
        // empty busy loop, which would spin a CPU core at 100%.
        while (true) {
            Thread.sleep(1000);
        }
    }

    /**
     * Runs embedded broker "local1" with a duplex network connector to
     * {@code static:(failover:(tcp://localhost:61616))} and registers a durable subscriber
     * (client ID and subscription name "subscriber2") on topic {@code topictest.messages}.
     * Messages are handled by {@link #onMessage(Message)}; the method never returns normally.
     *
     * @throws Exception if the broker or the JMS client fails
     */
    private static void subscriber() throws Exception {
        System.out.println("Starting subscriber ...");

        BrokerService broker = new BrokerService();
        broker.setBrokerName("local1");
        broker.setUseJmx(true);
        broker.setPersistent(true);
        NetworkConnector nc =
                broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
        nc.setDuplex(true);
        nc.setNetworkTTL(4);
        broker.start();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://local1");
        Connection connection = factory.createConnection();
        connection.setClientID("subscriber2");
        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);

        Topic topic = session.createTopic("topictest.messages");

        MessageConsumer consumer = session.createDurableSubscriber(topic, "subscriber2");
        JmsTester tester = new JmsTester();
        consumer.setMessageListener(tester);
        connection.start();
        // Keep the process alive so the listener can keep receiving messages.
        while (true) {
            Thread.sleep(1000);
        }
    }

    /**
     * Prints the {@code count} property of each received message.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        try {
            int count = message.getIntProperty("count");
            System.out.println("Received " + count + "-th message.");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
{code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to