This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1d1f03ad5e ARTEMIS-4971 Warning on Unacked messages through mirror
AckManager
1d1f03ad5e is described below
commit 1d1f03ad5e2644b558b5eb65f737665b24697689
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Sep 23 17:02:30 2024 -0400
ARTEMIS-4971 Warning on Unacked messages through mirror AckManager
---
.../api/config/ActiveMQDefaultConfiguration.java | 5 +++
.../protocol/amqp/connect/mirror/AckManager.java | 28 +++++++-----
.../amqp/logger/ActiveMQAMQPProtocolLogger.java | 6 +++
.../artemis/core/config/Configuration.java | 9 ++++
.../core/config/impl/ConfigurationImpl.java | 13 ++++++
.../deployers/impl/FileConfigurationParser.java | 3 ++
.../resources/schema/artemis-configuration.xsd | 8 ++++
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../ConfigurationTest-xinclude-config.xml | 1 +
.../ConfigurationTest-xinclude-schema-config.xml | 1 +
.../integration/amqp/connect/AckManagerTest.java | 51 ++++++++++++++++++++++
12 files changed, 117 insertions(+), 10 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index d8bf42940d..11de47c758 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -715,6 +715,7 @@ public final class ActiveMQDefaultConfiguration {
private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".RETRY_DELAY", "100"));;
+ private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED =
false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
/**
@@ -1961,6 +1962,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MIRROR_ACK_MANAGER_PAGE_ATTEMPTS;
}
+ public static boolean getMirrorAckManagerWarnUnacked() {
+ return DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED;
+ }
+
public static int getMirrorAckManagerRetryDelay() {
return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 3dc5322136..0ef1a9b649 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -57,6 +57,7 @@ import
org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,6 +79,7 @@ public class AckManager implements ActiveMQComponent {
ActiveMQScheduledComponent scheduledComponent;
public AckManager(ActiveMQServer server) {
+ assert server != null && server.getConfiguration() != null;
this.server = server;
this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
@@ -260,7 +262,7 @@ public class AckManager implements ActiveMQComponent {
page.usageDown();
}
}
- validateExpiredSet(acksToRetry);
+ validateExpiredSet(address, acksToRetry);
} else {
logger.trace("Page Scan not required for address {}", address);
}
@@ -283,16 +285,19 @@ public class AckManager implements ActiveMQComponent {
}
- private void validateExpiredSet(LongObjectHashMap<JournalHashMap<AckRetry,
AckRetry, Queue>> queuesToRetry) {
- queuesToRetry.forEach(this::validateExpireSet);
+ private void validateExpiredSet(SimpleString address,
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
+ queuesToRetry.forEach((q, r) -> this.validateExpireSet(address, q, r));
}
- private void validateExpireSet(long queueID, JournalHashMap<AckRetry,
AckRetry, Queue> retries) {
+ private void validateExpireSet(SimpleString address, long queueID,
JournalHashMap<AckRetry, AckRetry, Queue> retries) {
for (AckRetry retry : retries.valuesCopy()) {
if (retry.getQueueAttempts() >=
configuration.getMirrorAckManagerQueueAttempts()) {
if (retry.attemptedPage() >=
configuration.getMirrorAckManagerPageAttempts()) {
+ if (configuration.isMirrorAckManagerWarnUnacked()) {
+ ActiveMQAMQPProtocolLogger.LOGGER.ackRetryFailed(retry,
address, queueID);
+ }
if (logger.isDebugEnabled()) {
- logger.debug("Retried {} {} times, giving up on the entry
now. Configuration Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
+ logger.debug("Retried {} {} times, giving up on the entry
now. Configured Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
} else {
@@ -300,6 +305,8 @@ public class AckManager implements ActiveMQComponent {
logger.debug("Retry {} attempted {} times on paging,
Configuration Page Attempts={}", retry, retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
}
+ } else {
+ logger.debug("Retry {} queue attempted {} times on paging,
QueueAttempts {} Configuration Page Attempts={}", retry,
retry.getQueueAttempts(), retry.getPageAttempts(),
configuration.getMirrorAckManagerPageAttempts());
}
}
}
@@ -418,9 +425,14 @@ public class AckManager implements ActiveMQComponent {
if (reference == null) {
if (logger.isDebugEnabled()) {
logger.debug("ACK Manager could not find reference nodeID={}
(while localID={}), messageID={} on queue {}, server={}. Adding retry with
minQueue={}, maxPage={}, delay={}", nodeID,
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(),
server, configuration.getMirrorAckManagerQueueAttempts(),
configuration.getMirrorAckManagerPageAttempts(),
configuration.getMirrorAckManagerRetryDelay());
- printQueueDebug(targetQueue);
}
+
if (allowRetry) {
+ if (configuration != null &&
configuration.isMirrorAckManagerWarnUnacked() && targetQueue.getConsumerCount()
> 0) {
+
ActiveMQAMQPProtocolLogger.LOGGER.unackWithConsumer(targetQueue.getConsumerCount(),
targetQueue.getName(), nodeID, messageID);
+ } else {
+ logger.debug("There are {} consumers on queue {}, what made Ack
for message with nodeID={}, messageID={} enter a retry list",
targetQueue.getConsumerCount(), targetQueue.getName(), nodeID, messageID);
+ }
addRetry(nodeID, targetQueue, messageID, reason);
}
return false;
@@ -436,10 +448,6 @@ public class AckManager implements ActiveMQComponent {
}
}
- private void printQueueDebug(Queue targetQueue) {
- logger.debug("... queue {}/{} had {} consumers, {} messages, {}
scheduled messages, {} delivering messages, paging={}", targetQueue.getID(),
targetQueue.getName(), targetQueue.getConsumerCount(),
targetQueue.getMessageCount(), targetQueue.getScheduledCount(),
targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging());
- }
-
private void doACK(Queue targetQueue, MessageReference reference, AckReason
reason) {
try {
switch (reason) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
index dc7dbda0ac..02e78a783b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java
@@ -67,4 +67,10 @@ public interface ActiveMQAMQPProtocolLogger {
@LogMessage(id = 111010, value = "Duplicate AckManager node detected.
Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN)
void duplicateNodeStoreID(String queue, String serverId, long recordID,
Exception trace);
+
+ @LogMessage(id = 111011, value = "There are {} consumers on queue {}, what
made the Ack for message with nodeID={}, messageID={} enter a retry list",
level = LogMessage.Level.WARN)
+ void unackWithConsumer(int numberOfConsumers, Object queueName, String
nodeID, long messageID);
+
+ @LogMessage(id = 111012, value = "Acknowledgement retry failed for {} on
address {}, queueID={}", level = LogMessage.Level.WARN)
+ void ackRetryFailed(Object ackRetryInformation, Object address, long
queueID);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 1df52bd0bc..17765802b5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1535,4 +1535,13 @@ public interface Configuration {
boolean isMirrorPageTransaction();
Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
+
+ /**
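+ * Sets whether the mirror AckManager should log a warning (log.warn) for unacked messages, e.g. when ack retries fail.
+ * @param warnUnacked {@code true} to log the warnings, {@code false} to keep them disabled
+ * @return this {@code Configuration}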
+ */
+ Configuration setMirrorAckManagerWarnUnacked(boolean warnUnacked);
+
+ boolean isMirrorAckManagerWarnUnacked();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 4983527b6b..88b7721099 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -444,6 +444,8 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private int mirrorAckManagerPageAttempts =
ActiveMQDefaultConfiguration.getMirrorAckManagerPageAttempts();
+ private boolean mirrorAckManagerWarnUnacked =
ActiveMQDefaultConfiguration.getMirrorAckManagerWarnUnacked();
+
private int mirrorAckManagerRetryDelay =
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
private boolean mirrorPageTransaction =
ActiveMQDefaultConfiguration.getMirrorPageTransaction();
@@ -3385,6 +3387,17 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return mirrorAckManagerQueueAttempts;
}
+ @Override
+ public boolean isMirrorAckManagerWarnUnacked() {
+ return mirrorAckManagerWarnUnacked;
+ }
+
+ @Override
+ public ConfigurationImpl setMirrorAckManagerWarnUnacked(boolean
warnUnacked) {
+ this.mirrorAckManagerWarnUnacked = warnUnacked;
+ return this;
+ }
+
@Override
public ConfigurationImpl setMirrorAckManagerQueueAttempts(int
minQueueAttempts) {
logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}",
minQueueAttempts);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 588e0651fa..6819b307ad 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -386,6 +386,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS =
"mirror-ack-manager-page-attempts";
private static final String MIRROR_ACK_MANAGER_RETRY_DELAY =
"mirror-ack-manager-retry-delay";
+ private static final String MIRROR_ACK_MANAGER_WARN_UNACKED =
"mirror-ack-manager-warn-unacked";
private static final String MIRROR_PAGE_TRANSACTION =
"mirror-page-transaction";
@@ -868,6 +869,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setMirrorAckManagerRetryDelay(getInteger(e,
MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(),
GT_ZERO));
+ config.setMirrorAckManagerWarnUnacked(getBoolean(e,
MIRROR_ACK_MANAGER_WARN_UNACKED, config.isMirrorAckManagerWarnUnacked()));
+
parseAddressSettings(e, config);
parseResourceLimits(e, config);
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index e16a7dc9b2..4905b77f98 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -944,6 +944,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="mirror-ack-manager-warn-unacked"
type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ Whether the system should log a warning when a mirror acknowledgment
retry fails (the message is left unacked).
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="mirror-page-transaction" type="xsd:boolean"
maxOccurs="1" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 3d8fb9b033..e32527ee31 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -582,6 +582,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(Integer.valueOf(128),
conf.getAddressSettings().get("a2").getInitialQueueBufferSize());
assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
+ assertTrue(conf.isMirrorAckManagerWarnUnacked());
assertEquals(222, conf.getMirrorAckManagerPageAttempts());
assertEquals(333, conf.getMirrorAckManagerRetryDelay());
assertTrue(conf.isMirrorPageTransaction());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 36d43ad604..f53d5a4844 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -558,6 +558,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
<security-settings>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 689ee7e436..99fac28fca 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -74,6 +74,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
<remoting-incoming-interceptors>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index 1c163b4a14..c26879b294 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -74,6 +74,7 @@
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
<mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
<mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-ack-manager-warn-unacked>true</mirror-ack-manager-warn-unacked>
<mirror-page-transaction>true</mirror-page-transaction>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index 89f81fd898..fc3b53efa3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -45,11 +46,13 @@ import org.apache.activemq.artemis.core.paging.impl.Page;
import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
@@ -64,6 +67,7 @@ import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,6 +298,53 @@ public class AckManagerTest extends ActiveMQTestBase {
}
+ @Test
+ public void testLogUnack() throws Throwable {
+ String protocol = "AMQP";
+
+ SimpleString TOPIC_NAME = SimpleString.of("tp" +
RandomUtil.randomString());
+
+ server1.addAddressInfo(new
AddressInfo(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+
+ ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+ // creating 5 subscriptions
+ for (int i = 0; i < 5; i++) {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.setClientID("c" + i);
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(TOPIC_NAME.toString());
+ session.createDurableSubscriber(topic, "s" + i);
+ }
+ }
+
+ Queue c1s1 = server1.locateQueue("c1.s1");
+ assertNotNull(c1s1);
+ Queue c2s2 = server1.locateQueue("c2.s2");
+ assertNotNull(c2s2);
+
+ try (AssertionLoggerHandler assertionLoggerHandler = new
AssertionLoggerHandler()) {
+ server1.getConfiguration().setMirrorAckManagerWarnUnacked(true);
+ c1s1.addConsumer(Mockito.mock(Consumer.class));
+ AckManager ackManager = AckManagerProvider.getManager(server1);
+ ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
+
+ // ID for there are consumers
+ Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111011"),
5000, 100);
+ // ID for give up retry
+ Wait.assertTrue(() -> assertionLoggerHandler.findText("AMQ111012"),
5000, 100);
+
+ server1.getConfiguration().setMirrorAckManagerWarnUnacked(false);
+ assertionLoggerHandler.clear();
+
+ ackManager.ack("neverFound", c1s1, 1000, AckReason.NORMAL, true);
+
+ // ID for there are consumers
+ assertFalse(assertionLoggerHandler.findText("AMQ111011"));
+ // ID for give up retry
+ assertFalse(assertionLoggerHandler.findText("AMQ111012"));
+ }
+ }
@Test
public void testRetryFromPaging() throws Throwable {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact