This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new b6f9a14e1a Ensure connection info is processed before durable sync
(#2047)
b6f9a14e1a is described below
commit b6f9a14e1a6ad1b9a3f1c41eb5ea2db990eaba34
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed May 27 01:42:11 2026 -0400
Ensure connection info is processed before durable sync (#2047)
This update waits until the ConnectionInfo command is processed by the
entire broker chain without error before sending the BrokerSubscriptionInfo
command for durable sync back to a remote broker requesting it
---
.../activemq/broker/TransportConnection.java | 46 +++-
.../AbstractDurableSyncNetworkBridgeTest.java | 123 ++++++++++
.../network/DurableSyncNetworkBridgeAuthTest.java | 270 +++++++++++++++++++++
.../network/DurableSyncNetworkBridgeTest.java | 78 +-----
4 files changed, 431 insertions(+), 86 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 9079e24d67..f038f34771 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -164,6 +165,7 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
private final ReentrantReadWriteLock serviceLock = new
ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final long connectedTimestamp;
+ private final CompletableFuture<ConnectionId> initialConnectionId = new
CompletableFuture<>();
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to
the transport
@@ -852,11 +854,16 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
try {
broker.addConnection(context, info);
+ // Complete the future with the connectionId if we completed
+ // the broker.addConnection() chain successfully
+ initialConnectionId.complete(info.getConnectionId());
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
+ // complete with the exception
+ initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={}
due to {}",
info.getConnectionId(), clientId, info.getClientIp(),
e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
@@ -1390,13 +1397,10 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
LOG.error(" Slave Brokers are no longer supported - slave trying
to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
try {
- NetworkBridgeConfiguration config =
getNetworkConfiguration(info);
- if (config.isSyncDurableSubs() && protocolVersion.get() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
- LOG.debug("SyncDurableSubs is enabled, Sending
BrokerSubscriptionInfo");
-
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
config));
- }
+ // register durable sync to be sent after ConnectionInfo has
been handled
+ registerDurableSync(getNetworkConfiguration(info), info);
} catch (Exception e) {
- LOG.error("Failed to respond to network bridge creation from
broker {}", info.getBrokerId(), e);
+ LOG.error("Failed to register durable sync for network bridge
creation from broker {}", info.getBrokerId(), e);
return null;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
@@ -1406,10 +1410,8 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
NetworkBridgeConfiguration config =
getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());
- if (config.isSyncDurableSubs() && protocolVersion.get() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
- LOG.debug("SyncDurableSubs is enabled, Sending
BrokerSubscriptionInfo");
-
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
config));
- }
+ // register durable sync to be sent after ConnectionInfo has
been handled
+ registerDurableSync(config, info);
// check for existing duplex connection hanging about
@@ -1475,6 +1477,30 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
return null;
}
+ private void registerDurableSync(final NetworkBridgeConfiguration config,
final BrokerInfo info) {
+ if (config.isSyncDurableSubs() && protocolVersion.get() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+ // this will complete when the connection id has been set, or
immediately if already set
+ initialConnectionId.whenComplete((connectionId, t) -> {
+ try {
+ if (t != null) {
+ LOG.warn("SyncDurableSubs will be skipped due to error
{}",
+ t.getMessage());
+ return;
+ }
+ // check connection still registered
+ if (lookupConnectionState(connectionId) != null) {
+ LOG.debug("SyncDurableSubs is enabled, Sending
BrokerSubscriptionInfo");
+
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
+ this.broker.getBrokerService(), config));
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to respond to network bridge creation
from broker {}",
+ info.getBrokerId(), e);
+ }
+ });
+ }
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
new file mode 100644
index 0000000000..47df7ef8ba
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(
+ AbstractDurableSyncNetworkBridgeTest.class);
+
+ protected abstract void doSetUpLocalBroker(boolean deleteAllMessages,
boolean startNetworkConnector, File dataDir) throws Exception;
+
+ protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages,
File dataDir, int port) throws Exception;
+
+ protected void restartLocalBroker(boolean startNetworkConnector) throws
Exception {
+ stopLocalBroker();
+ doSetUpLocalBroker(false, startNetworkConnector,
localBroker.getDataDirectoryFile());
+ }
+
+ protected void restartRemoteBroker() throws Exception {
+ final int previousPort =
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
+ final File dataDir = remoteBroker.getDataDirectoryFile();
+ stopRemoteBroker();
+ try {
+ doSetUpRemoteBroker(false, dataDir, previousPort);
+ } catch (final IOException e) {
+ if (e.getCause() instanceof java.net.BindException) {
+ // Previous port still in TIME_WAIT — use a new ephemeral port
+ doSetUpRemoteBroker(false, dataDir, 0);
+ // Update the local broker's network connector to point to the
new port
+ updateLocalNetworkConnectorUri();
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ protected void restartBroker(BrokerService broker, boolean
startNetworkConnector) throws Exception {
+ if (broker.getBrokerName().equals("localBroker")) {
+ restartLocalBroker(startNetworkConnector);
+ } else {
+ restartRemoteBroker();
+ }
+ }
+
+ protected void waitForBridgeFullyStarted() throws Exception {
+ waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true);
+ }
+
+ protected void waitForBridgeFullyStarted(long millis, boolean duplex)
throws Exception {
+ // Wait for the local bridge to be fully started (advisory consumers
registered)
+ assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+ if
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+ return false;
+ }
+ final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, millis, 100));
+
+ // Also wait for the duplex bridge on the remote broker to be fully
started.
+ // The duplex connector creates a separate DemandForwardingBridge on
the remote side
+ // that also needs its advisory consumers registered before it can
process events.
+ if (duplex) {
+ assertTrue("Duplex bridge should be fully started",
Wait.waitFor(() -> {
+ final DemandForwardingBridge duplexBridge = findDuplexBridge(
+ remoteBroker.getTransportConnectors().get(0));
+ return duplexBridge != null &&
duplexBridge.startedLatch.getCount() == 0;
+ }, millis, 100));
+ }
+ }
+
+
+ /**
+ * When the remote broker restarts on a new ephemeral port (BindException
fallback),
+ * any existing network connector on the local broker still points to the
old port.
+ * This method stops the old connector and replaces it with one targeting
the new URI.
+ */
+ protected void updateLocalNetworkConnectorUri() throws Exception {
+ if (localBroker == null) {
+ return;
+ }
+ final List<NetworkConnector> connectors =
localBroker.getNetworkConnectors();
+ if (connectors.isEmpty()) {
+ return;
+ }
+ final NetworkConnector oldConnector = connectors.get(0);
+ oldConnector.stop();
+ localBroker.removeNetworkConnector(oldConnector);
+ final NetworkConnector newConnector = configureLocalNetworkConnector();
+ localBroker.addNetworkConnector(newConnector);
+ newConnector.start();
+ }
+
+ protected abstract NetworkConnector configureLocalNetworkConnector()
throws Exception;
+
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
new file mode 100644
index 0000000000..646dd4f184
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetworkBridgeTest {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class);
+
+ @Parameters(name="duplex={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {true},
+ {false}
+ });
+ }
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
+
+ public static final String KEYSTORE_TYPE = "jks";
+ public static final String PASSWORD = "password";
+ public static final String SERVER_KEYSTORE =
"src/test/resources/server.keystore";
+ public static final String TRUST_KEYSTORE =
"src/test/resources/client.keystore";
+
+ static {
+ System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+ System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+ System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+ System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+ System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+ System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+ }
+
+ private static final String USER_PASSWORD = "password";
+ private final boolean duplex;
+ private final AtomicReference<BrokerSubscriptionInfo> brokerSubInfo = new
AtomicReference<>();
+ private String ncPassword = USER_PASSWORD;
+
+ public DurableSyncNetworkBridgeAuthTest(boolean duplex) {
+ this.duplex = duplex;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.ncPassword = USER_PASSWORD;
+ this.brokerSubInfo.set(null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ doTearDown();
+ }
+
+ @Test
+ public void testAuthSuccess() throws Exception {
+ doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+ TimeUnit.SECONDS.toMillis(15));
+
+ // When the local broker starts the bridge it will send its
BrokerSubscriptionInfo list
+ // automatically on connect so the remote broker will always receive
it. However, the
+ // remote broker should only send back its list after the connection
is properly authenticated.
+ assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+
+ // Simulate a connection exception and reconnect, we should receive
again
+ brokerSubInfo.set(null);
+ localBroker.getNetworkConnectors().get(0).activeBridges().stream()
+ .findFirst().orElseThrow().serviceRemoteException(new
Exception());
+ assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ }
+
+ @Test
+ public void testAuthFailure() throws Exception {
+ this.ncPassword = "badpassword";
+ try {
+ // set a shorter wait time, it won't connect with bad password
+ doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+ TimeUnit.SECONDS.toMillis(5));
+ throw new IllegalStateException("Should have received assertion
error with bad password");
+ } catch (AssertionError e) {
+ // expected
+ }
+
+ // Because the local broker was not authenticated by the remote
broker, the local broker
+ // should not have received back the BrokerSubscriptionInfo
+ assertNull(brokerSubInfo.get());
+ }
+
+ @Test
+ public void testRestartSync() throws Exception {
+ doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+ TimeUnit.SECONDS.toMillis(15));
+
+ // When the local broker starts the bridge it will send its
BrokerSubscriptionInfo list
+ // automatically on connect so the remote broker will always receive
it. However, the
+ // remote broker should only send back its list after the connection
is properly authenticated.
+ assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+
+ // Restart, should receive again with new connection
+ brokerSubInfo.set(null);
+ restartRemoteBroker();
+
+ // Wait for the reconnect and receive of BrokerSubInfo
+ assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ }
+
+ protected void doSetUp(boolean deleteAllMessages, boolean
startNetworkConnector, File localDataDir,
+ File remoteDataDir, long waitForStart) throws Exception {
+ doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
+ doSetUpLocalBroker(deleteAllMessages, startNetworkConnector,
localDataDir);
+ //Wait for the bridge to be fully started
+ if (startNetworkConnector) {
+ waitForBridgeFullyStarted(waitForStart, duplex);
+ }
+ }
+
+ protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean
startNetworkConnector,
+ File dataDir) throws Exception {
+ localBroker = createLocalBroker(dataDir, startNetworkConnector);
+ localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+
+ if (startNetworkConnector) {
+ // Best-effort wait for the bridge to appear. Do NOT use
assertTrue here
+ // because some tests restart localBroker before remoteBroker is
running,
+ // relying on the bridge connecting later when remoteBroker
restarts.
+ // Tests that need the bridge to be fully started call
assertBridgeStarted() explicitly.
+ // Keep timeout short (5s) to avoid growing the NC reconnect
backoff too much,
+ // which would delay bridge formation when the remote broker
starts later.
+ Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(5), 500);
+ }
+
+ }
+
+ protected void doSetUpRemoteBroker(boolean deleteAllMessages, File
dataDir, int port) throws Exception {
+ remoteBroker = createRemoteBroker(dataDir, port);
+ remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ }
+
+ protected BrokerService createLocalBroker(File dataDir, boolean
startNetworkConnector) throws Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setBrokerName("localBroker");
+ brokerService.setDataDirectoryFile(dataDir);
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dataDir);
+
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+ brokerService.setPersistenceAdapter(adapter);
+
+ if (startNetworkConnector) {
+
brokerService.addNetworkConnector(configureLocalNetworkConnector());
+ }
+
+ //Use auto+nio+ssl to test out the transport works with bridging
+ brokerService.addConnector("auto+nio+ssl://localhost:0");
+
+ return brokerService;
+ }
+
+ @Override
+ protected NetworkConnector configureLocalNetworkConnector() throws
Exception {
+ List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
+ URI remoteURI = transportConnectors.get(0).getConnectUri();
+ String uri = "static:(" + remoteURI + ")";
+ NetworkConnector connector = new DiscoveryNetworkConnector(new
URI(uri)) {
+ @Override
+ protected NetworkBridge createBridge(Transport localTransport,
+ Transport remoteTransport, DiscoveryEvent event) {
+ // Add a listener so we can capture if the remote broker sends
+ // back a BrokerSubscriptionInfo object
+ final Transport remoteFilter = new
TransportFilter(remoteTransport) {
+ @Override
+ public void onCommand(Object command) {
+ if (command instanceof BrokerSubscriptionInfo) {
+ if (brokerSubInfo.get() != null) {
+ throw new IllegalStateException("Received
BrokerSubscriptionInfo more than once.");
+ }
+ brokerSubInfo.set((BrokerSubscriptionInfo)
command);
+ }
+ super.onCommand(command);
+ }
+ };
+ return super.createBridge(localTransport, remoteFilter, event);
+ }
+ };
+ connector.setName("networkConnector");
+ connector.setUserName("user1");
+ connector.setPassword(ncPassword);
+ connector.setDecreaseNetworkConsumerPriority(false);
+ connector.setConduitSubscriptions(true);
+ connector.setDuplex(duplex);
+ connector.setStaticBridge(false);
+ connector.setSyncDurableSubs(true);
+ connector.setDynamicallyIncludedDestinations(List.of(new
ActiveMQTopic("include.test.>")));
+ return connector;
+ }
+
+ protected BrokerService createRemoteBroker(File dataDir, int port) throws
Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setBrokerName("remoteBroker");
+ brokerService.setUseJmx(false);
+ brokerService.setDataDirectoryFile(dataDir);
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dataDir);
+
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+ brokerService.setPersistenceAdapter(adapter);
+
+ // Add authentication to the remote broker
+ AuthenticationUser user = new AuthenticationUser("user1",
USER_PASSWORD, "group1");
+ SimpleAuthenticationPlugin authenticationPlugin = new
SimpleAuthenticationPlugin();
+ authenticationPlugin.setUsers(List.of(user));
+ brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin});
+
+ brokerService.addConnector("auto+nio+ssl://localhost:" + port);
+
+ return brokerService;
+ }
+
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 56cbe45af4..39c7e278ac 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -63,7 +63,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
-public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
+public class DurableSyncNetworkBridgeTest extends
AbstractDurableSyncNetworkBridgeTest {
protected static final Logger LOG =
LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
@@ -706,14 +706,6 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
return compositeTopic;
}
- protected void restartBroker(BrokerService broker, boolean
startNetworkConnector) throws Exception {
- if (broker.getBrokerName().equals("localBroker")) {
- restartLocalBroker(startNetworkConnector);
- } else {
- restartRemoteBroker();
- }
- }
-
protected void restartBrokers(boolean startNetworkConnector) throws
Exception {
doTearDown();
doSetUp(false, startNetworkConnector,
localBroker.getDataDirectoryFile(),
@@ -734,73 +726,6 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
}
}
- private void waitForBridgeFullyStarted() throws Exception {
- // Wait for the local bridge to be fully started (advisory consumers
registered)
- assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
- if
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
- return false;
- }
- final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
- if (bridge instanceof DemandForwardingBridgeSupport) {
- return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
- }
- return true;
- }, TimeUnit.SECONDS.toMillis(15), 100));
-
- // Also wait for the duplex bridge on the remote broker to be fully
started.
- // The duplex connector creates a separate DemandForwardingBridge on
the remote side
- // that also needs its advisory consumers registered before it can
process events.
- assertTrue("Duplex bridge should be fully started", Wait.waitFor(() ->
{
- final DemandForwardingBridge duplexBridge = findDuplexBridge(
- remoteBroker.getTransportConnectors().get(0));
- return duplexBridge != null &&
duplexBridge.startedLatch.getCount() == 0;
- }, TimeUnit.SECONDS.toMillis(15), 100));
- }
-
- protected void restartLocalBroker(boolean startNetworkConnector) throws
Exception {
- stopLocalBroker();
- doSetUpLocalBroker(false, startNetworkConnector,
localBroker.getDataDirectoryFile());
- }
-
- protected void restartRemoteBroker() throws Exception {
- final int previousPort =
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
- final File dataDir = remoteBroker.getDataDirectoryFile();
- stopRemoteBroker();
- try {
- doSetUpRemoteBroker(false, dataDir, previousPort);
- } catch (final IOException e) {
- if (e.getCause() instanceof java.net.BindException) {
- // Previous port still in TIME_WAIT — use a new ephemeral port
- doSetUpRemoteBroker(false, dataDir, 0);
- // Update the local broker's network connector to point to the
new port
- updateLocalNetworkConnectorUri();
- } else {
- throw e;
- }
- }
- }
-
- /**
- * When the remote broker restarts on a new ephemeral port (BindException
fallback),
- * any existing network connector on the local broker still points to the
old port.
- * This method stops the old connector and replaces it with one targeting
the new URI.
- */
- private void updateLocalNetworkConnectorUri() throws Exception {
- if (localBroker == null) {
- return;
- }
- final List<NetworkConnector> connectors =
localBroker.getNetworkConnectors();
- if (connectors.isEmpty()) {
- return;
- }
- final NetworkConnector oldConnector = connectors.get(0);
- oldConnector.stop();
- localBroker.removeNetworkConnector(oldConnector);
- final NetworkConnector newConnector = configureLocalNetworkConnector();
- localBroker.addNetworkConnector(newConnector);
- newConnector.start();
- }
-
protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean
startNetworkConnector,
File dataDir) throws Exception {
localBroker = createLocalBroker(dataDir, startNetworkConnector);
@@ -882,6 +807,7 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
return brokerService;
}
+ @Override
protected NetworkConnector configureLocalNetworkConnector() throws
Exception {
List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
URI remoteURI = transportConnectors.get(0).getConnectUri();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact