jeanouii commented on code in PR #1759:
URL: https://github.com/apache/activemq/pull/1759#discussion_r2931587254
##########
activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java:
##########
@@ -338,72 +364,201 @@ protected void setUp() throws Exception {
@Override
protected void tearDown() throws Exception {
shutdownClients();
- Thread.sleep(2000);
destroyBrokerCluster();
}
- private void initSingleTcBroker(String params, String clusterFilter,
String destinationFilter) throws Exception {
- createBrokerA(false, params, clusterFilter, null);
- createBrokerB(false, params, clusterFilter, null);
- createBrokerC(false, params, clusterFilter, null);
+ private void initSingleTcBroker(final String params, final String
clusterFilter, final String destinationFilter) throws Exception {
+ final String tcParams = (params == null) ? "" : params;
+
+ // Phase 1: Create and start all 3 brokers with transport connectors
only
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ getBroker(BROKER_A_NAME).start();
+ getBroker(BROKER_A_NAME).waitUntilStarted();
+ brokerAClientAddr =
getBroker(BROKER_A_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ addTransportConnector(getBroker(BROKER_B_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ getBroker(BROKER_B_NAME).start();
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ brokerBClientAddr =
getBroker(BROKER_B_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+ addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+ addTransportConnector(getBroker(BROKER_C_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ getBroker(BROKER_C_NAME).start();
getBroker(BROKER_C_NAME).waitUntilStarted();
+ brokerCClientAddr =
getBroker(BROKER_C_NAME).getTransportConnectors().get(0).getPublishableConnectString();
+
+ // Phase 2: Add network bridges using resolved addresses and start them
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge",
"static://(" + brokerBClientAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge",
"static://(" + brokerCClientAddr + ")?useExponentialBackOff=false", false,
null);
+
+ addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge",
"static://(" + brokerAClientAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge",
"static://(" + brokerCClientAddr + ")?useExponentialBackOff=false", false,
null);
+
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge",
"static://(" + brokerAClientAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge",
"static://(" + brokerBClientAddr + ")?useExponentialBackOff=false", false,
null);
+
+ // Wait for all bridges to form
+ waitForAllBridges();
}
- private void initMultiTcCluster(String params, String clusterFilter)
throws Exception {
- createBrokerA(true, params, clusterFilter, null);
- createBrokerB(true, params, clusterFilter, null);
- createBrokerC(true, params, clusterFilter, null);
+ private void initMultiTcCluster(final String params, final String
clusterFilter) throws Exception {
+ final String tcParams = (params == null) ? "" : params;
+
+ // Phase 1: Create and start all 3 brokers with both transport
connectors
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ addTransportConnector(getBroker(BROKER_A_NAME), "network",
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+ getBroker(BROKER_A_NAME).start();
+ getBroker(BROKER_A_NAME).waitUntilStarted();
+ brokerAClientAddr =
getBroker(BROKER_A_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+ brokerANobAddr =
getBroker(BROKER_A_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ addTransportConnector(getBroker(BROKER_B_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ addTransportConnector(getBroker(BROKER_B_NAME), "network",
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+ getBroker(BROKER_B_NAME).start();
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ brokerBClientAddr =
getBroker(BROKER_B_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+ brokerBNobAddr =
getBroker(BROKER_B_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+ addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+ addTransportConnector(getBroker(BROKER_C_NAME), "openwire",
EPHEMERAL_BIND_ADDRESS + tcParams, true);
+ addTransportConnector(getBroker(BROKER_C_NAME), "network",
EPHEMERAL_BIND_ADDRESS + tcParams, false);
+ getBroker(BROKER_C_NAME).start();
getBroker(BROKER_C_NAME).waitUntilStarted();
+ brokerCClientAddr =
getBroker(BROKER_C_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+ brokerCNobAddr =
getBroker(BROKER_C_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+
+ // Phase 2: Add network bridges using network connector addresses and
start them
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge",
"static://(" + brokerBNobAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge",
"static://(" + brokerCNobAddr + ")?useExponentialBackOff=false", false, null);
+
+ addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge",
"static://(" + brokerANobAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge",
"static://(" + brokerCNobAddr + ")?useExponentialBackOff=false", false, null);
+
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge",
"static://(" + brokerANobAddr + ")?useExponentialBackOff=false", false,
clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge",
"static://(" + brokerBNobAddr + ")?useExponentialBackOff=false", false, null);
+
+ // Wait for all bridges to form
+ waitForAllBridges();
}
- private void createBrokerA(boolean multi, String params, String
clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
+ /**
+ * Creates broker A with transport connector and network bridges to B and
C.
+ * Rebinds to the same port that was previously used (stored in
brokerAClientAddr).
+ * Used for re-creating broker A after it has been stopped.
+ */
+ private void createBrokerA(final boolean multi, final String params, final
String clusterFilter, final String destinationFilter) throws Exception {
+ final String tcParams = (params == null) ? "" : params;
if (getBroker(BROKER_A_NAME) == null) {
+ final String bindAddr = extractBindAddress(brokerAClientAddr);
+
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire",
BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
if (multi) {
- addTransportConnector(getBroker(BROKER_A_NAME), "network",
BROKER_A_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge",
"static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge",
"static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ final String nobBindAddr = extractBindAddress(brokerANobAddr);
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire",
bindAddr + tcParams, true);
+ addTransportConnector(getBroker(BROKER_A_NAME), "network",
nobBindAddr + tcParams, false);
} else {
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge",
"static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge",
"static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire",
bindAddr + tcParams, true);
}
getBroker(BROKER_A_NAME).start();
- }
- }
-
- private void createBrokerB(boolean multi, String params, String
clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
- if (getBroker(BROKER_B_NAME) == null) {
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire",
BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
+ getBroker(BROKER_A_NAME).waitUntilStarted();
+ brokerAClientAddr =
getBroker(BROKER_A_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
if (multi) {
- addTransportConnector(getBroker(BROKER_B_NAME), "network",
BROKER_B_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge",
"static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge",
"static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ brokerANobAddr =
getBroker(BROKER_A_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME),
"A_2_B_Bridge", "static://(" + brokerBNobAddr +
")?useExponentialBackOff=false", false, clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME),
"A_2_C_Bridge", "static://(" + brokerCNobAddr +
")?useExponentialBackOff=false", false, null);
} else {
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge",
"static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge",
"static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME),
"A_2_B_Bridge", "static://(" + brokerBClientAddr +
")?useExponentialBackOff=false", false, clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_A_NAME),
"A_2_C_Bridge", "static://(" + brokerCClientAddr +
")?useExponentialBackOff=false", false, null);
}
- getBroker(BROKER_B_NAME).start();
}
}
- private void createBrokerC(boolean multi, String params, String
clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
+ /**
+ * Creates broker C with transport connector and network bridges to A and
B.
+ * Rebinds to the same port that was previously used (stored in
brokerCClientAddr).
+ * Used for re-creating broker C after it has been stopped.
+ */
+ private void createBrokerC(final boolean multi, final String params, final
String clusterFilter, final String destinationFilter) throws Exception {
+ final String tcParams = (params == null) ? "" : params;
if (getBroker(BROKER_C_NAME) == null) {
+ final String bindAddr = extractBindAddress(brokerCClientAddr);
+
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
- addTransportConnector(getBroker(BROKER_C_NAME), "openwire",
BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
if (multi) {
- addTransportConnector(getBroker(BROKER_C_NAME), "network",
BROKER_C_NOB_TC_ADDRESS + tcParams, false);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge",
"static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge",
"static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ final String nobBindAddr = extractBindAddress(brokerCNobAddr);
+ addTransportConnector(getBroker(BROKER_C_NAME), "openwire",
bindAddr + tcParams, true);
+ addTransportConnector(getBroker(BROKER_C_NAME), "network",
nobBindAddr + tcParams, false);
} else {
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge",
"static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, clusterFilter);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge",
"static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false",
false, null);
+ addTransportConnector(getBroker(BROKER_C_NAME), "openwire",
bindAddr + tcParams, true);
}
getBroker(BROKER_C_NAME).start();
+ getBroker(BROKER_C_NAME).waitUntilStarted();
+ brokerCClientAddr =
getBroker(BROKER_C_NAME).getTransportConnectorByName("openwire").getPublishableConnectString();
+ if (multi) {
+ brokerCNobAddr =
getBroker(BROKER_C_NAME).getTransportConnectorByName("network").getPublishableConnectString();
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME),
"C_2_A_Bridge", "static://(" + brokerANobAddr +
")?useExponentialBackOff=false", false, clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME),
"C_2_B_Bridge", "static://(" + brokerBNobAddr +
")?useExponentialBackOff=false", false, null);
+ } else {
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME),
"C_2_A_Bridge", "static://(" + brokerAClientAddr +
")?useExponentialBackOff=false", false, clusterFilter);
+ addAndStartNetworkBridge(getBroker(BROKER_C_NAME),
"C_2_B_Bridge", "static://(" + brokerBClientAddr +
")?useExponentialBackOff=false", false, null);
+ }
+ }
+ }
+
+ /**
+ * Extracts the bind address (tcp://host:port) from a publishable connect
string.
+ */
+ private static String extractBindAddress(final String publishableAddr)
throws Exception {
+ final URI uri = new URI(publishableAddr);
+ return "tcp://127.0.0.1:" + uri.getPort();
+ }
+
+ /**
+ * Adds a network bridge to a broker and starts it immediately.
+ * This is used when the broker is already running.
+ */
+ private void addAndStartNetworkBridge(final BrokerService broker, final
String bridgeName,
+ final String uri, final boolean
duplex, final String destinationFilter) throws Exception {
+ final NetworkConnector network = broker.addNetworkConnector(uri);
+ network.setName(bridgeName);
+ network.setDuplex(duplex);
+ if (destinationFilter != null && !destinationFilter.isEmpty()) {
+ network.setDestinationFilter(bridgeName);
Review Comment:
Bad copy/paste sorry.
Thanks for catching it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact