This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 1f0ec50058e64e79dd5f1bfc9b4f1abab3f515cd Author: Clebert Suconic <[email protected]> AuthorDate: Wed Mar 5 14:51:04 2025 -0500 ARTEMIS-5338 AckManager flow control Say there are too many records in the JournalHashMap. In that case the sender should pace sending more data. The target should limit credits when the configured max pending records is reached. --- .../amqp/broker/ProtonProtocolManager.java | 11 ++ .../connect/mirror/AMQPMirrorControllerTarget.java | 32 ++++- .../protocol/amqp/connect/mirror/AckManager.java | 33 ++++- .../amqp/connect/mirror/AckManagerProvider.java | 3 + .../amqp/proton/ProtonAbstractReceiver.java | 6 +- .../org/apache/activemq/artemis/utils/Wait.java | 5 + .../integration/amqp/connect/AckManagerTest.java | 154 ++++++++++++++++++++- .../mirror/AccumulatedInPageSoakTest.java | 5 + 8 files changed, 240 insertions(+), 9 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 52d767c2df..af3398f243 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -112,6 +112,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, private boolean directDeliver = true; + private int mirrorMaxPendingAcks = 10_000; + private final AMQPRoutingHandler routingHandler; /* @@ -164,6 +166,15 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, return amqpMinLargeMessageSize; } + public int getMirrorMaxPendingAcks() { + return mirrorMaxPendingAcks; + } + + public ProtonProtocolManager setMirrorMaxPendingAcks(int maxPendingAcks) { + this.mirrorMaxPendingAcks = maxPendingAcks; + return this; + } + public ProtonProtocolManager setAmqpMinLargeMessageSize(int amqpMinLargeMessageSize) { this.amqpMinLargeMessageSize = amqpMinLargeMessageSize; return this; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index be0895289d..059d3e9122 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -100,6 +100,27 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement return CONTROLLER_THREAD_LOCAL.get(); } + + @Override + public boolean isBusy() { + return getAckManager().size() > getMirrorMaxPendingAcks(); + } + + public int getMirrorMaxPendingAcks() { + try { + return connection.getProtocolManager().getMirrorMaxPendingAcks(); + } catch (Throwable e) { + // It shouldn't happen, but if it did we just log it + logger.warn(e.getMessage(), e); + return 0; + } + } + + public void verifyCredits() { + connection.runNow(creditTopUpRunner); + } + + /** * Objects of this class can be used by either transaction or by OperationContext. It is important that when you're * using the transactions you clear any references to the operation context. Don't use transaction and @@ -473,14 +494,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName()); } + getAckManager().ack(nodeID, targetQueue, messageID, reason, true); + + OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL); + } + + private AckManager getAckManager() { if (ackManager == null) { ackManager = AckManagerProvider.getManager(server); ackManager.registerMirror(this); } - - ackManager.ack(nodeID, targetQueue, messageID, reason, true); - - OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL); + return ackManager; } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0faa078a20..5e3b4b744f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.LongSupplier; import io.netty.util.collection.LongObjectHashMap; @@ -78,6 +79,13 @@ public class AckManager implements ActiveMQComponent { volatile MultiStepProgress progress; ActiveMQScheduledComponent scheduledComponent; + private volatile int size; + private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size"); + + public int size() { + return sizeUpdater.get(this); + } + public AckManager(ActiveMQServer server) { assert server != null && server.getConfiguration() != null; this.server = server; @@ -92,6 +100,7 @@ public class AckManager implements ActiveMQComponent { public void reload(RecordInfo recordInfo) { journalHashMapProvider.reload(recordInfo); + sizeUpdater.incrementAndGet(this); } @Override @@ -104,6 +113,13 @@ public class AckManager implements ActiveMQComponent { logger.debug("Stopping ackmanager on server {}", server); } + public synchronized void pause() { + if (scheduledComponent != null) { + scheduledComponent.stop(); + scheduledComponent = null; + } + } + @Override public synchronized boolean isStarted() { return scheduledComponent != null && scheduledComponent.isStarted(); @@ -188,6 +204,11 @@ public class AckManager implements ActiveMQComponent { targetCopy.forEach(AMQPMirrorControllerTarget::flush); } + private void checkFlowControlMirrorTargets() { + List<AMQPMirrorControllerTarget> targetCopy = copyTargets(); + targetCopy.forEach(AMQPMirrorControllerTarget::verifyCredits); + } + private synchronized List<AMQPMirrorControllerTarget> copyTargets() { return new ArrayList<>(mirrorControllerTargets); } @@ -267,6 +288,8 @@ public class AckManager implements ActiveMQComponent { logger.trace("Page Scan not required for address {}", address); } + checkFlowControlMirrorTargets(); + } catch (Throwable e) { logger.warn(e.getMessage(), e); } finally { @@ -299,7 +322,9 @@ public class AckManager implements ActiveMQComponent { if (logger.isDebugEnabled()) { logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } - retries.remove(retry); + if (retries.remove(retry) != null) { + sizeUpdater.decrementAndGet(AckManager.this); + } } else { if (logger.isDebugEnabled()) { logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); @@ -353,7 +378,9 @@ public class AckManager implements ActiveMQComponent { } } } - retries.remove(ackRetry, transaction.getID()); + if (retries.remove(ackRetry, transaction.getID()) != null) { + sizeUpdater.decrementAndGet(AckManager.this); + } transaction.setContainsPersistent(); logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage); } @@ -389,6 +416,7 @@ public class AckManager implements ActiveMQComponent { if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) { logger.trace("Removing retry {} as the retry went ok", retry); queueRetries.remove(retry); + sizeUpdater.decrementAndGet(this); } else { int retried = retry.attemptedQueue(); if (logger.isTraceEnabled()) { @@ -410,6 +438,7 @@ public class AckManager implements ActiveMQComponent { } AckRetry retry = new AckRetry(nodeID, messageID, reason); journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry); + sizeUpdater.incrementAndGet(this); if (scheduledComponent != null) { // we set the retry delay again in case it was changed. scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay()); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java index aaf77641a6..4b570e5433 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java @@ -49,6 +49,9 @@ public class AckManagerProvider { } public static AckManager getManager(ActiveMQServer server) { + if (server == null) { + throw new NullPointerException("server is null"); + } synchronized (managerHashMap) { AckManager ackManager = managerHashMap.get(server); if (ackManager != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 794b9055b4..a5acddbc06 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -175,6 +175,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme return state == ReceiverState.STARTED; } + public boolean isBusy() { + return false; + } + public boolean isStopping() { return state == ReceiverState.STOPPING; } @@ -276,7 +280,7 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme if (connection.isHandler()) { connection.requireInHandler(); - if (context.isStarted()) { + if (context.isStarted() && !context.isBusy()) { final int pending = context.pendingSettles; if (isBellowThreshold(receiver.getCredit(), pending, threshold)) { diff --git a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java index 0864f5a8bd..dd287670d8 100644 --- a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java @@ -172,6 +172,11 @@ public class Wait { assertTrue(failureMessage, condition, duration, SLEEP_MILLIS); } + public static <T> T assertNotNull(Supplier<T> supplier, final long duration, final long sleep) throws Exception { + Assertions.assertTrue(waitFor(() -> supplier.get() != null, duration, sleep)); + return supplier.get(); + } + public static void assertTrue(Condition condition, final long duration, final long sleep) { assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index 650104fd77..a185b9c4a2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -26,10 +26,13 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.collections.JournalHashMap; @@ -43,9 +46,11 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider; @@ -57,17 +62,27 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AckManagerTest extends ActiveMQTestBase { @@ -86,11 +101,11 @@ public class AckManagerTest extends ActiveMQTestBase { server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20)); server1.getConfiguration().getAcceptorConfigurations().clear(); server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616"); - server1.start(); } @Test public void testDirectACK() throws Throwable { + server1.start(); String protocol = "AMQP"; @@ -289,9 +304,9 @@ public class AckManagerTest extends ActiveMQTestBase { assertEquals(0, AckManagerProvider.getSize()); } - @Test public void testLogUnack() throws Throwable { + server1.start(); String protocol = "AMQP"; SimpleString TOPIC_NAME = SimpleString.of("tp" + RandomUtil.randomUUIDString()); @@ -340,6 +355,7 @@ public class AckManagerTest extends ActiveMQTestBase { @Test public void testRetryFromPaging() throws Throwable { + server1.start(); String protocol = "AMQP"; @@ -440,6 +456,140 @@ public class AckManagerTest extends ActiveMQTestBase { + @Test + public void testFlowControlOnPendingAcks() throws Throwable { + + server1.getConfiguration().getAcceptorConfigurations().clear(); + server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616?mirrorMaxPendingAcks=100&amqpCredits=100"); + server1.start(); + + String protocol = "AMQP"; + + SimpleString QUEUE_NAME = SimpleString.of("queue_" + RandomUtil.randomUUIDString()); + + Queue testQueue = server1.createQueue(QueueConfiguration.of(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + // First step... adding messages to a queue // 50% paging 50% queue + try (Connection connection = connectionFactory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue jmsQueue = session.createQueue(QUEUE_NAME.toString()); + MessageProducer producer = session.createProducer(jmsQueue); + for (int i = 0; i < 100; i++) { + if (i == 50) { + session.commit(); + testQueue.getPagingStore().startPaging(); + } + producer.send(session.createTextMessage("hello there " + i)); + } + session.commit(); + } + + Wait.assertEquals(100, testQueue::getMessageCount); + + AckManager ackManager = AckManagerProvider.getManager(server1); + assertTrue(ackManager.isStarted()); + ackManager.pause(); + assertFalse(ackManager.isStarted()); + assertSame(ackManager, AckManagerProvider.getManager(server1)); + + // adding fake retries to flood the manager beyond capacity + addFakeRetries(ackManager, testQueue, 1000); + + AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null); + AmqpConnection connection = client.connect(); + runAfter(connection::close); + AmqpSession session = connection.createSession(); + + Map<Symbol, Object> properties = new HashMap<>(); + properties.put(AMQPMirrorControllerSource.BROKER_ID, "whatever"); + + // this is simulating a mirror connection... + // we play with a direct sender here to make sure flow control is working as expected when the records are beyond capacity. + AmqpSender sender = session.createSender(QueueImpl.MIRROR_ADDRESS, true, new Symbol[]{Symbol.getSymbol("amq.mirror")}, new Symbol[]{Symbol.getSymbol("amq.mirror")}, properties); + + AMQPMirrorControllerTarget mirrorControllerTarget = Wait.assertNotNull(() -> locateMirrorTarget(server1), 5000, 100); + assertEquals(100, mirrorControllerTarget.getConnection().getProtocolManager().getMirrorMaxPendingAcks()); + assertTrue(mirrorControllerTarget.isBusy()); + // first connection it should be beyond flow control capacity, we should not have any credits here now + assertEquals(0, sender.getEndpoint().getCredit()); + ackManager.start(); + + // manager resumed and the records will be eventually removed, we should be back to capacity + Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 100); + ackManager.pause(); + + addFakeRetries(ackManager, testQueue, 1000); + assertEquals(1000, ackManager.size()); + + // we should be able to send 100 messages + for (int i = 0; i < 100; i++) { + AmqpMessage message = new AmqpMessage(); + message.setAddress(testQueue.getAddress().toString()); + message.setText("hello again " + i); + message.setDeliveryAnnotation(INTERNAL_ID.toString(), server1.getStorageManager().generateID()); + message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), testQueue.getAddress().toString()); + sender.send(message); + } + // we should not get any credits + assertEquals(0, sender.getEndpoint().getCredit()); + + Wait.assertEquals(200, testQueue::getMessageCount); + + ackManager.start(); + // after restart, we should eventually get replenished on credits + Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 100); + + Wait.assertEquals(200L, testQueue::getMessageCount, 5000, 100); + + AtomicInteger acked = new AtomicInteger(0); + ackManager.pause(); + + // Adding real deletes, we should still flow control credits + testQueue.forEach(ref -> { + long messageID = ref.getMessageID(); + + Long internalID = (Long) ref.getMessage().getAnnotation(AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY); + String nodeId = (String) ref.getMessage().getAnnotation(AMQPMirrorControllerSource.BROKER_ID_SIMPLE_STRING); + if (internalID != null) { + messageID = internalID.longValue(); + } + ackManager.addRetry(nodeId, testQueue, messageID, AckReason.NORMAL); + acked.incrementAndGet(); + }); + + assertEquals(200, acked.get()); + ackManager.start(); + + // Adding hot data... we should be able to flow credits during that + for (int i = 0; i < 100; i++) { + AmqpMessage message = new AmqpMessage(); + message.setAddress(testQueue.getAddress().toString()); + message.setText("one of the last 100"); + message.setDeliveryAnnotation(INTERNAL_ID.toString(), server1.getStorageManager().generateID()); + message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), testQueue.getAddress().toString()); + sender.send(message); + } + Wait.assertTrue(() -> sender.getEndpoint().getCredit() > 0, 5000, 100); + + Wait.assertEquals(100L, testQueue::getMessageCount, 5000, 100); + + ackManager.stop(); + + connection.close(); + + server1.stop(); + + assertEquals(0, AckManagerProvider.getSize()); + } + + private void addFakeRetries(AckManager ackManager, Queue testQueue, int size) { + for (int i = 0; i < size; i++) { + // adding retries that will never succeed, just to fillup the storage hashmap + ackManager.addRetry(null, testQueue, server1.getStorageManager().generateID(), AckReason.NORMAL); + } + } private int getCounter(byte typeRecord, Map<Integer, AtomicInteger> values) { AtomicInteger value = values.get((int) typeRecord); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java index 04991e6ef2..ca6cde0225 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; @@ -109,6 +110,10 @@ public class AccumulatedInPageSoakTest extends SoakTestBase { File brokerPropertiesFile = new File(serverLocation, "broker.properties"); saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXML = new File(serverLocation, "/etc/broker.xml"); + // Making sure we flow control mirrorACK on the lower side to make sure things are working + assertTrue(FileUtil.findReplace(brokerXML, "</acceptor>", ";mirrorMaxPendingAcks=50</acceptor>")); } @BeforeAll --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
