jbonofre commented on code in PR #1759:
URL: https://github.com/apache/activemq/pull/1759#discussion_r2931424957


##########
activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java:
##########
@@ -338,72 +364,201 @@ protected void setUp() throws Exception {
     @Override
     protected void tearDown() throws Exception {
         shutdownClients();
-        Thread.sleep(2000);
         destroyBrokerCluster();
     }
 
-    private void initSingleTcBroker(String params, String clusterFilter, 
String destinationFilter) throws Exception {
-        createBrokerA(false, params, clusterFilter, null);
-        createBrokerB(false, params, clusterFilter, null);
-        createBrokerC(false, params, clusterFilter, null);
+    private void initSingleTcBroker(final String params, final String 
clusterFilter, final String destinationFilter) throws Exception {
+        final String tcParams = (params == null) ? "" : params;
+
+        // Phase 1: Create and start all 3 brokers with transport connectors 
only
+        addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+        addTransportConnector(getBroker(BROKER_A_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        getBroker(BROKER_A_NAME).start();
+        getBroker(BROKER_A_NAME).waitUntilStarted();
+        brokerAClientAddr = 
getBroker(BROKER_A_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+        addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+        addTransportConnector(getBroker(BROKER_B_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        getBroker(BROKER_B_NAME).start();
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        brokerBClientAddr = 
getBroker(BROKER_B_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+        addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+        addTransportConnector(getBroker(BROKER_C_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        getBroker(BROKER_C_NAME).start();
         getBroker(BROKER_C_NAME).waitUntilStarted();
+        brokerCClientAddr = 
getBroker(BROKER_C_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+        // Phase 2: Add network bridges using resolved addresses and start them
+        addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", 
"static://(" + brokerBClientAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", 
"static://(" + brokerCClientAddr + ")?useExponentialBackOff=false", false, 
null);
+
+        addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", 
"static://(" + brokerAClientAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", 
"static://(" + brokerCClientAddr + ")?useExponentialBackOff=false", false, 
null);
+
+        addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", 
"static://(" + brokerAClientAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", 
"static://(" + brokerBClientAddr + ")?useExponentialBackOff=false", false, 
null);
+
+        // Wait for all bridges to form
+        waitForAllBridges();
     }
 
-    private void initMultiTcCluster(String params, String clusterFilter) 
throws Exception {
-        createBrokerA(true, params, clusterFilter, null);
-        createBrokerB(true, params, clusterFilter, null);
-        createBrokerC(true, params, clusterFilter, null);
+    private void initMultiTcCluster(final String params, final String 
clusterFilter) throws Exception {
+        final String tcParams = (params == null) ? "" : params;
+
+        // Phase 1: Create and start all 3 brokers with both transport 
connectors
+        addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+        addTransportConnector(getBroker(BROKER_A_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        addTransportConnector(getBroker(BROKER_A_NAME), "network", 
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+        getBroker(BROKER_A_NAME).start();
+        getBroker(BROKER_A_NAME).waitUntilStarted();
+        brokerAClientAddr = 
getBroker(BROKER_A_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+        brokerANobAddr = 
getBroker(BROKER_A_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+        addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+        addTransportConnector(getBroker(BROKER_B_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        addTransportConnector(getBroker(BROKER_B_NAME), "network", 
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+        getBroker(BROKER_B_NAME).start();
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        brokerBClientAddr = 
getBroker(BROKER_B_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+        brokerBNobAddr = 
getBroker(BROKER_B_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+        addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+        addTransportConnector(getBroker(BROKER_C_NAME), "openwire", 
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+        addTransportConnector(getBroker(BROKER_C_NAME), "network", 
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+        getBroker(BROKER_C_NAME).start();
         getBroker(BROKER_C_NAME).waitUntilStarted();
+        brokerCClientAddr = 
getBroker(BROKER_C_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+        brokerCNobAddr = 
getBroker(BROKER_C_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+        // Phase 2: Add network bridges using network connector addresses and 
start them
+        addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", 
"static://(" + brokerBNobAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", 
"static://(" + brokerCNobAddr + ")?useExponentialBackOff=false", false, null);
+
+        addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", 
"static://(" + brokerANobAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", 
"static://(" + brokerCNobAddr + ")?useExponentialBackOff=false", false, null);
+
+        addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", 
"static://(" + brokerANobAddr + ")?useExponentialBackOff=false", false, 
clusterFilter);
+        addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", 
"static://(" + brokerBNobAddr + ")?useExponentialBackOff=false", false, null);
+
+        // Wait for all bridges to form
+        waitForAllBridges();
     }
 
-    private void createBrokerA(boolean multi, String params, String 
clusterFilter, String destinationFilter) throws Exception {
-        final String tcParams = (params == null)?"":params;
+    /**
+     * Creates broker A with transport connector and network bridges to B and 
C.
+     * Rebinds to the same port that was previously used (stored in 
brokerAClientAddr).
+     * Used for re-creating broker A after it has been stopped.
+     */
+    private void createBrokerA(final boolean multi, final String params, final 
String clusterFilter, final String destinationFilter) throws Exception {
+        final String tcParams = (params == null) ? "" : params;
         if (getBroker(BROKER_A_NAME) == null) {
+            final String bindAddr = extractBindAddress(brokerAClientAddr);
+
             addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
-            addTransportConnector(getBroker(BROKER_A_NAME), "openwire", 
BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
             if (multi) {
-                addTransportConnector(getBroker(BROKER_A_NAME), "network", 
BROKER_A_NOB_TC_ADDRESS + tcParams, false);
-                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", 
"static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", 
"static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                final String nobBindAddr = extractBindAddress(brokerANobAddr);
+                addTransportConnector(getBroker(BROKER_A_NAME), "openwire", 
bindAddr + tcParams, true);
+                addTransportConnector(getBroker(BROKER_A_NAME), "network", 
nobBindAddr + tcParams, false);
             } else {
-                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", 
"static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", 
"static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                addTransportConnector(getBroker(BROKER_A_NAME), "openwire", 
bindAddr + tcParams, true);
             }
             getBroker(BROKER_A_NAME).start();
-        }
-    }
-
-    private void createBrokerB(boolean multi, String params, String 
clusterFilter, String destinationFilter) throws Exception {
-        final String tcParams = (params == null)?"":params;
-        if (getBroker(BROKER_B_NAME) == null) {
-            addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
-            addTransportConnector(getBroker(BROKER_B_NAME), "openwire", 
BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
+            getBroker(BROKER_A_NAME).waitUntilStarted();
+            brokerAClientAddr = 
getBroker(BROKER_A_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
             if (multi) {
-                addTransportConnector(getBroker(BROKER_B_NAME), "network", 
BROKER_B_NOB_TC_ADDRESS + tcParams, false);
-                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", 
"static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", 
"static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                brokerANobAddr = 
getBroker(BROKER_A_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+                addAndStartNetworkBridge(getBroker(BROKER_A_NAME), 
"A_2_B_Bridge", "static://(" + brokerBNobAddr + 
")?useExponentialBackOff=false", false, clusterFilter);
+                addAndStartNetworkBridge(getBroker(BROKER_A_NAME), 
"A_2_C_Bridge", "static://(" + brokerCNobAddr + 
")?useExponentialBackOff=false", false, null);
             } else {
-                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", 
"static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", 
"static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                addAndStartNetworkBridge(getBroker(BROKER_A_NAME), 
"A_2_B_Bridge", "static://(" + brokerBClientAddr + 
")?useExponentialBackOff=false", false, clusterFilter);
+                addAndStartNetworkBridge(getBroker(BROKER_A_NAME), 
"A_2_C_Bridge", "static://(" + brokerCClientAddr + 
")?useExponentialBackOff=false", false, null);
             }
-            getBroker(BROKER_B_NAME).start();
         }
     }
 
-    private void createBrokerC(boolean multi, String params, String 
clusterFilter, String destinationFilter) throws Exception {
-        final String tcParams = (params == null)?"":params;
+    /**
+     * Creates broker C with transport connector and network bridges to A and 
B.
+     * Rebinds to the same port that was previously used (stored in 
brokerCClientAddr).
+     * Used for re-creating broker C after it has been stopped.
+     */
+    private void createBrokerC(final boolean multi, final String params, final 
String clusterFilter, final String destinationFilter) throws Exception {
+        final String tcParams = (params == null) ? "" : params;
         if (getBroker(BROKER_C_NAME) == null) {
+            final String bindAddr = extractBindAddress(brokerCClientAddr);
+
             addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
-            addTransportConnector(getBroker(BROKER_C_NAME), "openwire", 
BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
             if (multi) {
-                addTransportConnector(getBroker(BROKER_C_NAME), "network", 
BROKER_C_NOB_TC_ADDRESS + tcParams, false);
-                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", 
"static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", 
"static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                final String nobBindAddr = extractBindAddress(brokerCNobAddr);
+                addTransportConnector(getBroker(BROKER_C_NAME), "openwire", 
bindAddr + tcParams, true);
+                addTransportConnector(getBroker(BROKER_C_NAME), "network", 
nobBindAddr + tcParams, false);
             } else {
-                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", 
"static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, clusterFilter);
-                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", 
"static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", 
false, null);
+                addTransportConnector(getBroker(BROKER_C_NAME), "openwire", 
bindAddr + tcParams, true);
             }
             getBroker(BROKER_C_NAME).start();
+            getBroker(BROKER_C_NAME).waitUntilStarted();
+            brokerCClientAddr = 
getBroker(BROKER_C_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+            if (multi) {
+                brokerCNobAddr = 
getBroker(BROKER_C_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+                addAndStartNetworkBridge(getBroker(BROKER_C_NAME), 
"C_2_A_Bridge", "static://(" + brokerANobAddr + 
")?useExponentialBackOff=false", false, clusterFilter);
+                addAndStartNetworkBridge(getBroker(BROKER_C_NAME), 
"C_2_B_Bridge", "static://(" + brokerBNobAddr + 
")?useExponentialBackOff=false", false, null);
+            } else {
+                addAndStartNetworkBridge(getBroker(BROKER_C_NAME), 
"C_2_A_Bridge", "static://(" + brokerAClientAddr + 
")?useExponentialBackOff=false", false, clusterFilter);
+                addAndStartNetworkBridge(getBroker(BROKER_C_NAME), 
"C_2_B_Bridge", "static://(" + brokerBClientAddr + 
")?useExponentialBackOff=false", false, null);
+            }
+        }
+    }
+
+    /**
+     * Extracts the bind address (tcp://host:port) from a publishable connect 
string.
+     */
+    private static String extractBindAddress(final String publishableAddr) 
throws Exception {
+        final URI uri = new URI(publishableAddr);
+        return "tcp://127.0.0.1:" + uri.getPort();
+    }
+
+    /**
+     * Adds a network bridge to a broker and starts it immediately.
+     * This is used when the broker is already running.
+     */
+    private void addAndStartNetworkBridge(final BrokerService broker, final 
String bridgeName,
+                                          final String uri, final boolean 
duplex, final String destinationFilter) throws Exception {
+        final NetworkConnector network = broker.addNetworkConnector(uri);
+        network.setName(bridgeName);
+        network.setDuplex(duplex);
+        if (destinationFilter != null && !destinationFilter.isEmpty()) {
+            network.setDestinationFilter(bridgeName);

Review Comment:
   I think `destinationFilter` should be the argument passed here, not `bridgeName`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to