This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 230db1fec0 Refactor network connections to process after
ConnectionInfo (#2112)
230db1fec0 is described below
commit 230db1fec0b4bfdb8dcf06cdceeb00585ed8f82c
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 15 12:29:14 2026 -0400
Refactor network connections to process after ConnectionInfo (#2112)
This refactors the setup for network connections to be processed
after receiving ConnectionInfo. Network bridges are established by
each broker sending a BrokerInfo command to the other broker to provide
remote broker information, but this is done before ConnectionInfo.
This commit reworks the connection to capture the BrokerInfo information,
but delay processing until after the connection information has been
received and processed by broker.addConnection(). The future that was
previously added for the connection id is no longer needed and removed.
This commit also simplifies durable sync for bridges and prevents a
race condition on startup by making sure the initial bridge and the
duplex side only send back the BrokerSubscriptionInfo command after
fully initialized.
---
.../activemq/broker/TransportConnection.java | 137 ++++++++++-----------
.../activemq/broker/region/RegionBroker.java | 37 ++++++
.../network/DemandForwardingBridgeSupport.java | 14 ++-
.../network/DurableSyncNetworkBridgeAuthTest.java | 115 +++++++++++++++--
4 files changed, 217 insertions(+), 86 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 9e77101744..b6fe548857 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -165,7 +165,6 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
private final ReentrantReadWriteLock serviceLock = new
ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final long connectedTimestamp;
- private final CompletableFuture<ConnectionId> initialConnectionId = new
CompletableFuture<>();
/**
* @param taskRunnerFactory - can be null if you want direct dispatch to
the transport
@@ -854,16 +853,14 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
try {
broker.addConnection(context, info);
- // Complete the future with the connectionId if we completed
- // the broker.addConnection() chain successfully
- initialConnectionId.complete(info.getConnectionId());
+ // If we completed broker.addConnection() successfully we can now
+ // continue the required extra setup for any network connections
+ addNetworkConnection();
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
- // complete with the exception
- initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={}
due to {}",
info.getConnectionId(), clientId, info.getClientIp(),
e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
@@ -1401,44 +1398,75 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
throw new IOException("Unexpected extra broker info command
received from: " + info.getBrokerId());
}
if (info.isSlaveBroker()) {
- LOG.error(" Slave Brokers are no longer supported - slave trying
to attach is: {}", info.getBrokerName());
- } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
+ LOG.error("Slave Brokers are no longer supported - slave trying to
attach is: {}", info.getBrokerName());
+ throw new IOException("Slave Brokers are no longer supported -
slave trying to attach is: " + info.getBrokerName());
+ }
+
+ // The only thing this method now does is capture the BrokerInfo
object and mark as a network connection.
+ // Actual processing for starting up duplex bridges and for durable
sync has been moved until
+ // after ConnectionInfo has been received.
+
+ // If this is duplex we need to get the ID configured so we can use it
+ // to close existing connections later that match the same ID
+ // This will be done inside the RegionBroker
+ if (info.isNetworkConnection() && info.isDuplexConnection()) {
+ NetworkBridgeConfiguration config = getNetworkConfiguration(info);
+ config.setBrokerName(broker.getBrokerName());
+ String duplexNetworkConnectorId = config.getName() + "@" +
info.getBrokerId();
+ setDuplexNetworkConnectorId(duplexNetworkConnectorId);
+ }
+
+ this.brokerInfo = info;
+ networkConnection = true;
+ List<TransportConnectionState> connectionStates =
listConnectionStates();
+ for (TransportConnectionState cs : connectionStates) {
+ cs.getContext().setNetworkConnection(true);
+ }
+ return null;
+ }
+
+ // Process the network connection set up
+ private void addNetworkConnection() throws Exception {
+ final BrokerInfo info = this.brokerInfo;
+ if (info == null || !info.isNetworkConnection()){
+ return;
+ }
+
+ // For a one way bridge we need to respond on bridge creation by
sending back the durable
+ // subs if durable sync is enabled via BrokerSubscriptionInfo command.
The bridge is only
+ // initialized on one broker, so if this is the passive side we know
it's initialized and
+ // we can respond.
+ //
+ // For a duplex bridge, we do NOT send back the durable subs. To
simplify and ensure
+ // the bridge is fully initialized, the bridge startup will now handle
sending
+ // BrokerSubscriptionInfo to the remote broker once fully started.
+ if (!info.isDuplexConnection()) {
try {
- // register durable sync to be sent after ConnectionInfo has
been handled
- registerDurableSync(getNetworkConfiguration(info), info);
+ NetworkBridgeConfiguration config =
getNetworkConfiguration(info);
+ if (config.isSyncDurableSubs() && protocolVersion.get() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+ LOG.debug("SyncDurableSubs is enabled, Sending
BrokerSubscriptionInfo");
+ // Send back the durable subs as this is a one way bridge
+
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
config));
+ }
} catch (Exception e) {
- LOG.error("Failed to register durable sync for network bridge
creation from broker {}", info.getBrokerId(), e);
- return null;
+ LOG.error("Failed to respond to network bridge creation from
broker {}", info.getBrokerId(), e);
+ throw e;
}
- } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
+ } else {
+ // duplex
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
NetworkBridgeConfiguration config =
getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());
- // register durable sync to be sent after ConnectionInfo has
been handled
- registerDurableSync(config, info);
-
- // check for existing duplex connection hanging about
-
- // We first look if existing network connection already exists
for the same broker Id and network connector name
- // It's possible in case of brief network fault to have this
transport connector side of the connection always active
- // and the duplex network connector side wanting to open a new
one
- // In this case, the old connection must be broken
- String duplexNetworkConnectorId = config.getName() + "@" +
info.getBrokerId();
- CopyOnWriteArrayList<TransportConnection> connections =
this.connector.getConnections();
- synchronized (connections) {
- for (TransportConnection c : connections) {
- if ((c != this) &&
(duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
- LOG.warn("Stopping an existing active duplex
connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
- c.stopAsync();
- // better to wait for a bit rather than get
connection id already in use and failure to start new bridge
- c.getStopped().await(1, TimeUnit.SECONDS);
- }
- }
- setDuplexNetworkConnectorId(duplexNetworkConnectorId);
- }
+ // Note: Durable sync used to be here and was moved to
DemandForwardingBridgeSupport
+ // inside doStartLocalAndRemoteBridges()
+
+ //The logic to clean up existing network connections for the
same ID
+ // has been moved to the RegionBroker where it will check if
the broker
+ // needs to close the connection before trying to create a
duplicate connection
+
Transport localTransport =
NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
Transport remoteBridgeTransport = transport;
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
@@ -1462,46 +1490,13 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
duplexBridge.setCreatedByDuplex(true);
duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Started responder end of duplex bridge {}",
duplexNetworkConnectorId);
- return null;
} catch (TransportDisposedIOException e) {
LOG.warn("Duplex bridge {} was stopped before it was correctly
started.", duplexNetworkConnectorId);
- return null;
} catch (Exception e) {
LOG.error("Failed to create responder end of duplex network
bridge {}", duplexNetworkConnectorId, e);
- return null;
+ throw e;
}
}
- this.brokerInfo = info;
- networkConnection = true;
- List<TransportConnectionState> connectionStates =
listConnectionStates();
- for (TransportConnectionState cs : connectionStates) {
- cs.getContext().setNetworkConnection(true);
- }
- return null;
- }
-
- private void registerDurableSync(final NetworkBridgeConfiguration config,
final BrokerInfo info) {
- if (config.isSyncDurableSubs() && protocolVersion.get() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
- // this will complete when the connection id has been set, or
immediately if already set
- initialConnectionId.whenComplete((connectionId, t) -> {
- try {
- if (t != null) {
- LOG.warn("SyncDurableSubs will be skipped due to error
{}",
- t.getMessage());
- return;
- }
- // check connection still registered
- if (lookupConnectionState(connectionId) != null) {
- LOG.debug("SyncDurableSubs is enabled, Sending
BrokerSubscriptionInfo");
-
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
- this.broker.getBrokerService(), config));
- }
- } catch (Exception e) {
- LOG.error("Failed to respond to network bridge creation
from broker {}",
- info.getBrokerId(), e);
- }
- });
- }
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -1707,7 +1702,7 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
this.duplexNetworkConnectorId = duplexNetworkConnectorId;
}
- protected synchronized String getDuplexNetworkConnectorId() {
+ public synchronized String getDuplexNetworkConnectorId() {
return this.duplexNetworkConnectorId;
}
@@ -1715,7 +1710,7 @@ public class TransportConnection implements Connection,
Task, CommandVisitor {
return stopping.get();
}
- protected CountDownLatch getStopped() {
+ public CountDownLatch getStopped() {
return stopped;
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 2e6ee20497..7d5de4c184 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -255,6 +256,10 @@ public class RegionBroker extends EmptyBroker {
throw new InvalidClientIDException("No clientID specified for
connection request");
}
+ // Clean up existing duplex network connection if this is a reconnect
attempt
+ // This was moved from TransportConnection
+ cleanupExistingDuplexNetworkConnection(context);
+
ConnectionContext oldContext = null;
synchronized (clientIdSet) {
@@ -289,6 +294,38 @@ public class RegionBroker extends EmptyBroker {
connections.add(context.getConnection());
}
+ // We first look if existing network connection already exists for the
same broker Id and network connector name
+ // It's possible in case of brief network fault to have this transport
connector side of the connection always active
+ // and the duplex network connector side wanting to open a new one
+ // In this case, the old connection must be broken
+ private void cleanupExistingDuplexNetworkConnection(ConnectionContext
context) {
+ try {
+ if (context.isNetworkConnection()
+ && context.getConnection() instanceof TransportConnection)
{
+ final TransportConnection newConn = (TransportConnection)
context.getConnection();
+ if (newConn.getDuplexNetworkConnectorId() != null) {
+ for (Connection c : connections) {
+ if (c instanceof TransportConnection) {
+ final TransportConnection existingConn =
(TransportConnection) c;
+ if (newConn.getDuplexNetworkConnectorId()
+
.equals(existingConn.getDuplexNetworkConnectorId())) {
+ LOG.warn("Stopping an existing active duplex
connection [{}] for network connector ({}).",
+ c,
existingConn.getDuplexNetworkConnectorId());
+ existingConn.stopAsync();
+ // better to wait for a bit rather than get
connection id already in use and failure to start new bridge
+ existingConn.getStopped().await(2,
TimeUnit.SECONDS);
+ break;
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Error cleaning up Duplex connection: {}" ,
e.getMessage());
+ LOG.debug(e.getMessage(), e);
+ }
+ }
+
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo
info, Throwable error) throws Exception {
String clientId = info.getClientId();
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 26a5769ed0..17eb9c5529 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -477,6 +477,15 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
if (safeWaitUntilStarted()) {
setupStaticDestinations();
staticDestinationsLatch.countDown();
+
+ // Send to the remote broker the durable subs if sync is
enabled after statup.
+ // This is done by the initiating side of a bridge as well as
by duplex bridges to
+ // ensure everything is fully initialized before sending.
+ if (configuration.isSyncDurableSubs() &&
+ remoteBroker.getWireFormat().getVersion() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
+ configuration));
+ }
}
} catch (Throwable e) {
serviceLocalException(e);
@@ -599,11 +608,6 @@ public abstract class DemandForwardingBridgeSupport
implements NetworkBridge, Br
brokerInfo.setNetworkProperties(str);
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
- if (configuration.isSyncDurableSubs() &&
- remoteBroker.getWireFormat().getVersion() >=
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
- configuration));
- }
}
if (remoteConnectionInfo != null) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
index 19c95da29b..3a99029617 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
@@ -39,8 +41,11 @@ import
org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.Wait;
import org.junit.After;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -84,6 +89,7 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
private static final String USER_PASSWORD = "password";
private final boolean duplex;
private final AtomicReference<BrokerSubscriptionInfo> brokerSubInfo = new
AtomicReference<>();
+ private final AtomicReference<DiscoveryEvent> serviceFailed = new
AtomicReference<>();
private String ncPassword = USER_PASSWORD;
public DurableSyncNetworkBridgeAuthTest(boolean duplex) {
@@ -94,6 +100,7 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
public void setUp() throws Exception {
this.ncPassword = USER_PASSWORD;
this.brokerSubInfo.set(null);
+ this.serviceFailed.set(null);
}
@After
@@ -110,31 +117,77 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
// automatically on connect so the remote broker will always receive
it. However, the
// remote broker should only send back its list after the connection
is properly authenticated.
assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ assertNull(serviceFailed.get());
+ DemandForwardingBridge bridge = getActiveBridge();
// Simulate a connection exception and reconnect, we should receive
again
brokerSubInfo.set(null);
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
.findFirst().orElseThrow().serviceRemoteException(new
Exception());
+ // wait for failure
+ assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
+ assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10));
+
+ // should reconnect again and get updated info
+ assertTrue(Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(5), 10));
assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
}
@Test
public void testAuthFailure() throws Exception {
this.ncPassword = "badpassword";
- try {
- // set a shorter wait time, it won't connect with bad password
- doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
- TimeUnit.SECONDS.toMillis(5));
- throw new IllegalStateException("Should have received assertion
error with bad password");
- } catch (AssertionError e) {
- // expected
- }
+ doSetUpRemoteBroker(true, tempFolder.newFolder(), 0);
+ doSetUpLocalBroker(true, true, tempFolder.newFolder());
+ // Wait for the failure due to authentication
+ assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
+ assertTrue(Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+ TimeUnit.SECONDS.toMillis(5), 10));
// Because the local broker was not authenticated by the remote
broker, the local broker
// should not have received back the BrokerSubscriptionInfo
assertNull(brokerSubInfo.get());
}
+ @Test
+ public void testDuplicateDuplexBridgeFailedAuthIgnored() throws Exception {
+ Assume.assumeTrue(duplex);
+ doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+ TimeUnit.SECONDS.toMillis(15));
+
+ // everything is good, no error and we got the sync command
+ assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ assertNull(serviceFailed.get());
+
+ // Start a duplicate bridge with the same configuration but bad
password
+ // so authentication fails. This should not cause a failure with the
existing
+ // bridge because this connection won't be authenticated
+ DemandForwardingBridge bridge = getActiveBridge();
+ this.ncPassword = "badpassword";
+ NetworkConnector nc =
localBroker.addNetworkConnector(configureLocalNetworkConnector());
+ nc.start();
+ try {
+ Thread.sleep(2000);
+ // Verify bridge is not disposed and still connected
+ assertFalse(bridge.disposed.get());
+ } finally {
+ nc.stop();
+ }
+
+ // try again, this will connect successfully and the broker will
detect it's a duplex bridge
+ // matching the same config and close the other
+ this.ncPassword = USER_PASSWORD;
+ nc = localBroker.addNetworkConnector(configureLocalNetworkConnector());
+ nc.start();
+ try {
+ // authentication is now correct so the RegionBroker should
terminate the other duplex
+ // bridge as it matches
+ assertTrue(Wait.waitFor(bridge.disposed::get,5000,10));
+ } finally {
+ nc.stop();
+ }
+ }
+
@Test
public void testRestartSync() throws Exception {
doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
@@ -144,12 +197,17 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
// automatically on connect so the remote broker will always receive
it. However, the
// remote broker should only send back its list after the connection
is properly authenticated.
assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ assertNull(serviceFailed.get());
// Restart, should receive again with new connection
brokerSubInfo.set(null);
restartRemoteBroker();
+ // should fail from restart
+ assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
// Wait for the reconnect and receive of BrokerSubInfo
+ assertTrue(Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(5), 10));
assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
}
@@ -159,10 +217,10 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
TimeUnit.SECONDS.toMillis(15));
assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+ assertNull(serviceFailed.get());
// find the established bridge
- DemandForwardingBridge bridge = (DemandForwardingBridge)
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
- .findFirst().orElseThrow();
+ DemandForwardingBridge bridge = getActiveBridge();
// send to one of the brokers (networked brokers will have already
received a BrokerInfo)
// the duplicate will trigger the bridge connection to close
@@ -232,6 +290,9 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
URI remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
NetworkConnector connector = new DiscoveryNetworkConnector(new
URI(uri)) {
+ {
+ this.setDiscoveryAgent(new
DiscoveryAgentFilter(getDiscoveryAgent()));
+ }
@Override
protected NetworkBridge createBridge(Transport localTransport,
Transport remoteTransport, DiscoveryEvent event) {
@@ -248,6 +309,7 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
}
super.onCommand(command);
}
+
};
return super.createBridge(localTransport, remoteFilter, event);
}
@@ -285,4 +347,37 @@ public class DurableSyncNetworkBridgeAuthTest extends
AbstractDurableSyncNetwork
return brokerService;
}
+ private DemandForwardingBridge getActiveBridge() {
+ return(DemandForwardingBridge)
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
+ .findFirst().orElseThrow();
+ }
+
+ private class DiscoveryAgentFilter implements DiscoveryAgent {
+ private final DiscoveryAgent agent;
+
+ public DiscoveryAgentFilter(DiscoveryAgent agent) {
+ this.agent = agent;
+ }
+
+ public void setDiscoveryListener(DiscoveryListener listener) {
+ agent.setDiscoveryListener(listener);
+ }
+
+ public void start() throws Exception {
+ agent.start();
+ }
+
+ public void stop() throws Exception {
+ agent.stop();
+ }
+
+ public void registerService(String name) throws IOException {
+ agent.registerService(name);
+ }
+
+ public void serviceFailed(DiscoveryEvent event) throws IOException {
+ serviceFailed.set(event);
+ agent.serviceFailed(event);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact