This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new b6f9a14e1a Ensure connection info is processed before durable sync 
(#2047)
b6f9a14e1a is described below

commit b6f9a14e1a6ad1b9a3f1c41eb5ea2db990eaba34
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Wed May 27 01:42:11 2026 -0400

    Ensure connection info is processed before durable sync (#2047)
    
    This update waits until the ConnectionInfo command is processed by the
    entire broker chain without error before sending the BrokerSubscriptionInfo
    command for durable sync back to a remote broker requesting it.
---
 .../activemq/broker/TransportConnection.java       |  46 +++-
 .../AbstractDurableSyncNetworkBridgeTest.java      | 123 ++++++++++
 .../network/DurableSyncNetworkBridgeAuthTest.java  | 270 +++++++++++++++++++++
 .../network/DurableSyncNetworkBridgeTest.java      |  78 +-----
 4 files changed, 431 insertions(+), 86 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 9079e24d67..f038f34771 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -164,6 +165,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     private final ReentrantReadWriteLock serviceLock = new 
ReentrantReadWriteLock();
     private String duplexNetworkConnectorId;
     private final long connectedTimestamp;
+    private final CompletableFuture<ConnectionId> initialConnectionId = new 
CompletableFuture<>();
 
     /**
      * @param taskRunnerFactory - can be null if you want direct dispatch to 
the transport
@@ -852,11 +854,16 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
 
         try {
             broker.addConnection(context, info);
+            // Complete the future with the connectionId if we completed
+            // the broker.addConnection() chain successfully
+            initialConnectionId.complete(info.getConnectionId());
         } catch (Exception e) {
             synchronized (brokerConnectionStates) {
                 brokerConnectionStates.remove(info.getConnectionId());
             }
             unregisterConnectionState(info.getConnectionId());
+            // complete with the exception
+            initialConnectionId.completeExceptionally(e);
             LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} 
due to {}",
                     info.getConnectionId(), clientId, info.getClientIp(), 
e.getLocalizedMessage());
             //AMQ-6561 - stop for all exceptions on addConnection
@@ -1390,13 +1397,10 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
             LOG.error(" Slave Brokers are no longer supported - slave trying 
to attach is: {}", info.getBrokerName());
         } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
             try {
-                NetworkBridgeConfiguration config = 
getNetworkConfiguration(info);
-                if (config.isSyncDurableSubs() && protocolVersion.get() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-                    LOG.debug("SyncDurableSubs is enabled, Sending 
BrokerSubscriptionInfo");
-                    
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
 config));
-                }
+                // register durable sync to be sent after ConnectionInfo has 
been handled
+                registerDurableSync(getNetworkConfiguration(info), info);
             } catch (Exception e) {
-                LOG.error("Failed to respond to network bridge creation from 
broker {}", info.getBrokerId(), e);
+                LOG.error("Failed to register durable sync for network bridge 
creation from broker {}", info.getBrokerId(), e);
                 return null;
             }
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
@@ -1406,10 +1410,8 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 NetworkBridgeConfiguration config = 
getNetworkConfiguration(info);
                 config.setBrokerName(broker.getBrokerName());
 
-                if (config.isSyncDurableSubs() && protocolVersion.get() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-                    LOG.debug("SyncDurableSubs is enabled, Sending 
BrokerSubscriptionInfo");
-                    
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
 config));
-                }
+                // register durable sync to be sent after ConnectionInfo has 
been handled
+                registerDurableSync(config, info);
 
                 // check for existing duplex connection hanging about
 
@@ -1475,6 +1477,30 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         return null;
     }
 
+    private void registerDurableSync(final NetworkBridgeConfiguration config, 
final BrokerInfo info) {
+        if (config.isSyncDurableSubs() && protocolVersion.get() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+            // this will complete when the connection id has been set, or 
immediately if already set
+            initialConnectionId.whenComplete((connectionId, t) -> {
+                try {
+                    if (t != null) {
+                        LOG.warn("SyncDurableSubs will be skipped due to error 
{}",
+                                t.getMessage());
+                        return;
+                    }
+                    // check connection still registered
+                    if (lookupConnectionState(connectionId) != null) {
+                        LOG.debug("SyncDurableSubs is enabled, Sending 
BrokerSubscriptionInfo");
+                        
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
+                                this.broker.getBrokerService(), config));
+                    }
+                } catch (Exception e) {
+                    LOG.error("Failed to respond to network bridge creation 
from broker {}",
+                            info.getBrokerId(), e);
+                }
+            });
+        }
+    }
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     private HashMap<String, String> createMap(Properties properties) {
         return new HashMap(properties);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
new file mode 100644
index 0000000000..47df7ef8ba
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(
+            AbstractDurableSyncNetworkBridgeTest.class);
+
+    protected abstract void doSetUpLocalBroker(boolean deleteAllMessages, 
boolean startNetworkConnector, File dataDir) throws Exception;
+
+    protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages, 
File dataDir, int port) throws Exception;
+
+    protected void restartLocalBroker(boolean startNetworkConnector) throws 
Exception {
+        stopLocalBroker();
+        doSetUpLocalBroker(false, startNetworkConnector, 
localBroker.getDataDirectoryFile());
+    }
+
+    protected void restartRemoteBroker() throws Exception {
+        final int previousPort = 
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
+        final File dataDir = remoteBroker.getDataDirectoryFile();
+        stopRemoteBroker();
+        try {
+            doSetUpRemoteBroker(false, dataDir, previousPort);
+        } catch (final IOException e) {
+            if (e.getCause() instanceof java.net.BindException) {
+                // Previous port still in TIME_WAIT — use a new ephemeral port
+                doSetUpRemoteBroker(false, dataDir, 0);
+                // Update the local broker's network connector to point to the 
new port
+                updateLocalNetworkConnectorUri();
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    protected void restartBroker(BrokerService broker, boolean 
startNetworkConnector) throws Exception {
+        if (broker.getBrokerName().equals("localBroker")) {
+            restartLocalBroker(startNetworkConnector);
+        } else  {
+            restartRemoteBroker();
+        }
+    }
+
+    protected void waitForBridgeFullyStarted() throws Exception {
+        waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true);
+    }
+
+    protected void waitForBridgeFullyStarted(long millis, boolean duplex) 
throws Exception {
+        // Wait for the local bridge to be fully started (advisory consumers 
registered)
+        assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+                return false;
+            }
+            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, millis, 100));
+
+        // Also wait for the duplex bridge on the remote broker to be fully 
started.
+        // The duplex connector creates a separate DemandForwardingBridge on 
the remote side
+        // that also needs its advisory consumers registered before it can 
process events.
+        if (duplex) {
+            assertTrue("Duplex bridge should be fully started", 
Wait.waitFor(() -> {
+                final DemandForwardingBridge duplexBridge = findDuplexBridge(
+                        remoteBroker.getTransportConnectors().get(0));
+                return duplexBridge != null && 
duplexBridge.startedLatch.getCount() == 0;
+            }, millis, 100));
+        }
+    }
+
+
+    /**
+     * When the remote broker restarts on a new ephemeral port (BindException 
fallback),
+     * any existing network connector on the local broker still points to the 
old port.
+     * This method stops the old connector and replaces it with one targeting 
the new URI.
+     */
+    protected void updateLocalNetworkConnectorUri() throws Exception {
+        if (localBroker == null) {
+            return;
+        }
+        final List<NetworkConnector> connectors = 
localBroker.getNetworkConnectors();
+        if (connectors.isEmpty()) {
+            return;
+        }
+        final NetworkConnector oldConnector = connectors.get(0);
+        oldConnector.stop();
+        localBroker.removeNetworkConnector(oldConnector);
+        final NetworkConnector newConnector = configureLocalNetworkConnector();
+        localBroker.addNetworkConnector(newConnector);
+        newConnector.start();
+    }
+
+    protected abstract NetworkConnector configureLocalNetworkConnector() 
throws Exception;
+
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
new file mode 100644
index 0000000000..646dd4f184
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.DiscoveryEvent;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import 
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetworkBridgeTest {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class);
+
+    @Parameters(name="duplex={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    @Rule
+    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
+
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final String PASSWORD = "password";
+    public static final String SERVER_KEYSTORE = 
"src/test/resources/server.keystore";
+    public static final String TRUST_KEYSTORE = 
"src/test/resources/client.keystore";
+
+    static {
+        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
+        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
+        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
+        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
+        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
+    }
+
+    private static final String USER_PASSWORD = "password";
+    private final boolean duplex;
+    private final AtomicReference<BrokerSubscriptionInfo> brokerSubInfo = new 
AtomicReference<>();
+    private String ncPassword = USER_PASSWORD;
+
+    public DurableSyncNetworkBridgeAuthTest(boolean duplex) {
+        this.duplex = duplex;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.ncPassword = USER_PASSWORD;
+        this.brokerSubInfo.set(null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    @Test
+    public void testAuthSuccess() throws Exception {
+        doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+                TimeUnit.SECONDS.toMillis(15));
+
+        // When the local broker starts the bridge it will send its 
BrokerSubscriptionInfo list
+        // automatically on connect so the remote broker will always receive 
it. However, the
+        // remote broker should only send back its list after the connection 
is properly authenticated.
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+
+        // Simulate a connection exception and reconnect, we should receive 
again
+        brokerSubInfo.set(null);
+        localBroker.getNetworkConnectors().get(0).activeBridges().stream()
+                .findFirst().orElseThrow().serviceRemoteException(new 
Exception());
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+    }
+
+    @Test
+    public void testAuthFailure() throws Exception {
+        this.ncPassword = "badpassword";
+        try {
+            // set a shorter wait time, it won't connect with bad password
+            doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+                    TimeUnit.SECONDS.toMillis(5));
+            throw new IllegalStateException("Should have received assertion 
error with bad password");
+        } catch (AssertionError e) {
+            // expected
+        }
+
+        // Because the local broker was not authenticated by the remote 
broker, the local broker
+        // should not have received back the BrokerSubscriptionInfo
+        assertNull(brokerSubInfo.get());
+    }
+
+    @Test
+    public void testRestartSync() throws Exception {
+        doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+                TimeUnit.SECONDS.toMillis(15));
+
+        // When the local broker starts the bridge it will send its 
BrokerSubscriptionInfo list
+        // automatically on connect so the remote broker will always receive 
it. However, the
+        // remote broker should only send back its list after the connection 
is properly authenticated.
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+
+        // Restart, should receive again with new connection
+        brokerSubInfo.set(null);
+        restartRemoteBroker();
+
+        // Wait for the reconnect and receive of BrokerSubInfo
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+    }
+
+    protected void doSetUp(boolean deleteAllMessages, boolean 
startNetworkConnector, File localDataDir,
+            File remoteDataDir, long waitForStart) throws Exception {
+        doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
+        doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, 
localDataDir);
+        //Wait for the bridge to be fully started
+        if (startNetworkConnector) {
+            waitForBridgeFullyStarted(waitForStart, duplex);
+        }
+    }
+
+    protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean 
startNetworkConnector,
+            File dataDir) throws Exception {
+        localBroker = createLocalBroker(dataDir, startNetworkConnector);
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+
+        if (startNetworkConnector) {
+            // Best-effort wait for the bridge to appear. Do NOT use 
assertTrue here
+            // because some tests restart localBroker before remoteBroker is 
running,
+            // relying on the bridge connecting later when remoteBroker 
restarts.
+            // Tests that need the bridge to be fully started call 
assertBridgeStarted() explicitly.
+            // Keep timeout short (5s) to avoid growing the NC reconnect 
backoff too much,
+            // which would delay bridge formation when the remote broker 
starts later.
+            Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+                         TimeUnit.SECONDS.toMillis(5), 500);
+        }
+
+    }
+
+    protected void doSetUpRemoteBroker(boolean deleteAllMessages, File 
dataDir, int port) throws Exception {
+        remoteBroker = createRemoteBroker(dataDir, port);
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+    }
+
+    protected BrokerService createLocalBroker(File dataDir, boolean 
startNetworkConnector) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("localBroker");
+        brokerService.setDataDirectoryFile(dataDir);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dataDir);
+        
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+        brokerService.setPersistenceAdapter(adapter);
+
+        if (startNetworkConnector) {
+            
brokerService.addNetworkConnector(configureLocalNetworkConnector());
+        }
+
+        //Use auto+nio+ssl to test out the transport works with bridging
+        brokerService.addConnector("auto+nio+ssl://localhost:0");
+
+        return brokerService;
+    }
+
+    @Override
+    protected NetworkConnector configureLocalNetworkConnector() throws 
Exception {
+        List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = "static:(" + remoteURI + ")";
+        NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri)) {
+            @Override
+            protected NetworkBridge createBridge(Transport localTransport,
+                    Transport remoteTransport, DiscoveryEvent event) {
+                // Add a listener so we can capture if the remote broker sends
+                // back a BrokerSubscriptionInfo object
+                final Transport remoteFilter = new 
TransportFilter(remoteTransport) {
+                    @Override
+                    public void onCommand(Object command) {
+                        if (command instanceof BrokerSubscriptionInfo) {
+                            if (brokerSubInfo.get() != null) {
+                                throw new IllegalStateException("Received 
BrokerSubscriptionInfo more than once.");
+                            }
+                            brokerSubInfo.set((BrokerSubscriptionInfo) 
command);
+                        }
+                        super.onCommand(command);
+                    }
+                };
+                return super.createBridge(localTransport, remoteFilter, event);
+            }
+        };
+        connector.setName("networkConnector");
+        connector.setUserName("user1");
+        connector.setPassword(ncPassword);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setDuplex(duplex);
+        connector.setStaticBridge(false);
+        connector.setSyncDurableSubs(true);
+        connector.setDynamicallyIncludedDestinations(List.of(new 
ActiveMQTopic("include.test.>")));
+        return connector;
+    }
+
+    protected BrokerService createRemoteBroker(File dataDir, int port) throws 
Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("remoteBroker");
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectoryFile(dataDir);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dataDir);
+        
adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+        brokerService.setPersistenceAdapter(adapter);
+
+        // Add authentication to the remote broker
+        AuthenticationUser user = new AuthenticationUser("user1", 
USER_PASSWORD, "group1");
+        SimpleAuthenticationPlugin authenticationPlugin = new 
SimpleAuthenticationPlugin();
+        authenticationPlugin.setUsers(List.of(user));
+        brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin});
+
+        brokerService.addConnector("auto+nio+ssl://localhost:" + port);
+
+        return brokerService;
+    }
+
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 56cbe45af4..39c7e278ac 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -63,7 +63,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
-public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
+public class DurableSyncNetworkBridgeTest extends 
AbstractDurableSyncNetworkBridgeTest {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
 
@@ -706,14 +706,6 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         return compositeTopic;
     }
 
-    protected void restartBroker(BrokerService broker, boolean 
startNetworkConnector) throws Exception {
-        if (broker.getBrokerName().equals("localBroker")) {
-            restartLocalBroker(startNetworkConnector);
-        } else  {
-            restartRemoteBroker();
-        }
-    }
-
     protected void restartBrokers(boolean startNetworkConnector) throws 
Exception {
         doTearDown();
         doSetUp(false, startNetworkConnector, 
localBroker.getDataDirectoryFile(),
@@ -734,73 +726,6 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         }
     }
 
-    private void waitForBridgeFullyStarted() throws Exception {
-        // Wait for the local bridge to be fully started (advisory consumers 
registered)
-        assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
-            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
-                return false;
-            }
-            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
-            if (bridge instanceof DemandForwardingBridgeSupport) {
-                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
-            }
-            return true;
-        }, TimeUnit.SECONDS.toMillis(15), 100));
-
-        // Also wait for the duplex bridge on the remote broker to be fully 
started.
-        // The duplex connector creates a separate DemandForwardingBridge on 
the remote side
-        // that also needs its advisory consumers registered before it can 
process events.
-        assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> 
{
-            final DemandForwardingBridge duplexBridge = findDuplexBridge(
-                remoteBroker.getTransportConnectors().get(0));
-            return duplexBridge != null && 
duplexBridge.startedLatch.getCount() == 0;
-        }, TimeUnit.SECONDS.toMillis(15), 100));
-    }
-
-    protected void restartLocalBroker(boolean startNetworkConnector) throws 
Exception {
-        stopLocalBroker();
-        doSetUpLocalBroker(false, startNetworkConnector, 
localBroker.getDataDirectoryFile());
-    }
-
-    protected void restartRemoteBroker() throws Exception {
-        final int previousPort = 
remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort();
-        final File dataDir = remoteBroker.getDataDirectoryFile();
-        stopRemoteBroker();
-        try {
-            doSetUpRemoteBroker(false, dataDir, previousPort);
-        } catch (final IOException e) {
-            if (e.getCause() instanceof java.net.BindException) {
-                // Previous port still in TIME_WAIT — use a new ephemeral port
-                doSetUpRemoteBroker(false, dataDir, 0);
-                // Update the local broker's network connector to point to the 
new port
-                updateLocalNetworkConnectorUri();
-            } else {
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * When the remote broker restarts on a new ephemeral port (BindException 
fallback),
-     * any existing network connector on the local broker still points to the 
old port.
-     * This method stops the old connector and replaces it with one targeting 
the new URI.
-     */
-    private void updateLocalNetworkConnectorUri() throws Exception {
-        if (localBroker == null) {
-            return;
-        }
-        final List<NetworkConnector> connectors = 
localBroker.getNetworkConnectors();
-        if (connectors.isEmpty()) {
-            return;
-        }
-        final NetworkConnector oldConnector = connectors.get(0);
-        oldConnector.stop();
-        localBroker.removeNetworkConnector(oldConnector);
-        final NetworkConnector newConnector = configureLocalNetworkConnector();
-        localBroker.addNetworkConnector(newConnector);
-        newConnector.start();
-    }
-
     protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean 
startNetworkConnector,
             File dataDir) throws Exception {
         localBroker = createLocalBroker(dataDir, startNetworkConnector);
@@ -882,6 +807,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         return brokerService;
     }
 
+    @Override
     protected NetworkConnector configureLocalNetworkConnector() throws 
Exception {
         List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
         URI remoteURI = transportConnectors.get(0).getConnectUri();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to