[ 
https://issues.apache.org/jira/browse/AMQ-3213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13005203#comment-13005203
 ] 

Gary Tully commented on AMQ-3213:
---------------------------------

There were some fixes in this area with respect to duplicate network 
connection detection. One of the prerequisites is that the brokerId be 
specified for the broker, to ensure it is maintained after a restart. The 
broker id and the networkConnector name (needed if more than one duplex 
network connector is configured, for 5.5) are used to identify duplicate 
network connectors and pull down old state that can be pending after a 
temporary network partition or a fast restart.
Can you validate a 5.5-SNAPSHOT? But first, try setting a brokerId; use the 
same value as your brokerName. 
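
To illustrate why a stable brokerId matters for the detection described 
above, here is a minimal, self-contained sketch. It is a simplified model, 
not ActiveMQ's implementation: the classes DuplicateBridgeSketch, BridgeKey 
and BridgeRegistry, the connector name "NC" and the connection labels are all 
hypothetical. It only assumes that a bridge is identified by the pair (broker 
id, networkConnector name), as stated above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical model of duplicate-bridge detection.
// None of these classes exist in ActiveMQ; the names are made up.
final class DuplicateBridgeSketch {

    /** Identity of a bridge: broker id plus network connector name. */
    static final class BridgeKey {
        private final String brokerId;
        private final String connectorName;

        BridgeKey(String brokerId, String connectorName) {
            this.brokerId = brokerId;
            this.connectorName = connectorName;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof BridgeKey)) {
                return false;
            }
            BridgeKey other = (BridgeKey) o;
            return brokerId.equals(other.brokerId)
                    && connectorName.equals(other.connectorName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(brokerId, connectorName);
        }
    }

    /** Tracks one live connection per bridge identity. */
    static final class BridgeRegistry {
        private final Map<BridgeKey, String> live = new HashMap<>();

        /** Registers a connection; returns the stale one it replaced, or null. */
        String register(String brokerId, String connectorName, String connection) {
            return live.put(new BridgeKey(brokerId, connectorName), connection);
        }

        int size() {
            return live.size();
        }
    }

    public static void main(String[] args) {
        // Fixed brokerId: the reconnect maps to the same key, so the old
        // entry is found and can be pulled down.
        BridgeRegistry fixed = new BridgeRegistry();
        fixed.register("local1", "NC", "conn-before-restart");
        String stale = fixed.register("local1", "NC", "conn-after-restart");
        System.out.println("fixed id: replaced=" + stale + ", live=" + fixed.size());

        // Generated id that changes on restart: the keys differ, so the old
        // entry is never matched and lingers next to the new one.
        BridgeRegistry generated = new BridgeRegistry();
        generated.register("ID:host-1111-0:1", "NC", "conn-before-restart");
        stale = generated.register("ID:host-2222-0:1", "NC", "conn-after-restart");
        System.out.println("generated id: replaced=" + stale + ", live=" + generated.size());
    }
}
```

In the reproduction code quoted below, the suggestion would presumably amount 
to a call such as broker.setBrokerId("remote") next to 
broker.setBrokerName("remote"), assuming the brokerId property is set through 
BrokerService.setBrokerId(String).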

> failover doesn't reconnect after broker restart
> -----------------------------------------------
>
>                 Key: AMQ-3213
>                 URL: https://issues.apache.org/jira/browse/AMQ-3213
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>         Environment: Windows 7 x64, jdk 1.6.0_21
>            Reporter: Ivan Shcheklein
>
> I have a network of three brokers: two embedded (publisher and subscriber) 
> and one stand alone remote. Embedded brokers connect to the remote with 
> duplex static failover connections. Everything works fine if remote broker is 
> not restarted. After the remote broker is restarted embedded brokers fail to 
> reestablish duplex bridges. Subscriber doesn't receive any messages and in 
> logs I have:
> *Subscriber worker:*
> {code}
> Received 14-th message.
> Received 15-th message.
> Received 16-th message.
> 2011:03:09 18:19:08,276 [WARN ] 
> org.apache.activemq.transport.failover.FailoverTransport - Transport 
> (localhost/127.0.0.1:61616) failed to tcp://localhost:61616 , attempting to 
> automatically reconnect due to: java.net.SocketException: Connection reset
> 2011:03:09 18:19:08,276 [INFO ] 
> org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound 
> transport to remote interrupted.
> 2011:03:09 18:19:22,426 [INFO ] 
> org.apache.activemq.network.DemandForwardingBridgeSupport - Network 
> connection between vm://local1#0 and tcp://localhost:61616(remote) has been 
> established.
> 2011:03:09 18:19:22,429 [INFO ] 
> org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound 
> transport to remote resumed
> 2011:03:09 18:19:22,429 [INFO ] 
> org.apache.activemq.transport.failover.FailoverTransport - Successfully 
> reconnected to tcp://localhost:61616
> 2011:03:09 18:19:22,455 [WARN ] 
> org.apache.activemq.broker.TransportConnection - Unexpected extra broker info 
> command received: BrokerInfo {commandId = 1929, responseRequired = false, 
> brokerId = ID:Air-17262-1299683959982-0:1, brokerURL = 
> tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = 
> false, faultTolerantConfiguration = false, networkConnection = false, 
> duplexConnection = false, peerBrokerInfos = [], brokerName = remote, 
> connectionId = 0, brokerUploadUrl = null, networkProperties = null}
> {code}
> *Publisher worker:*
> {code}
> Sending 28-th message
> Sending 29-th message
> Sending 30-th message
> 2011:03:09 18:19:22,430 [INFO ] 
> org.apache.activemq.network.DemandForwardingBridgeSupport - Network 
> connection between vm://local#0 and tcp://localhost:61616(remote) has been 
> established.
> 2011:03:09 18:19:22,435 [INFO ] 
> org.apache.activemq.network.DemandForwardingBridgeSupport - Outbound 
> transport to remote resumed
> 2011:03:09 18:19:22,435 [INFO ] 
> org.apache.activemq.transport.failover.FailoverTransport - Successfully 
> reconnected to tcp://localhost:61616
> 2011:03:09 18:19:22,469 [WARN ] 
> org.apache.activemq.broker.TransportConnection - Unexpected extra broker info 
> command received: BrokerInfo {commandId = 1911, responseRequired = false, 
> brokerId = ID:Air-17262-1299683959982-0:1, brokerURL = 
> tcp://validation.sls.microsoft.com:61616, slaveBroker = false, masterBroker = 
> false, faultTolerantConfiguration = false, networkConnection = false, 
> duplexConnection = false, peerBrokerInfos = [], brokerName = remote, 
> connectionId = 0, brokerUploadUrl = null, networkProperties = null}
> Sending 31-th message
> Sending 32-th message
> Sending 33-th message
> {code}
> *Sample code to reproduce this issue:*
> {code}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.TransportConnector;
> import org.apache.activemq.network.NetworkConnector;
> import javax.jms.*;
> import java.net.URI;
> public class JmsTester implements MessageListener {
>     private static final int size = 256;
>     private static byte[] payload;
>     static {
>         char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
>         payload = new byte[size];
>         for (int i = 0; i < size; i++) {
>             payload[i] = (byte)DATA[i % DATA.length];
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         // Read the mode once; an unset property falls through to subscriber
>         String mode = System.getProperty("jms.mode");
>         if("publisher".equals(mode))
>             publisher();
>         else if("broker".equals(mode))
>             broker();
>         else
>             subscriber();
>     }
>     private static void publisher() throws Exception {
>         System.out.println("Starting publisher ...");
>         BrokerService broker = new BrokerService();
>         broker.setBrokerName("local");
>         broker.setUseJmx(true);
>         broker.setPersistent(true);
>         NetworkConnector nc = 
> broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
>         nc.setDuplex(true);
>         nc.setNetworkTTL(4);
>         broker.start();
>         ActiveMQConnectionFactory factory = new 
> ActiveMQConnectionFactory("vm://local");
>         Connection connection = factory.createConnection();
>         Session session = connection.createSession(false, 
> Session.DUPS_OK_ACKNOWLEDGE);
>         Topic topic = session.createTopic("topictest.messages");
>         MessageProducer publisher = session.createProducer(topic);
>         publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
>         for (int i=0; i<1000; i++) {
>             Thread.sleep(1000);
>             System.out.println("Sending " + i + "-th message");
>             BytesMessage msg = session.createBytesMessage();
>             msg.setIntProperty("count", i);
>             msg.writeBytes(payload);
>             publisher.send(msg);
>         }
>         broker.stop();
>     }
>     private static void broker() throws Exception {
>         System.out.println("Starting broker ...");
>         BrokerService broker = new BrokerService();
>         broker.setPersistent(true);
>         broker.setBrokerName("remote");
>         broker.setUseJmx(true);
>         TransportConnector connector = new TransportConnector();
>         connector.setUri(new URI("tcp://localhost:61616"));
>         
>         broker.addConnector(connector);
>         broker.start();
>         // Keep the broker alive without spinning a CPU core
>         while(true) { Thread.sleep(1000); }
>     }
>     private static void subscriber() throws Exception {
>         System.out.println("Starting subscriber ...");
>         BrokerService broker = new BrokerService();
>         broker.setBrokerName("local1");
>         broker.setUseJmx(true);
>         broker.setPersistent(true);
>         NetworkConnector nc = 
> broker.addNetworkConnector("static:(failover:(tcp://localhost:61616))");
>         nc.setDuplex(true);
>         nc.setNetworkTTL(4);
>         broker.start();
>         ActiveMQConnectionFactory factory = new 
> ActiveMQConnectionFactory("vm://local1");
>         Connection connection = factory.createConnection();
>         connection.setClientID("subscriber2");
>         Session session = connection.createSession(false, 
> Session.DUPS_OK_ACKNOWLEDGE);
>         Topic topic = session.createTopic("topictest.messages");
>         MessageConsumer consumer = session.createDurableSubscriber(topic, 
> "subscriber2");
>         JmsTester tester = new JmsTester();
>         consumer.setMessageListener(tester);
>         connection.start();
>         while(true) {
>             Thread.sleep(1000);
>         }
>     }
>     @Override
>     public void onMessage(Message message) {
>         try {
>             int count = message.getIntProperty("count");
>             System.out.println("Received " + count + "-th message.");
>         } catch (JMSException e) {
>             e.printStackTrace();
>         }
>     }
> }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
