This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 68e400b45c ARTEMIS-4184 Bridges with concurrency not cleared properly
on config reload
68e400b45c is described below
commit 68e400b45cf4957de3bf1333e64c01e615f33622
Author: a181321 <[email protected]>
AuthorDate: Thu Jun 15 12:18:01 2023 +0200
ARTEMIS-4184 Bridges with concurrency not cleared properly on config reload
---
.../artemis/core/config/BridgeConfiguration.java | 5 ++-
.../core/server/impl/ActiveMQServerImpl.java | 45 ++++++++++++++--------
.../tests/integration/jms/RedeployTest.java | 2 +
.../src/test/resources/reload-bridge.xml | 1 +
4 files changed, 37 insertions(+), 16 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index d9bc9ff182..7cfb861d70 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -275,6 +275,9 @@ public final class BridgeConfiguration implements
Serializable {
*/
public BridgeConfiguration setName(final String name) {
this.name = name;
+ if (this.parentName == null) {
+ this.parentName = name;
+ }
return this;
}
@@ -769,7 +772,7 @@ public final class BridgeConfiguration implements
Serializable {
if (name == null) {
if (other.name != null)
return false;
- } else if (!name.equals(other.name))
+ } else if (!parentName.equals(other.parentName))
return false;
if (password == null) {
if (other.password != null)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 425069f49c..3c3db20b98 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -4619,34 +4619,49 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
deployQueuesFromListQueueConfiguration(configuration.getQueueConfigs());
ActiveMQServerLogger.LOGGER.reloadingConfiguration("bridges");
+
for (BridgeConfiguration newBridgeConfig :
configuration.getBridgeConfigurations()) {
- newBridgeConfig.setParentName(newBridgeConfig.getName());
- Bridge existingBridge =
clusterManager.getBridges().get(newBridgeConfig.getParentName());
- if (existingBridge != null &&
!existingBridge.getConfiguration().equals(newBridgeConfig) &&
existingBridge.getConfiguration().isConfigurationManaged()) {
+
+ String bridgeName = newBridgeConfig.getName();
+ newBridgeConfig.setParentName(bridgeName);
+
+ //Look for bridges with matching parentName. Only need first match
in case of concurrent bridges
+ Bridge existingBridge =
clusterManager.getBridges().values().stream()
+ .filter(bridge ->
bridge.getConfiguration().getParentName().equals(bridgeName))
+ .findFirst()
+ .orElse(null);
+
+ if (existingBridge != null &&
existingBridge.getConfiguration().isConfigurationManaged() &&
!existingBridge.getConfiguration().equals(newBridgeConfig)) {
// this is an existing bridge but the config changed so stop
the current bridge and deploy the new one
- destroyBridge(existingBridge.getName().toString());
+ destroyBridge(bridgeName);
deployBridge(newBridgeConfig);
} else if (existingBridge == null) {
// this is a new bridge
deployBridge(newBridgeConfig);
}
+
}
- for (final Bridge existingBridge:
clusterManager.getBridges().values()) {
+
+ //Look for already running bridges no longer in configuration, stop
if found
+ for (final Bridge existingBridge :
clusterManager.getBridges().values()) {
BridgeConfiguration existingBridgeConfig =
existingBridge.getConfiguration();
- boolean destroy = true;
- for (final BridgeConfiguration newBridgeConfig :
configuration.getBridgeConfigurations()) {
- if (existingBridgeConfig.isConfigurationManaged() &&
(existingBridgeConfig.getParentName().equals(newBridgeConfig.getName()) ||
existingBridgeConfig.getName().equals(newBridgeConfig.getName()) )) {
- destroy = false;
- break;
+
+ if (existingBridgeConfig.isConfigurationManaged()) {
+ String existingBridgeName =
existingBridgeConfig.getParentName();
+
+ boolean noLongerConfigured =
configuration.getBridgeConfigurations().stream()
+ .noneMatch(bridge ->
bridge.getParentName().equals(existingBridgeName));
+
+ if (noLongerConfigured) {
+ destroyBridge(existingBridgeName);
}
}
- if (destroy) {
- // this bridge is running but it isn't in the new config which
means it was removed so destroy it
-
destroyBridge(existingBridge.getConfiguration().getParentName());
- }
+
}
- recoverStoredConnectors();
+
recoverStoredBridges();
+ recoverStoredConnectors();
+
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 5e6cb02666..a1904dd42a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -518,6 +518,7 @@ public class RedeployTest extends ActiveMQTestBase {
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
+ Wait.assertEquals(3, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
}
try (ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory();
@@ -543,6 +544,7 @@ public class RedeployTest extends ActiveMQTestBase {
producer.send(session.createMessage());
Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount());
Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
+ Wait.assertEquals(2, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
}
try (ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory();
diff --git a/tests/integration-tests/src/test/resources/reload-bridge.xml
b/tests/integration-tests/src/test/resources/reload-bridge.xml
index 91c3ad8726..b904816b59 100644
--- a/tests/integration-tests/src/test/resources/reload-bridge.xml
+++ b/tests/integration-tests/src/test/resources/reload-bridge.xml
@@ -39,6 +39,7 @@ under the License.
<bridge name="a">
<queue-name>a-from</queue-name>
<forwarding-address>a-to</forwarding-address>
+ <concurrency>3</concurrency>
<static-connectors>
<connector-ref>connector</connector-ref>
</static-connectors>