This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b967e6d940 ARTEMIS-4362 Produce log.warn when the system cannot depage because of pending acks
b967e6d940 is described below

commit b967e6d9400a04d81be69a0f4f75d838b27fd665
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jul 12 16:47:12 2023 -0400

    ARTEMIS-4362 Produce log.warn when the system cannot depage because of pending acks
---
 .../artemis/core/server/ActiveMQQueueLogger.java   |  40 +++++++
 .../artemis/core/server/ActiveMQServerLogger.java  |  14 +--
 .../artemis/core/server/impl/QueueImpl.java        |  33 +++++-
 .../integration/paging/PagingMaxReadLimitTest.java | 128 +++++++++++++++++++++
 4 files changed, 207 insertions(+), 8 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java
new file mode 100644
index 0000000000..4e1c16cacc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQQueueLogger.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+import org.apache.activemq.artemis.logs.BundleFactory;
+import org.apache.activemq.artemis.logs.annotation.LogBundle;
+import org.apache.activemq.artemis.logs.annotation.LogMessage;
+
+/**
+ * This is using a separate Logger specific for Queue / QueueImpl.
+ * It's using Queue.class.getName() as the category as it would be possible to 
disable this logger with log4j.
+ * This is sharing the codes with ActiveMQServerLogger (meaning the codes 
between here and ActiveMQServerLogger have to be unique).
+ */
+@LogBundle(projectCode = "AMQ", regexID = "22[0-9]{4}")
+public interface ActiveMQQueueLogger {
+
+   ActiveMQQueueLogger LOGGER = 
BundleFactory.newBundle(ActiveMQQueueLogger.class, Queue.class.getName());
+
+   @LogMessage(id = 224127, value = "Message dispatch from paging is blocked. 
Address {}/Queue {} will not read any more messages from paging until pending 
messages are acknowledged. There are currently {} messages pending ({} bytes) 
with max reads at maxPageReadMessages({}) and maxPageReadBytes({}). Either 
increase reading attributes at the address-settings or change your consumers to 
acknowledge more often.", level = LogMessage.Level.WARN)
+   void warnPageFlowControl(String address,
+                            String queue,
+                            long messageCount,
+                            long messageBytes,
+                            long maxMessages,
+                            long maxMessagesBytes);
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index aad5fd93f0..72ef079664 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -641,7 +641,9 @@ public interface ActiveMQServerLogger {
    void timeoutLockingConsumer(String consumer, String remoteAddress);
 
    @LogMessage(id = 222110, value = "no queue IDs defined!,  originalMessage  
= {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
-   void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, 
org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString 
idsHeaderName);
+   void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
+                         org.apache.activemq.artemis.api.core.Message 
messageCopy,
+                         SimpleString idsHeaderName);
 
    @LogMessage(id = 222111, value = "exception while invoking {} on {}", level 
= LogMessage.Level.TRACE)
    void managementOperationError(String op, String resourceName, Exception e);
@@ -905,10 +907,7 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222201, value = "Timed out waiting for activation to 
exit", level = LogMessage.Level.WARN)
    void activationTimeout();
 
-   @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same 
value as <{}>.  " +
-         "If a system is under high load, or there is a minor network delay, " 
+
-         "there is a high probability of a cluster split/failure due to 
connection timeout.",
-         level = LogMessage.Level.WARN)
+   @LogMessage(id = 222202, value = "{}: <{}> should not be set to the same 
value as <{}>.  " + "If a system is under high load, or there is a minor 
network delay, " + "there is a high probability of a cluster split/failure due 
to connection timeout.", level = LogMessage.Level.WARN)
    void connectionTTLEqualsCheckPeriod(String connectionName, String ttl, 
String checkPeriod);
 
    @LogMessage(id = 222203, value = "Classpath lacks a protocol-manager for 
protocol {}, Protocol being ignored on acceptor {}", level = 
LogMessage.Level.WARN)
@@ -1118,7 +1117,7 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222702, value = "Message ack in prepared tx for queue {} 
which does not exist. This ack will be ignored.", level = LogMessage.Level.WARN)
    void journalMessageAckMissingQueueInPreparedTX(Long queueID);
 
-   @LogMessage(id = 222703,  value = "Address \"{}\" is full. Bridge {} will 
disconnect", level = LogMessage.Level.WARN)
+   @LogMessage(id = 222703, value = "Address \"{}\" is full. Bridge {} will 
disconnect", level = LogMessage.Level.WARN)
    void bridgeAddressFull(String addressName, String bridgeName);
 
    @LogMessage(id = 222274, value = "Failed to deploy address {}: {}", level = 
LogMessage.Level.WARN)
@@ -1180,7 +1179,6 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222295, value = "There is a possible split brain on nodeID 
{}. Topology update ignored", level = LogMessage.Level.WARN)
    void possibleSplitBrain(String nodeID);
 
-
    @LogMessage(id = 222296, value = "Unable to deploy Hawtio MBeam, console 
client side RBAC not available", level = LogMessage.Level.WARN)
    void unableToDeployHawtioMBean(Throwable e);
 
@@ -1584,4 +1582,6 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224126, value = "Failure during protocol handshake on 
connection to {} from {}", level = LogMessage.Level.ERROR)
    void failureDuringProtocolHandshake(SocketAddress localAddress, 
SocketAddress remoteAddress, Throwable e);
+
+   // notice loggerID=224127 is reserved as it's been used at 
ActiveMQQueueLogger
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7ded73cf11..2ff193595b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -77,6 +77,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQQueueLogger;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
@@ -179,6 +180,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile boolean queueDestroyed = false;
 
+   // Variable to control if we should print a flow controlled message or not.
+   // Once it was flow controlled, we will stop warning until it's cleared 
once again
+   private volatile boolean pageFlowControlled = false;
+
+   private volatile long pageFlowControlledLastLog = 0;
+
+   // It is not expected to have an user really changing this. This is a 
system property now in case users disagree and find value on changing it.
+   // In case there is in fact value on changing it we may consider bringing 
it as an address-settings or broker.xml
+   private static final long PAGE_FLOW_CONTROL_PRINT_INTERVAL = 
Long.parseLong(System.getProperty("ARTEMIS_PAGE_FLOW_CONTROL_PRINT_INTERVAL", 
"60000"));
+
    // once we delivered messages from paging, we need to call asyncDelivery 
upon acks
    // if we flow control paging, ack more messages will open the space to 
deliver more messages
    // hence we will need this flag to determine if it was paging before.
@@ -3298,8 +3309,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return queueMemorySize.getSize() < 
pageSubscription.getPagingStore().getMaxSize() &&
             intermediateMessageReferences.size() + messageReferences.size() < 
MAX_DEPAGE_NUM;
       } else {
-         return (maxReadBytes <= 0 || (queueMemorySize.getSize() + 
deliveringMetrics.getPersistentSize()) < maxReadBytes) &&
+         boolean needsDepageResult =  (maxReadBytes <= 0 || 
(queueMemorySize.getSize() + deliveringMetrics.getPersistentSize()) < 
maxReadBytes) &&
             (maxReadMessages <= 0 || (queueMemorySize.getElements() + 
deliveringMetrics.getMessageCount()) < maxReadMessages);
+
+         if (!needsDepageResult) {
+            if (!pageFlowControlled && (maxReadBytes > 0 && 
deliveringMetrics.getPersistentSize() >= maxReadBytes || maxReadMessages > 0 && 
deliveringMetrics.getMessageCount() >= maxReadMessages)) {
+               if (System.currentTimeMillis() - pageFlowControlledLastLog > 
PAGE_FLOW_CONTROL_PRINT_INTERVAL) {
+                  pageFlowControlledLastLog = System.currentTimeMillis();
+                  
ActiveMQQueueLogger.LOGGER.warnPageFlowControl(String.valueOf(address), 
String.valueOf(name), deliveringMetrics.getMessageCount(), 
deliveringMetrics.getPersistentSize(), maxReadMessages, maxReadBytes);
+               }
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Message dispatch from paging is blocked. 
Address {}/Queue{} will not read any more messages from paging " +
+                                  "until pending messages are acknowledged. 
There are currently {} messages pending ({} bytes) with max reads at " +
+                                  "maxPageReadMessages({}) and 
maxPageReadBytes({}). Either increase reading attributes at the 
address-settings or change your consumers to acknowledge more often.",
+                               address, name, 
deliveringMetrics.getMessageCount(), deliveringMetrics.getPersistentSize(), 
maxReadMessages, maxReadBytes);
+               }
+               pageFlowControlled = true;
+            }
+         } else {
+            pageFlowControlled = false;
+         }
+
+         return needsDepageResult;
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java
new file mode 100644
index 0000000000..a814b17cc0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingMaxReadLimitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PagingMaxReadLimitTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @Test
+   public void testMaxReadPageMessages() throws Exception {
+
+      ExecutorService service = Executors.newSingleThreadExecutor();
+      runAfter(service::shutdownNow);
+
+      Configuration config = createDefaultConfig(true);
+      
config.setJournalSyncTransactional(false).setJournalSyncTransactional(false);
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 100, -1, (long) 
(PAGE_MAX * 10), null, null, null);
+      server.start();
+
+      server.addAddressInfo(new 
AddressInfo(getName()).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST));
+
+      Wait.assertTrue(() -> server.locateQueue(getName()) != null);
+
+      org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(getName());
+
+      ConnectionFactory connectionFactory = 
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+
+      try (Connection connection = connectionFactory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(getName());
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         for (int i = 0; i < 500; i++) {
+            producer.send(session.createTextMessage("Hello " + i));
+         }
+         session.commit();
+
+         Assert.assertTrue(serverQueue.getPagingStore().isPaging());
+      }
+
+      AssertionLoggerHandler.startCapture();
+      runAfter(AssertionLoggerHandler::stopCapture);
+
+      AtomicInteger errorCounter = new AtomicInteger(0);
+      CountDownLatch done = new CountDownLatch(1);
+
+      service.execute(() -> {
+         try (Connection connection = connectionFactory.createConnection()) {
+            connection.start();
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue(getName());
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            for (int i = 0; i < 500;) {
+               Message message = consumer.receive(10);
+               if (message == null) {
+                  session.commit();
+               } else {
+                  i++;
+               }
+            }
+            session.commit();
+         } catch (Throwable e) {
+            logger.debug(e.getMessage(), e);
+            errorCounter.incrementAndGet();
+         } finally {
+            done.countDown();
+         }
+      });
+
+      Assert.assertTrue(done.await(5, TimeUnit.SECONDS));
+      Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ224127"), 
2000, 10);
+      Assert.assertEquals(0, errorCounter.get());
+
+   }
+
+}
\ No newline at end of file

Reply via email to