This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 27f76f569c ARTEMIS-5053 support commit interval for scale-down
27f76f569c is described below
commit 27f76f569c4bbc54f3688ef73f61f903fe3ae62d
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Mar 3 12:04:43 2025 -0600
ARTEMIS-5053 support commit interval for scale-down
This commit adds support for configuring how often the transactions
governing scale-down are committed. It also updates HAPolicy with
default implementations which helps clarify which policies really care
about scale-down.
---
.../api/config/ActiveMQDefaultConfiguration.java | 10 ++
.../artemis/core/config/ConfigurationUtils.java | 4 +-
.../core/config/ScaleDownConfiguration.java | 11 +++
.../deployers/impl/FileConfigurationParser.java | 2 +
.../core/server/cluster/ha/BackupPolicy.java | 5 -
.../core/server/cluster/ha/ColocatedPolicy.java | 10 --
.../artemis/core/server/cluster/ha/HAPolicy.java | 12 ++-
.../core/server/cluster/ha/PrimaryOnlyPolicy.java | 5 +
.../core/server/cluster/ha/ReplicatedPolicy.java | 10 --
.../server/cluster/ha/ReplicationBackupPolicy.java | 10 --
.../cluster/ha/ReplicationPrimaryPolicy.java | 10 --
.../core/server/cluster/ha/ScaleDownPolicy.java | 16 +++-
.../cluster/ha/SharedStorePrimaryPolicy.java | 10 --
.../server/impl/BackupRecoveryJournalLoader.java | 2 +-
.../core/server/impl/PrimaryOnlyActivation.java | 6 +-
.../artemis/core/server/impl/ScaleDownHandler.java | 13 ++-
.../resources/schema/artemis-configuration.xsd | 8 ++
.../core/config/impl/ConfigurationImplTest.java | 2 +
.../config/impl/FileConfigurationParserTest.java | 2 +
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../ConfigurationTest-xinclude-config.xml | 1 +
...rationTest-xinclude-schema-config-ha-policy.xml | 1 +
docs/user-manual/ha.adoc | 13 +++
.../server/ScaleDownCommitIntervalTest.java | 103 +++++++++++++++++++++
.../integration/server/ScaleDownDirectTest.java | 2 +-
26 files changed, 203 insertions(+), 67 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 18a76a0e19..2876929513 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -440,6 +440,9 @@ public final class ActiveMQDefaultConfiguration {
// its possible that you only want a server to partake in scale down as a
receiver, via a group. In this case set scale-down to false
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
+ // How often to commit transactions for moving messages during scale-down
+ private static int DEFAULT_SCALE_DOWN_COMMIT_INTERVAL = -1;
+
// How long to wait for a decision
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
@@ -1471,6 +1474,13 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_SCALE_DOWN_ENABLED;
}
+ /**
+ * How often to commit transactions for moving messages during scale-down
+ */
+ public static int getDefaultScaleDownCommitInterval() {
+ return DEFAULT_SCALE_DOWN_COMMIT_INTERVAL;
+ }
+
/**
* How long to wait for a decision
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index a8ff38aa20..8b9494b8c9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -135,9 +135,9 @@ public final class ConfigurationUtils {
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration
scaleDownConfiguration) {
if (scaleDownConfiguration != null) {
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
- return new
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(),
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(),
scaleDownConfiguration.isEnabled());
+ return new
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(),
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(),
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
} else {
- return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(),
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(),
scaleDownConfiguration.isEnabled());
+ return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(),
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(),
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
}
}
return null;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
index 5f58e36bdc..848c54ae1d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
@@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {
private boolean enabled =
ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
+ private int commitInterval =
ActiveMQDefaultConfiguration.getDefaultScaleDownCommitInterval();
+
public List<String> getConnectors() {
return connectors;
}
@@ -83,4 +85,13 @@ public class ScaleDownConfiguration implements Serializable {
this.enabled = enabled;
return this;
}
+
+ public int getCommitInterval() {
+ return commitInterval;
+ }
+
+ public ScaleDownConfiguration setCommitInterval(int commitInterval) {
+ this.commitInterval = commitInterval;
+ return this;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 5e5a0dfa51..48c04105ae 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1952,6 +1952,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement,
"enabled", scaleDownConfiguration.isEnabled()));
+ scaleDownConfiguration.setCommitInterval(getInteger(scaleDownElement,
"commit-interval", scaleDownConfiguration.getCommitInterval(),
MINUS_ONE_OR_GT_ZERO));
+
NodeList discoveryGroupRef =
scaleDownElement.getElementsByTagName("discovery-group-ref");
if (discoveryGroupRef.item(0) != null) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
index 51f5c050fb..3acf7eb574 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
@@ -37,11 +37,6 @@ public abstract class BackupPolicy implements
HAPolicy<Activation> {
return true;
}
- @Override
- public String getScaleDownClustername() {
- return null;
- }
-
@Override
public String getScaleDownGroupName() {
return getScaleDownPolicy() != null ?
getScaleDownPolicy().getGroupName() : null;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
index b4859e4816..8fff208b58 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
@@ -73,11 +73,6 @@ public class ColocatedPolicy implements
HAPolicy<PrimaryActivation> {
return primaryPolicy.getBackupGroupName();
}
- @Override
- public String getScaleDownGroupName() {
- return null;
- }
-
@Override
public boolean isSharedStore() {
return backupPolicy.isSharedStore();
@@ -101,11 +96,6 @@ public class ColocatedPolicy implements
HAPolicy<PrimaryActivation> {
return false;
}
- @Override
- public String getScaleDownClustername() {
- return null;
- }
-
public boolean isRequestBackup() {
return requestBackup;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
index bc7a6e1462..f98337f73f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
@@ -52,9 +52,17 @@ public interface HAPolicy<T extends Activation> {
String getBackupGroupName();
- String getScaleDownGroupName();
+ default String getScaleDownGroupName() {
+ return null;
+ }
+
+ default String getScaleDownClustername() {
+ return null;
+ }
- String getScaleDownClustername();
+ default int getScaleDownCommitInterval() {
+ return -1;
+ }
default boolean useQuorumManager() {
return true;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
index 9fab18f3ab..a98989d043 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
@@ -57,6 +57,11 @@ public class PrimaryOnlyPolicy implements
HAPolicy<Activation> {
return null;
}
+ @Override
+ public int getScaleDownCommitInterval() {
+ return scaleDownPolicy == null ? -1 :
scaleDownPolicy.getCommitInterval();
+ }
+
@Override
public boolean isSharedStore() {
return false;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index 6f83d2a8e8..1f21bd7a70 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -187,11 +187,6 @@ public class ReplicatedPolicy implements
HAPolicy<PrimaryActivation> {
return groupName;
}
- @Override
- public String getScaleDownGroupName() {
- return null;
- }
-
public void setGroupName(String groupName) {
this.groupName = groupName;
}
@@ -211,11 +206,6 @@ public class ReplicatedPolicy implements
HAPolicy<PrimaryActivation> {
return false;
}
- @Override
- public String getScaleDownClustername() {
- return null;
- }
-
public void setAllowAutoFailBack(boolean allowAutoFailBack) {
this.allowAutoFailBack = allowAutoFailBack;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
index 50080d2e09..f548440847 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
@@ -117,16 +117,6 @@ public class ReplicationBackupPolicy implements
HAPolicy<ReplicationBackupActiva
return false;
}
- @Override
- public String getScaleDownGroupName() {
- return null;
- }
-
- @Override
- public String getScaleDownClustername() {
- return null;
- }
-
public String getClusterName() {
return clusterName;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
index 8d7af9cf51..6a611cd3c1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
@@ -128,16 +128,6 @@ public class ReplicationPrimaryPolicy implements
HAPolicy<ReplicationPrimaryActi
return groupName;
}
- @Override
- public String getScaleDownGroupName() {
- return null;
- }
-
- @Override
- public String getScaleDownClustername() {
- return null;
- }
-
public boolean isAllowAutoFailBack() {
return allowAutoFailBack;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
index 0ef96d5322..e50a8c3248 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
@@ -41,21 +41,25 @@ public class ScaleDownPolicy {
private boolean enabled;
+ private int commitInterval;
+
public ScaleDownPolicy() {
}
- public ScaleDownPolicy(List<String> connectors, String groupName, String
clusterName, boolean enabled) {
+ public ScaleDownPolicy(List<String> connectors, String groupName, String
clusterName, boolean enabled, int commitInterval) {
this.connectors = connectors;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
+ this.commitInterval = commitInterval;
}
- public ScaleDownPolicy(String discoveryGroup, String groupName, String
clusterName, boolean enabled) {
+ public ScaleDownPolicy(String discoveryGroup, String groupName, String
clusterName, boolean enabled, int commitInterval) {
this.discoveryGroup = discoveryGroup;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
+ this.commitInterval = commitInterval;
}
public List<String> getConnectors() {
@@ -98,6 +102,14 @@ public class ScaleDownPolicy {
this.enabled = enabled;
}
+ public int getCommitInterval() {
+ return commitInterval;
+ }
+
+ public void setCommitInterval(int commitInterval) {
+ this.commitInterval = commitInterval;
+ }
+
public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy
scaleDownPolicy,
ActiveMQServer
activeMQServer) throws ActiveMQException {
if (!scaleDownPolicy.getConnectors().isEmpty()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
index 0b5eecceaf..b5cdf2e9af 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
@@ -100,14 +100,4 @@ public class SharedStorePrimaryPolicy implements
HAPolicy<PrimaryActivation> {
public String getBackupGroupName() {
return null;
}
-
- @Override
- public String getScaleDownGroupName() {
- return null;
- }
-
- @Override
- public String getScaleDownClustername() {
- return null;
- }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 86476a241f..4a39855aed 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -92,7 +92,7 @@ public class BackupRecoveryJournalLoader extends
PostOfficeJournalLoader {
public void postLoad(Journal messageJournal,
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>>
duplicateIDMap) throws Exception {
- ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager,
postOffice, nodeManager, clusterController, parentServer != null ?
parentServer.getStorageManager() : storageManager);
+ ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager,
postOffice, nodeManager, clusterController, parentServer != null ?
parentServer.getStorageManager() : storageManager, parentServer != null ?
parentServer.getHAPolicy().getScaleDownCommitInterval() : -1);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator,
storageManager));
try (ClientSessionFactory sessionFactory =
locator.createSessionFactory()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
index 903a1ec919..0606f60a71 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
@@ -127,7 +127,7 @@ public class PrimaryOnlyActivation extends Activation {
public void postConnectionFreeze() {
if (primaryOnlyPolicy.getScaleDownPolicy() != null &&
primaryOnlyPolicy.getScaleDownPolicy().isEnabled() &&
scaleDownClientSessionFactory != null) {
try {
- scaleDown();
+
scaleDown(primaryOnlyPolicy.getScaleDownPolicy().getCommitInterval());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToScaleDown(e);
} finally {
@@ -190,8 +190,8 @@ public class PrimaryOnlyActivation extends Activation {
}
}
- public long scaleDown() throws Exception {
- ScaleDownHandler scaleDownHandler = new
ScaleDownHandler(activeMQServer.getPagingManager(),
activeMQServer.getPostOffice(), activeMQServer.getNodeManager(),
activeMQServer.getClusterManager().getClusterController(),
activeMQServer.getStorageManager());
+ public long scaleDown(int commitInterval) throws Exception {
+ ScaleDownHandler scaleDownHandler = new
ScaleDownHandler(activeMQServer.getPagingManager(),
activeMQServer.getPostOffice(), activeMQServer.getNodeManager(),
activeMQServer.getClusterManager().getClusterController(),
activeMQServer.getStorageManager(), commitInterval);
ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches =
((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches();
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new
HashMap<>();
for (SimpleString address : duplicateIDCaches.keySet()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index 2ffe947afb..c45a8ae949 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -76,18 +76,21 @@ public class ScaleDownHandler {
private NodeManager nodeManager;
private final ClusterController clusterController;
private final StorageManager storageManager;
+ private final int commitInterval;
private String targetNodeId;
public ScaleDownHandler(PagingManager pagingManager,
PostOffice postOffice,
NodeManager nodeManager,
ClusterController clusterController,
- StorageManager storageManager) {
+ StorageManager storageManager,
+ int commitInterval) {
this.pagingManager = pagingManager;
this.postOffice = postOffice;
this.nodeManager = nodeManager;
this.clusterController = clusterController;
this.storageManager = storageManager;
+ this.commitInterval = commitInterval;
}
public long scaleDown(ClientSessionFactory sessionFactory,
@@ -214,6 +217,10 @@ public class ScaleDownHandler {
producer.send(address, message);
messageCount++;
+ if (commitInterval > 0 && messageCount % commitInterval ==
0) {
+ tx.commit();
+ tx = new TransactionImpl(storageManager);
+ }
messagesIterator.remove();
@@ -307,6 +314,10 @@ public class ScaleDownHandler {
producer.send(message.getAddress(), message);
messageCount++;
+ if (commitInterval > 0 && messageCount % commitInterval == 0) {
+ tx.commit();
+ tx = new TransactionImpl(storageManager);
+ }
messagesIterator.remove();
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index db1e93eba0..4137a140f1 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3714,6 +3714,14 @@
</xsd:complexType>
</xsd:element>
</xsd:choice>
+ <xsd:element name="commit-interval" type="xsd:int" default="-1"
maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ How often to commit when scaling messages down from one
broker to another.
+ -1 means commit only after processing all the messages from
a queue.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
</xsd:sequence>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 6b9fef9cec..9e019323ae 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -953,6 +953,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
properties.put("HAPolicyConfiguration.scaleDownConfiguration.groupName",
"g0");
properties.put("HAPolicyConfiguration.scaleDownConfiguration.clusterName",
"c0");
properties.put("HAPolicyConfiguration.scaleDownConfiguration.enabled",
"false");
+
properties.put("HAPolicyConfiguration.scaleDownConfiguration.commitInterval",
"73");
}
private void checkScaleDownConfiguration(ScaleDownConfiguration
scaleDownConfiguration) {
@@ -962,6 +963,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
assertEquals("g0", scaleDownConfiguration.getGroupName());
assertEquals("c0", scaleDownConfiguration.getClusterName());
assertFalse(scaleDownConfiguration.isEnabled());
+ assertEquals(73, scaleDownConfiguration.getCommitInterval());
}
@Test
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 2003207dc8..09bdc01aae 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -491,6 +491,7 @@ public class FileConfigurationParserTest extends
ServerTestBase {
<scale-down>
<connectors>
<connector-ref>server0-connector</connector-ref>
+ <commit-interval>33</commit-interval>
</connectors>
</scale-down>
</live-only>
@@ -509,6 +510,7 @@ public class FileConfigurationParserTest extends
ServerTestBase {
assertEquals(1, connectors.size());
String connector = connectors.get(0);
assertEquals("server0-connector", connector);
+ assertEquals(33, scaledownCfg.getCommitInterval());
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 5f82220ad6..c8ff820215 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -451,6 +451,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertNotNull(lopc.getScaleDownConfiguration());
assertEquals("boo!", lopc.getScaleDownConfiguration().getGroupName());
assertEquals("dg1",
lopc.getScaleDownConfiguration().getDiscoveryGroup());
+ assertEquals(33, lopc.getScaleDownConfiguration().getCommitInterval());
for (ClusterConnectionConfiguration ccc :
conf.getClusterConfigurations()) {
if (ccc.getName().equals("cluster-connection3")) {
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index ad9e0b1a29..543b2e96ef 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -413,6 +413,7 @@
<group-name>boo!</group-name>
<!--either a discovery group-->
<discovery-group-ref discovery-group-name="dg1"/>
+ <commit-interval>33</commit-interval>
</scale-down>
</primary-only>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 99fac28fca..40977255a2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -301,6 +301,7 @@
<group-name>boo!</group-name>
<!--either a discovery group-->
<discovery-group-ref discovery-group-name="dg1"/>
+ <commit-interval>33</commit-interval>
</scale-down>
</primary-only>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
index 5913f09197..5d8ae2b1de 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
@@ -23,6 +23,7 @@
<group-name>boo!</group-name>
<!--either a discovery group-->
<discovery-group-ref discovery-group-name="dg1"/>
+ <commit-interval>33</commit-interval>
</scale-down>
</primary-only>
diff --git a/docs/user-manual/ha.adoc b/docs/user-manual/ha.adoc
index 77d15c5927..7cae149fe8 100644
--- a/docs/user-manual/ha.adoc
+++ b/docs/user-manual/ha.adoc
@@ -836,6 +836,19 @@ It is also possible to use discovery to scale down, this
would look like:
</ha-policy>
----
+[NOTE]
+====
+Moving messages from one broker to another during scale-down involves an
internal transaction.
+By default this transaction is only committed once per queue.
+However, as the number of messages in the queue grows so does the memory
requirements for the transaction.
+At some point the memory requirements for the transaction will exceed the
limits of the available heap.
+
+In order to deal with this you can configure the `commit-interval` in the
`scale-down` element.
+This will allow the transaction to be committed every so often which will free
the memory from the transaction.
+It must be greater than `0` or `-1`.
+It is `-1` by default (i.e. don't commit until all the messages in the queue
are scaled-down).
+====
+
=== Scale Down with groups
It is also possible to configure servers to only scale down to servers that
belong in the same group.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
new file mode 100644
index 0000000000..212ae65634
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ScaleDownCommitIntervalTest extends ClusterTestBase {
+ final int TEST_SIZE = 1000;
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ setupPrimaryServer(0, isFileStorage(), true, true);
+ setupPrimaryServer(1, isFileStorage(), true, true);
+ startServers(0, 1);
+ setupSessionFactory(0, true);
+ setupSessionFactory(1, true);
+ }
+
+ @Test
+ public void testSmallCommitInterval() throws Exception {
+ testCommitInterval(1);
+ }
+
+ @Test
+ public void testMediumCommitInterval() throws Exception {
+ testCommitInterval((int) (TEST_SIZE * 0.33));
+ }
+
+ @Test
+ public void testLargeCommitInterval() throws Exception {
+ testCommitInterval((int) (TEST_SIZE * 0.66));
+ }
+
+ @Test
+ public void testMaxCommitInterval() throws Exception {
+ testCommitInterval(-1);
+ }
+
+ private void testCommitInterval(int commitInterval) throws Exception {
+ final String addressName = "testAddress";
+ final String queueName1 = "testQueue1";
+ final String queueName2 = "testQueue2";
+
+ // create 2 queues on each node mapped to the same address
+ createQueue(0, addressName, queueName1, null, true);
+ createQueue(0, addressName, queueName2, null, true);
+ createQueue(1, addressName, queueName1, null, true);
+ createQueue(1, addressName, queueName2, null, true);
+
+ // send messages to node 0
+ send(0, addressName, TEST_SIZE, true, null);
+
+ // consume a message from queue 2
+ addConsumer(1, 0, queueName2, null, false);
+ ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
+ assertNotNull(clientMessage);
+ clientMessage.acknowledge();
+ consumers[1].getSession().commit();
+ removeConsumer(1);
+
+ Wait.assertEquals((long) TEST_SIZE, () ->
servers[0].locateQueue(queueName1).getMessageCount(), 500, 20);
+ Wait.assertEquals((long) TEST_SIZE - 1, () ->
servers[0].locateQueue(queueName2).getMessageCount(), 500, 20);
+
+ assertEquals((long) TEST_SIZE, performScaledown(commitInterval));
+
+ // trigger scaleDown from node 0 to node 1
+ servers[0].stop();
+
+ Wait.assertEquals((long) TEST_SIZE, () ->
servers[1].locateQueue(queueName1).getMessageCount(), 500, 20);
+ Wait.assertEquals((long) TEST_SIZE - 1, () ->
servers[1].locateQueue(queueName2).getMessageCount(), 500, 20);
+ }
+
+ private long performScaledown(int commitInterval) throws Exception {
+ ScaleDownHandler handler = new
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(),
servers[0].getNodeManager(),
servers[0].getClusterManager().getClusterController(),
servers[0].getStorageManager(), commitInterval);
+
+ return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(),
servers[0].getConfiguration().getClusterUser(),
servers[0].getConfiguration().getClusterPassword());
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
index ec81070ad0..000726b001 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
@@ -374,7 +374,7 @@ public class ScaleDownDirectTest extends ClusterTestBase {
}
private long performScaledown() throws Exception {
- ScaleDownHandler handler = new
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(),
servers[0].getNodeManager(),
servers[0].getClusterManager().getClusterController(),
servers[0].getStorageManager());
+ ScaleDownHandler handler = new
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(),
servers[0].getNodeManager(),
servers[0].getClusterManager().getClusterController(),
servers[0].getStorageManager(), -1);
return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(),
servers[0].getConfiguration().getClusterUser(),
servers[0].getConfiguration().getClusterPassword());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact