This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new befb69466e ARTEMIS-5349 Bridges with concurrency can leak
befb69466e is described below
commit befb69466ea19d20041217c36c2391a09a1924e4
Author: AntonRoskvist <[email protected]>
AuthorDate: Mon Mar 17 11:19:15 2025 +0100
ARTEMIS-5349 Bridges with concurrency can leak
---
.../core/server/impl/ActiveMQServerImpl.java | 21 ++++++++++-----------
.../artemis/tests/integration/jms/RedeployTest.java | 8 ++++----
.../src/test/resources/reload-bridge-updated.xml | 7 ++++++-
.../src/test/resources/reload-bridge.xml | 7 ++++++-
4 files changed, 26 insertions(+), 17 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index ad89eee464..af0cb8c664 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3068,15 +3068,7 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
@Override
public boolean deployBridge(BridgeConfiguration config) throws Exception {
if (clusterManager != null && clusterManager.deployBridge(config)) {
- //copying and modifying bridgeConfig before storing to deal with
"concurrency > 1" bridges
- for (Bridge bridge : clusterManager.getBridges().values()) {
- BridgeConfiguration copyConfig = new
BridgeConfiguration(bridge.getConfiguration());
- if (copyConfig.getConcurrency() > 1 &&
!copyConfig.getName().endsWith("-0")) {
- continue;
- }
- copyConfig.setName(copyConfig.getParentName());
- storageManager.storeBridgeConfiguration(new
PersistedBridgeConfiguration(copyConfig));
- }
+ storageManager.storeBridgeConfiguration(new
PersistedBridgeConfiguration(config));
return true;
}
return false;
@@ -3089,9 +3081,9 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
for (Bridge bridge : clusterManager.getBridges().values()) {
if (bridge.getConfiguration().getParentName().equals(name)) {
clusterManager.destroyBridge(bridge.getConfiguration().getName());
+
storageManager.deleteBridgeConfiguration(bridge.getConfiguration().getParentName());
}
}
- storageManager.deleteBridgeConfiguration(name);
}
}
@@ -4478,7 +4470,14 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
private void recoverStoredBridges() throws Exception {
if (storageManager.recoverBridgeConfigurations() != null) {
for (PersistedBridgeConfiguration persistedBridgeConfiguration :
storageManager.recoverBridgeConfigurations()) {
-
deployBridge(persistedBridgeConfiguration.getBridgeConfiguration());
+
+ boolean bridgeMissing =
clusterManager.getBridges().values().stream()
+ .noneMatch(bridge -> bridge.getConfiguration().getParentName()
+ .equals(persistedBridgeConfiguration.getName()));
+
+ if (bridgeMissing) {
+
deployBridge(persistedBridgeConfiguration.getBridgeConfiguration());
+ }
}
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index b3e934224f..d263ea5816 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -590,9 +590,9 @@ public class RedeployTest extends ActiveMQTestBase {
Queue queue = session.createQueue("a-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
- Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount());
- Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
- Wait.assertEquals(2, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
+ Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount(),
2000);
+ Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount(),
2000);
+ Wait.assertEquals(2, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount(),
2000);
}
try (ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory();
@@ -610,7 +610,7 @@ public class RedeployTest extends ActiveMQTestBase {
Queue queue = session.createQueue("c-from");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
- Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("c-to").getMessageCount());
+ Wait.assertEquals(1, () ->
embeddedActiveMQ.getActiveMQServer().locateQueue("c-to").getMessageCount(),
2000);
}
} finally {
diff --git
a/tests/integration-tests/src/test/resources/reload-bridge-updated.xml
b/tests/integration-tests/src/test/resources/reload-bridge-updated.xml
index f268f85916..79be5598b6 100644
--- a/tests/integration-tests/src/test/resources/reload-bridge-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-bridge-updated.xml
@@ -24,9 +24,14 @@ under the License.
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
- <persistence-enabled>false</persistence-enabled>
+ <persistence-enabled>true</persistence-enabled>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
<paging-directory>./target/tmp/reload-bridge-test/paging</paging-directory>
+
<bindings-directory>./target/tmp/reload-bridge-test/binding</bindings-directory>
+
<journal-directory>./target/tmp/reload-bridge-test/journal</journal-directory>
+
<large-messages-directory>./target/tmp/reload-bridge-test/largemessages</large-messages-directory>
+
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
diff --git a/tests/integration-tests/src/test/resources/reload-bridge.xml
b/tests/integration-tests/src/test/resources/reload-bridge.xml
index b904816b59..2de198b602 100644
--- a/tests/integration-tests/src/test/resources/reload-bridge.xml
+++ b/tests/integration-tests/src/test/resources/reload-bridge.xml
@@ -24,9 +24,14 @@ under the License.
<core xmlns="urn:activemq:core">
<security-enabled>false</security-enabled>
- <persistence-enabled>false</persistence-enabled>
+ <persistence-enabled>true</persistence-enabled>
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
<paging-directory>./target/tmp/reload-bridge-test/paging</paging-directory>
+
<bindings-directory>./target/tmp/reload-bridge-test/binding</bindings-directory>
+
<journal-directory>./target/tmp/reload-bridge-test/journal</journal-directory>
+
<large-messages-directory>./target/tmp/reload-bridge-test/largemessages</large-messages-directory>
+
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
</acceptors>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact