This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new e719622de5 ARTEMIS-4285 Limit number of redelivery records e719622de5 is described below commit e719622de5488f859f70beda926afaa51d5b0ff9 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Thu May 18 13:27:13 2023 -0400 ARTEMIS-4285 Limit number of redelivery records --- .../activemq/artemis/cli/commands/etc/broker.xml | 4 + .../api/config/ActiveMQDefaultConfiguration.java | 7 + .../artemis/core/config/Configuration.java | 7 + .../core/config/impl/ConfigurationImpl.java | 13 ++ .../deployers/impl/FileConfigurationParser.java | 2 + .../journal/AbstractJournalStorageManager.java | 7 + .../journal/codec/ScheduledDeliveryEncoding.java | 2 +- .../resources/schema/artemis-configuration.xsd | 11 ++ .../config/impl/FileConfigurationParserTest.java | 16 ++ .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../resources/ConfigurationTest-full-config.xml | 1 + docs/user-manual/en/configuration-index.md | 1 + docs/user-manual/en/undelivered-messages.md | 6 + .../client/RescheduleJDBCDeliveryTest.java | 180 +++++++++++++++++++++ 14 files changed, 257 insertions(+), 2 deletions(-) diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index accc00a171..9ea75a2d4f 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -31,6 +31,10 @@ under the License. ${jdbc} <persistence-enabled>${persistence-enabled}</persistence-enabled> + <!-- It is recommended to keep this value as 1, minimizing the number of records stored about redeliveries. + However if you must preserve state of individual redeliveries, you may increase this value or set it to -1 (infinite). 
--> + <max-redelivery-records>1</max-redelivery-records> + <!-- this could be ASYNCIO, MAPPED, NIO ASYNCIO: Linux Libaio MAPPED: mmap files diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 69c1aba3c9..0686cdd985 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -248,6 +248,9 @@ public final class ActiveMQDefaultConfiguration { // True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. private static boolean DEFAULT_PERSIST_DELIVERY_COUNT_BEFORE_DELIVERY = false; + // Default Maximum number of records we would store for redeliveries + private static int DEFAULT_MAX_REDELIVERY_RECORDS = 10; + // the directory to store paged messages in private static String DEFAULT_PAGING_DIR = "data/paging"; @@ -685,6 +688,10 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_PERSISTENCE_ENABLED; } + public static int getDefaultMaxRedeliveryRecords() { + return DEFAULT_MAX_REDELIVERY_RECORDS; + } + public static boolean isDefaultJournalDatasync() { return DEFAULT_JOURNAL_DATASYNC; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index 29dd21b2c5..44cab79f97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -138,6 +138,13 @@ public interface Configuration { */ Configuration setPersistenceEnabled(boolean enable); + /** + * Maximum number 
of redelivery records stored on the journal per message reference. + */ + Configuration setMaxRedeliveryRecords(int maxPersistRedelivery); + + int getMaxRedeliveryRecords(); + /** * Should use fdatasync on journal files. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 6d2b006ce0..45a452cb4e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -148,6 +148,8 @@ public class ConfigurationImpl implements Configuration, Serializable { private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled(); + private int maxRedeliveryRecords = ActiveMQDefaultConfiguration.getDefaultMaxRedeliveryRecords(); + private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync(); protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod(); @@ -928,6 +930,17 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + @Override + public Configuration setMaxRedeliveryRecords(int max) { + maxRedeliveryRecords = max; + return this; + } + + @Override + public int getMaxRedeliveryRecords() { + return maxRedeliveryRecords; + } + @Override public boolean isJournalDatasync() { return journalDatasync; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 82e083cced..842b72681c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -397,6 +397,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { config.setPersistDeliveryCountBeforeDelivery(getBoolean(e, "persist-delivery-count-before-delivery", config.isPersistDeliveryCountBeforeDelivery())); + config.setMaxRedeliveryRecords(getInteger(e, "max-redelivery-records", config.getMaxRedeliveryRecords(), Validators.MINUS_ONE_OR_GE_ZERO)); + config.setScheduledThreadPoolMaxSize(getInteger(e, "scheduled-thread-pool-max-size", config.getScheduledThreadPoolMaxSize(), Validators.GT_ZERO)); config.setThreadPoolMaxSize(getInteger(e, "thread-pool-max-size", config.getThreadPoolMaxSize(), Validators.MINUS_ONE_OR_GT_ZERO)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 1d430d302f..91736ea5ed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -475,6 +475,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { + if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > config.getMaxRedeliveryRecords()) { + return; + } ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); try (ArtemisCloseable lock = closeableReadLock()) { messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, true, 
this::recordNotFoundCallback, getContext(syncNonTransactional)); @@ -706,6 +709,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp return; } + if (config.getMaxRedeliveryRecords() >= 0 && ref.getDeliveryCount() > config.getMaxRedeliveryRecords()) { + return; + } + ref.setPersistedCount(ref.getDeliveryCount()); DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java index d0559ec3ee..1f80f505b6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/ScheduledDeliveryEncoding.java @@ -24,7 +24,7 @@ public class ScheduledDeliveryEncoding extends QueueEncoding { @Override public String toString() { - return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "]"; + return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + ", queueID=" + queueID + "]"; } public ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) { diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 863be22f4c..538f6278cd 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -412,6 +412,17 @@ </xsd:annotation> </xsd:element> + <xsd:element name="max-redelivery-records" type="xsd:long" default="10" maxOccurs="1" + minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + The default for 
this value is 10, however the recommended setting for this is 1. + The system will add a store update for every redelivery happening on the system. + It is recommended to keep max-redelivery-records=1 in situations where you are operating with very short redelivery delays as you will be creating unnecessary records on the journal. + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="populate-validated-user" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index f100623800..f092466589 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -535,6 +535,22 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { Assert.assertEquals(1000, configuration.getGlobalMaxMessages()); } + @Test + public void testConfigurationPersistRedelivery() throws Exception { + StringPrintStream stringPrintStream = new StringPrintStream(); + PrintStream stream = stringPrintStream.newStream(); + + stream.println("<configuration><core>"); + stream.println("<max-redelivery-records>0</max-redelivery-records>"); + stream.println("</core></configuration>"); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(stringPrintStream.getBytes()); + FileConfigurationParser parser = new FileConfigurationParser(); + Configuration configuration = parser.parseMainConfig(inputStream); + + Assert.assertEquals(0, configuration.getMaxRedeliveryRecords()); + } + @Test public void testExceptionMaxSize() throws Exception { StringPrintStream stringPrintStream = new StringPrintStream(); diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 736c55e496..a7d4ddb471 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -919,7 +919,7 @@ public abstract class ActiveMQTestBase extends Assert { return "memory:" + getTestDir(); } - private String getTestJDBCConnectionUrl() { + protected final String getTestJDBCConnectionUrl() { return System.getProperty("jdbc.connection.url", "jdbc:derby:" + getEmbeddedDataBaseName() + ";create=true"); } diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index e3980a8bce..6758183f63 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -77,6 +77,7 @@ <class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name> </remoting-outgoing-interceptors> <persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery> + <max-redelivery-records>7</max-redelivery-records> <connectors> <connector name="connector1"> tcp://localhost1:5678? diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 8f522934eb..55818d3940 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -210,6 +210,7 @@ name | node name; used in topology notifications if set. | n/a [read-whole-page](paging.md) | If true the whole page would be read, otherwise just seek and read while getting message. | `false` [paging-directory](paging.md#configuration)| the directory to store paged messages in. 
| `data/paging` [persist-delivery-count-before-delivery](undelivered-messages.md#delivery-count-persistence) | True means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | `false` +[max-redelivery-records](undelivered-messages.md#persist-redelivery) | Maximum number of records the system will store for redeliveries. In most cases this should be set to '1'. | `10` [persistence-enabled](persistence.md#zero-persistence)| true means that the server will use the file based journal for persistence. | `true` [persist-id-cache](duplicate-detection.md#configuring-the-duplicate-id-cache) | true means that ID's are persisted to the journal. | `true` queues | **deprecated** [use addresses](#address-type) | n/a diff --git a/docs/user-manual/en/undelivered-messages.md b/docs/user-manual/en/undelivered-messages.md index c24073ba9d..d081509835 100644 --- a/docs/user-manual/en/undelivered-messages.md +++ b/docs/user-manual/en/undelivered-messages.md @@ -31,6 +31,12 @@ fail or rollback. Without a delayed redelivery, the system can get into a and delivery being re-attempted ad infinitum in quick succession, consuming valuable CPU and network resources. +### Persist Redelivery + +Two Journal update records are stored every time a redelivery happens. One for the number of deliveries that happened, and one in case a scheduled redelivery is being used. + +It is recommended to keep max-redelivery-records=1 in situations where you are operating with very short redelivery delays as you will be creating unnecessary records on the journal. 
+ ### Configuring Delayed Redelivery Delayed redelivery is defined in the address-setting configuration: diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java new file mode 100644 index 0000000000..f1d0156574 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RescheduleJDBCDeliveryTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.tests.integration.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.lang.invoke.MethodHandles; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.cli.commands.tools.PrintData; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RescheduleJDBCDeliveryTest extends ActiveMQTestBase { + + // Set this to true if you're debugging what happened in the journal + private final boolean PRINT_DATA = false; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + + @Test + public void testRescheduledRedeliveryCORE() throws Exception { + testRescheduledRedelivery("CORE", 0); + } + + @Test + public void testRescheduledRedeliveryCORE_1() throws Exception { + testRescheduledRedelivery("CORE", 1); + } + + @Test + public void testRescheduledRedeliveryAMQP_1() throws Exception { + testRescheduledRedelivery("AMQP", 1); + } + + @Test + public void testRescheduledRedeliveryAMQP() throws Exception { + testRescheduledRedelivery("AMQP", 0); + } + + @Test 
+ public void testRescheduledRedeliveryCOREInfinite() throws Exception { + testRescheduledRedelivery("CORE", -1); + } + + @Test + public void testRescheduledRedeliveryAMQPInfinite() throws Exception { + testRescheduledRedelivery("AMQP", -1); + } + + private void testRescheduledRedelivery(String protocol, int maxRecords) throws Exception { + int maxRedeliveries = 100; + String testQueue = getName(); + Configuration configuration = createDefaultJDBCConfig(true); + configuration.setMaxRedeliveryRecords(maxRecords); + configuration.addAddressSetting("#", new AddressSettings().setRedeliveryDelay(1).setMaxDeliveryAttempts(-1).setDeadLetterAddress(SimpleString.toSimpleString("DLQ"))); + configuration.addAddressConfiguration(new CoreAddressConfiguration().setName("DLQ").addRoutingType(RoutingType.ANYCAST)); + configuration.addQueueConfiguration(new QueueConfiguration("DLQ").setAddress("DLQ").setRoutingType(RoutingType.ANYCAST)); + configuration.addAddressConfiguration(new CoreAddressConfiguration().setName(testQueue).addRoutingType(RoutingType.ANYCAST)); + configuration.addQueueConfiguration(new QueueConfiguration(testQueue).setAddress(testQueue).setRoutingType(RoutingType.ANYCAST)); + ActiveMQServer server = createServer(true, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES); + server.start(); + + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(testQueue); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello")); + session.commit(); + + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + for (int i = 0; i < maxRedeliveries; i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + logger.debug("received {}", 
message); + Assert.assertNotNull(message); + session.rollback(); + } + } + + server.stop(); + + java.sql.Connection jdbcConnection = DriverManager.getConnection(getTestJDBCConnectionUrl()); + runAfter(jdbcConnection::close); + + int records = executeQuery(jdbcConnection, "SELECT * FROM MESSAGE WHERE USERRECORDTYPE=36 OR USERRECORDTYPE=34"); + + // manually set this value to true if you need to understand what's in the journal + if (PRINT_DATA) { + PrintData printData = new PrintData(); + printData.printDataJDBC(configuration, System.out); + } + + if (maxRecords < 0) { + Assert.assertEquals(maxRedeliveries * 2, records); + } else { + + Assert.assertEquals(maxRecords * 2, records); + } + + server.start(); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(testQueue); + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + TextMessage message = (TextMessage) consumer.receive(5000); + logger.debug("received {}", message); + Assert.assertNotNull(message); + session.commit(); + Assert.assertNull(consumer.receiveNoWait()); + } + } + + protected int executeQuery(java.sql.Connection connection, String sql) throws Exception { + PreparedStatement statement = connection.prepareStatement(sql); + ResultSet result = statement.executeQuery(); + ResultSetMetaData metaData = result.getMetaData(); + int columnCount = metaData.getColumnCount(); + int records = 0; + + while (result.next()) { + if (logger.isDebugEnabled()) { + StringBuffer line = new StringBuffer(); + for (int i = 1; i <= columnCount; i++) { + Object value = result.getObject(i); + line.append(metaData.getColumnLabel(i) + " = " + value); + if (i + 1 <= columnCount) + line.append(", "); + } + logger.info(line.toString()); + } + records++; + } + + return records; + + } +}