This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b967e6d940 ARTEMIS-4362 Produce log.warn when the system cannot depage
because of pending acks
b967e6d940 is described below
commit b967e6d9400a04d81be69a0f4f75d838b27fd665
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jul 12 16:47:12 2023 -0400
ARTEMIS-4362 Produce log.warn when the system cannot depage because of
pending acks
---
.../artemis/core/server/ActiveMQQueueLogger.java | 40 +++++++
.../artemis/core/server/ActiveMQServerLogger.java | 14 +--
.../artemis/core/server/impl/QueueImpl.java | 33 +++++-
.../integration/paging/PagingMaxReadLimitTest.java | 128 +++++++++++++++++++++
4 files changed, 207 insertions(+), 8 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java
new file mode 100644
index 0000000000..4e1c16cacc
--- /dev/null
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.logs.BundleFactory;
+import org.apache.activemq.artemis.logs.annotation.LogBundle;
+import org.apache.activemq.artemis.logs.annotation.LogMessage;
+
+/**
+ * This is using a separate Logger specific for Queue / QueueImpl.
+ * It's using Queue.class.getName() as the category as it would be possible to
disable this logger with log4j.
+ * This is sharing the codes with ActiveMQServerLogger (meaning the codes
between here and ActiveMQServerLogger have to be unique).
+ */
+@LogBundle(projectCode = "AMQ", regexID = "22[0-9]{4}")
+public interface ActiveMQQueueLogger {
+
+ ActiveMQQueueLogger LOGGER =
BundleFactory.newBundle(ActiveMQQueueLogger.class, Queue.class.getName());
+
+ @LogMessage(id = 224127, value = "Message dispatch from paging is blocked.
Address {}/Queue {} will not read any more messages from paging until pending
messages are acknowledged. There are currently {} messages pending ({} bytes)
with max reads at maxPageReadMessages({}) and maxPageReadBytes({}). Either
increase reading attributes at the address-settings or change your consumers to
acknowledge more often.", level = LogMessage.Level.WARN)
+ void warnPageFlowControl(String address,
+ String queue,
+ long messageCount,
+ long messageBytes,
+ long maxMessages,
+ long maxMessagesBytes);
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index aad5fd93f0..72ef079664 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -641,7 +641,9 @@ public interface ActiveMQServerLogger {
void timeoutLockingConsumer(String consumer, String remoteAddress);
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage
= {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
- void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString
idsHeaderName);
+ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
+ org.apache.activemq.artemis.api.core.Message
messageCopy,
+ SimpleString idsHeaderName);
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level
= LogMessage.Level.TRACE)
void managementOperationError(String op, String resourceName, Exception e);
@@ -905,10 +907,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222201, value = "Timed out waiting for activation to
exit", level = LogMessage.Level.WARN)
void activationTimeout();
- @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same
value as <{}>. " +
- "If a system is under high load, or there is a minor network delay, "
+
- "there is a high probability of a cluster split/failure due to
connection timeout.",
- level = LogMessage.Level.WARN)
+ @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same
value as <{}>. " + "If a system is under high load, or there is a minor
network delay, " + "there is a high probability of a cluster split/failure due
to connection timeout.", level = LogMessage.Level.WARN)
void connectionTTLEqualsCheckPeriod(String connectionName, String ttl,
String checkPeriod);
@LogMessage(id = 222203, value = "Classpath lacks a protocol-manager for
protocol {}, Protocol being ignored on acceptor {}", level =
LogMessage.Level.WARN)
@@ -1118,7 +1117,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222702, value = "Message ack in prepared tx for queue {}
which does not exist. This ack will be ignored.", level = LogMessage.Level.WARN)
void journalMessageAckMissingQueueInPreparedTX(Long queueID);
- @LogMessage(id = 222703, value = "Address \"{}\" is full. Bridge {} will
disconnect", level = LogMessage.Level.WARN)
+ @LogMessage(id = 222703, value = "Address \"{}\" is full. Bridge {} will
disconnect", level = LogMessage.Level.WARN)
void bridgeAddressFull(String addressName, String bridgeName);
@LogMessage(id = 222274, value = "Failed to deploy address {}: {}", level =
LogMessage.Level.WARN)
@@ -1180,7 +1179,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222295, value = "There is a possible split brain on nodeID
{}. Topology update ignored", level = LogMessage.Level.WARN)
void possibleSplitBrain(String nodeID);
-
@LogMessage(id = 222296, value = "Unable to deploy Hawtio MBeam, console
client side RBAC not available", level = LogMessage.Level.WARN)
void unableToDeployHawtioMBean(Throwable e);
@@ -1584,4 +1582,6 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224126, value = "Failure during protocol handshake on
connection to {} from {}", level = LogMessage.Level.ERROR)
void failureDuringProtocolHandshake(SocketAddress localAddress,
SocketAddress remoteAddress, Throwable e);
+
+ // notice loggerID=224127 is reserved as it's been used at
ActiveMQQueueLogger
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7ded73cf11..2ff193595b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -77,6 +77,7 @@ import
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQQueueLogger;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
@@ -179,6 +180,16 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private volatile boolean queueDestroyed = false;
+ // Variable to control if we should print a flow controlled message or not.
+ // Once it was flow controlled, we will stop warning until it's cleared
once again
+ private volatile boolean pageFlowControlled = false;
+
+ private volatile long pageFlowControlledLastLog = 0;
+
+ // It is not expected to have an user really changing this. This is a
system property now in case users disagree and find value on changing it.
+ // In case there is in fact value on changing it we may consider bringing
it as an address-settings or broker.xml
+ private static final long PAGE_FLOW_CONTROL_PRINT_INTERVAL =
Long.parseLong(System.getProperty("ARTEMIS_PAGE_FLOW_CONTROL_PRINT_INTERVAL",
"60000"));
+
// once we delivered messages from paging, we need to call asyncDelivery
upon acks
// if we flow control paging, ack more messages will open the space to
deliver more messages
// hence we will need this flag to determine if it was paging before.
@@ -3298,8 +3309,28 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return queueMemorySize.getSize() <
pageSubscription.getPagingStore().getMaxSize() &&
intermediateMessageReferences.size() + messageReferences.size() <
MAX_DEPAGE_NUM;
} else {
- return (maxReadBytes <= 0 || (queueMemorySize.getSize() +
deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
+ boolean needsDepageResult = (maxReadBytes <= 0 ||
(queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) <
maxReadBytes) &&
(maxReadMessages <= 0 || (queueMemorySize.getElements() +
deliveringMetrics.getMessageCount()) < maxReadMessages);
+
+ if (!needsDepageResult) {
+ if (!pageFlowControlled && (maxReadBytes > 0 &&
deliveringMetrics.getPersistentSize() >= maxReadBytes || maxReadMessages > 0 &&
deliveringMetrics.getMessageCount() >= maxReadMessages)) {
+ if (System.currentTimeMillis() - pageFlowControlledLastLog >
PAGE_FLOW_CONTROL_PRINT_INTERVAL) {
+ pageFlowControlledLastLog = System.currentTimeMillis();
+
ActiveMQQueueLogger.LOGGER.warnPageFlowControl(String.valueOf(address),
String.valueOf(name), deliveringMetrics.getMessageCount(),
deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message dispatch from paging is blocked.
Address {}/Queue{} will not read any more messages from paging " +
+ "until pending messages are acknowledged.
There are currently {} messages pending ({} bytes) with max reads at " +
+ "maxPageReadMessages({}) and
maxPageReadBytes({}). Either increase reading attributes at the
address-settings or change your consumers to acknowledge more often.",
+ address, name,
deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(),
maxReadMessages, maxReadBytes);
+ }
+ pageFlowControlled = true;
+ }
+ } else {
+ pageFlowControlled = false;
+ }
+
+ return needsDepageResult;
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java
new file mode 100644
index 0000000000..a814b17cc0
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PagingMaxReadLimitTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ ActiveMQServer server;
+
+ @Test
+ public void testMaxReadPageMessages() throws Exception {
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ runAfter(service::shutdownNow);
+
+ Configuration config = createDefaultConfig(true);
+
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+ final int PAGE_MAX = 20 * 1024;
+
+ final int PAGE_SIZE = 10 * 1024;
+
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 100, -1, (long)
(PAGE_MAX * 10), null, null, null);
+ server.start();
+
+ server.addAddressInfo(new
AddressInfo(getName()).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server.locateQueue(getName()) != null);
+
+ org.apache.activemq.artemis.core.server.Queue serverQueue =
server.locateQueue(getName());
+
+ ConnectionFactory connectionFactory =
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+
+ try (Connection connection = connectionFactory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ for (int i = 0; i < 500; i++) {
+ producer.send(session.createTextMessage("Hello " + i));
+ }
+ session.commit();
+
+ Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+ }
+
+ AssertionLoggerHandler.startCapture();
+ runAfter(AssertionLoggerHandler::stopCapture);
+
+ AtomicInteger errorCounter = new AtomicInteger(0);
+ CountDownLatch done = new CountDownLatch(1);
+
+ service.execute(() -> {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(getName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < 500;) {
+ Message message = consumer.receive(10);
+ if (message == null) {
+ session.commit();
+ } else {
+ i++;
+ }
+ }
+ session.commit();
+ } catch (Throwable e) {
+ logger.debug(e.getMessage(), e);
+ errorCounter.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+
+ Assert.assertTrue(done.await(5, TimeUnit.SECONDS));
+ Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ224127"),
2000, 10);
+ Assert.assertEquals(0, errorCounter.get());
+
+ }
+
+}
\ No newline at end of file