This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 27f76f569c ARTEMIS-5053 support commit interval for scale-down
27f76f569c is described below

commit 27f76f569c4bbc54f3688ef73f61f903fe3ae62d
Author: Justin Bertram <[email protected]>
AuthorDate: Mon Mar 3 12:04:43 2025 -0600

    ARTEMIS-5053 support commit interval for scale-down
    
    This commit adds support for configuring how often the transactions
    governing scale-down are committed. It also updates HAPolicy with
    default implementations which helps clarify which policies really care
    about scale-down.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |  10 ++
 .../artemis/core/config/ConfigurationUtils.java    |   4 +-
 .../core/config/ScaleDownConfiguration.java        |  11 +++
 .../deployers/impl/FileConfigurationParser.java    |   2 +
 .../core/server/cluster/ha/BackupPolicy.java       |   5 -
 .../core/server/cluster/ha/ColocatedPolicy.java    |  10 --
 .../artemis/core/server/cluster/ha/HAPolicy.java   |  12 ++-
 .../core/server/cluster/ha/PrimaryOnlyPolicy.java  |   5 +
 .../core/server/cluster/ha/ReplicatedPolicy.java   |  10 --
 .../server/cluster/ha/ReplicationBackupPolicy.java |  10 --
 .../cluster/ha/ReplicationPrimaryPolicy.java       |  10 --
 .../core/server/cluster/ha/ScaleDownPolicy.java    |  16 +++-
 .../cluster/ha/SharedStorePrimaryPolicy.java       |  10 --
 .../server/impl/BackupRecoveryJournalLoader.java   |   2 +-
 .../core/server/impl/PrimaryOnlyActivation.java    |   6 +-
 .../artemis/core/server/impl/ScaleDownHandler.java |  13 ++-
 .../resources/schema/artemis-configuration.xsd     |   8 ++
 .../core/config/impl/ConfigurationImplTest.java    |   2 +
 .../config/impl/FileConfigurationParserTest.java   |   2 +
 .../core/config/impl/FileConfigurationTest.java    |   1 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 .../ConfigurationTest-xinclude-config.xml          |   1 +
 ...rationTest-xinclude-schema-config-ha-policy.xml |   1 +
 docs/user-manual/ha.adoc                           |  13 +++
 .../server/ScaleDownCommitIntervalTest.java        | 103 +++++++++++++++++++++
 .../integration/server/ScaleDownDirectTest.java    |   2 +-
 26 files changed, 203 insertions(+), 67 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 18a76a0e19..2876929513 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -440,6 +440,9 @@ public final class ActiveMQDefaultConfiguration {
    // its possible that you only want a server to partake in scale down as a 
receiver, via a group. In this case set scale-down to false
    private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
 
+   // How often to commit transactions for moving messages during scale-down
+   private static int DEFAULT_SCALE_DOWN_COMMIT_INTERVAL = -1;
+
    // How long to wait for a decision
    private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
 
@@ -1471,6 +1474,13 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_SCALE_DOWN_ENABLED;
    }
 
+   /**
+    * How often to commit transactions for moving messages during scale-down
+    */
+   public static int getDefaultScaleDownCommitInterval() {
+      return DEFAULT_SCALE_DOWN_COMMIT_INTERVAL;
+   }
+
    /**
     * How long to wait for a decision
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index a8ff38aa20..8b9494b8c9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -135,9 +135,9 @@ public final class ConfigurationUtils {
    public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration 
scaleDownConfiguration) {
       if (scaleDownConfiguration != null) {
          if (scaleDownConfiguration.getDiscoveryGroup() != null) {
-            return new 
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled());
+            return new 
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
          } else {
-            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled());
+            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
          }
       }
       return null;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
index 5f58e36bdc..848c54ae1d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
@@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {
 
    private boolean enabled = 
ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
 
+   private int commitInterval = 
ActiveMQDefaultConfiguration.getDefaultScaleDownCommitInterval();
+
    public List<String> getConnectors() {
       return connectors;
    }
@@ -83,4 +85,13 @@ public class ScaleDownConfiguration implements Serializable {
       this.enabled = enabled;
       return this;
    }
+
+   public int getCommitInterval() {
+      return commitInterval;
+   }
+
+   public ScaleDownConfiguration setCommitInterval(int commitInterval) {
+      this.commitInterval = commitInterval;
+      return this;
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 5e5a0dfa51..48c04105ae 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1952,6 +1952,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
          scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, 
"enabled", scaleDownConfiguration.isEnabled()));
 
+         scaleDownConfiguration.setCommitInterval(getInteger(scaleDownElement, 
"commit-interval", scaleDownConfiguration.getCommitInterval(), 
MINUS_ONE_OR_GT_ZERO));
+
          NodeList discoveryGroupRef = 
scaleDownElement.getElementsByTagName("discovery-group-ref");
 
          if (discoveryGroupRef.item(0) != null) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
index 51f5c050fb..3acf7eb574 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java
@@ -37,11 +37,6 @@ public abstract class BackupPolicy implements 
HAPolicy<Activation> {
       return true;
    }
 
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
-
    @Override
    public String getScaleDownGroupName() {
       return getScaleDownPolicy() != null ? 
getScaleDownPolicy().getGroupName() : null;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
index b4859e4816..8fff208b58 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java
@@ -73,11 +73,6 @@ public class ColocatedPolicy implements 
HAPolicy<PrimaryActivation> {
       return primaryPolicy.getBackupGroupName();
    }
 
-   @Override
-   public String getScaleDownGroupName() {
-      return null;
-   }
-
    @Override
    public boolean isSharedStore() {
       return backupPolicy.isSharedStore();
@@ -101,11 +96,6 @@ public class ColocatedPolicy implements 
HAPolicy<PrimaryActivation> {
       return false;
    }
 
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
-
    public boolean isRequestBackup() {
       return requestBackup;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
index bc7a6e1462..f98337f73f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java
@@ -52,9 +52,17 @@ public interface HAPolicy<T extends Activation> {
 
    String getBackupGroupName();
 
-   String getScaleDownGroupName();
+   default String getScaleDownGroupName() {
+      return null;
+   }
+
+   default String getScaleDownClustername() {
+      return null;
+   }
 
-   String getScaleDownClustername();
+   default int getScaleDownCommitInterval() {
+      return -1;
+   }
 
    default boolean useQuorumManager() {
       return true;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
index 9fab18f3ab..a98989d043 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java
@@ -57,6 +57,11 @@ public class PrimaryOnlyPolicy implements 
HAPolicy<Activation> {
       return null;
    }
 
+   @Override
+   public int getScaleDownCommitInterval() {
+      return scaleDownPolicy == null ? -1 : 
scaleDownPolicy.getCommitInterval();
+   }
+
    @Override
    public boolean isSharedStore() {
       return false;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
index 6f83d2a8e8..1f21bd7a70 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java
@@ -187,11 +187,6 @@ public class ReplicatedPolicy implements 
HAPolicy<PrimaryActivation> {
       return groupName;
    }
 
-   @Override
-   public String getScaleDownGroupName() {
-      return null;
-   }
-
    public void setGroupName(String groupName) {
       this.groupName = groupName;
    }
@@ -211,11 +206,6 @@ public class ReplicatedPolicy implements 
HAPolicy<PrimaryActivation> {
       return false;
    }
 
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
-
    public void setAllowAutoFailBack(boolean allowAutoFailBack) {
       this.allowAutoFailBack = allowAutoFailBack;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
index 50080d2e09..f548440847 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java
@@ -117,16 +117,6 @@ public class ReplicationBackupPolicy implements 
HAPolicy<ReplicationBackupActiva
       return false;
    }
 
-   @Override
-   public String getScaleDownGroupName() {
-      return null;
-   }
-
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
-
    public String getClusterName() {
       return clusterName;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
index 8d7af9cf51..6a611cd3c1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java
@@ -128,16 +128,6 @@ public class ReplicationPrimaryPolicy implements 
HAPolicy<ReplicationPrimaryActi
       return groupName;
    }
 
-   @Override
-   public String getScaleDownGroupName() {
-      return null;
-   }
-
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
-
    public boolean isAllowAutoFailBack() {
       return allowAutoFailBack;
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
index 0ef96d5322..e50a8c3248 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
@@ -41,21 +41,25 @@ public class ScaleDownPolicy {
 
    private boolean enabled;
 
+   private int commitInterval;
+
    public ScaleDownPolicy() {
    }
 
-   public ScaleDownPolicy(List<String> connectors, String groupName, String 
clusterName, boolean enabled) {
+   public ScaleDownPolicy(List<String> connectors, String groupName, String 
clusterName, boolean enabled, int commitInterval) {
       this.connectors = connectors;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
+      this.commitInterval = commitInterval;
    }
 
-   public ScaleDownPolicy(String discoveryGroup, String groupName, String 
clusterName, boolean enabled) {
+   public ScaleDownPolicy(String discoveryGroup, String groupName, String 
clusterName, boolean enabled, int commitInterval) {
       this.discoveryGroup = discoveryGroup;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
+      this.commitInterval = commitInterval;
    }
 
    public List<String> getConnectors() {
@@ -98,6 +102,14 @@ public class ScaleDownPolicy {
       this.enabled = enabled;
    }
 
+   public int getCommitInterval() {
+      return commitInterval;
+   }
+
+   public void setCommitInterval(int commitInterval) {
+      this.commitInterval = commitInterval;
+   }
+
    public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy 
scaleDownPolicy,
                                                              ActiveMQServer 
activeMQServer) throws ActiveMQException {
       if (!scaleDownPolicy.getConnectors().isEmpty()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
index 0b5eecceaf..b5cdf2e9af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java
@@ -100,14 +100,4 @@ public class SharedStorePrimaryPolicy implements 
HAPolicy<PrimaryActivation> {
    public String getBackupGroupName() {
       return null;
    }
-
-   @Override
-   public String getScaleDownGroupName() {
-      return null;
-   }
-
-   @Override
-   public String getScaleDownClustername() {
-      return null;
-   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 86476a241f..4a39855aed 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -92,7 +92,7 @@ public class BackupRecoveryJournalLoader extends 
PostOfficeJournalLoader {
    public void postLoad(Journal messageJournal,
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> 
duplicateIDMap) throws Exception {
-      ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, 
postOffice, nodeManager, clusterController, parentServer != null ? 
parentServer.getStorageManager() : storageManager);
+      ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, 
postOffice, nodeManager, clusterController, parentServer != null ? 
parentServer.getStorageManager() : storageManager, parentServer != null ? 
parentServer.getHAPolicy().getScaleDownCommitInterval() : -1);
       
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator,
 storageManager));
 
       try (ClientSessionFactory sessionFactory = 
locator.createSessionFactory()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
index 903a1ec919..0606f60a71 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java
@@ -127,7 +127,7 @@ public class PrimaryOnlyActivation extends Activation {
    public void postConnectionFreeze() {
       if (primaryOnlyPolicy.getScaleDownPolicy() != null && 
primaryOnlyPolicy.getScaleDownPolicy().isEnabled() && 
scaleDownClientSessionFactory != null) {
          try {
-            scaleDown();
+            
scaleDown(primaryOnlyPolicy.getScaleDownPolicy().getCommitInterval());
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.failedToScaleDown(e);
          } finally {
@@ -190,8 +190,8 @@ public class PrimaryOnlyActivation extends Activation {
       }
    }
 
-   public long scaleDown() throws Exception {
-      ScaleDownHandler scaleDownHandler = new 
ScaleDownHandler(activeMQServer.getPagingManager(), 
activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), 
activeMQServer.getClusterManager().getClusterController(), 
activeMQServer.getStorageManager());
+   public long scaleDown(int commitInterval) throws Exception {
+      ScaleDownHandler scaleDownHandler = new 
ScaleDownHandler(activeMQServer.getPagingManager(), 
activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), 
activeMQServer.getClusterManager().getClusterController(), 
activeMQServer.getStorageManager(), commitInterval);
       ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = 
((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches();
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new 
HashMap<>();
       for (SimpleString address : duplicateIDCaches.keySet()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index 2ffe947afb..c45a8ae949 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -76,18 +76,21 @@ public class ScaleDownHandler {
    private NodeManager nodeManager;
    private final ClusterController clusterController;
    private final StorageManager storageManager;
+   private final int commitInterval;
    private String targetNodeId;
 
    public ScaleDownHandler(PagingManager pagingManager,
                            PostOffice postOffice,
                            NodeManager nodeManager,
                            ClusterController clusterController,
-                           StorageManager storageManager) {
+                           StorageManager storageManager,
+                           int commitInterval) {
       this.pagingManager = pagingManager;
       this.postOffice = postOffice;
       this.nodeManager = nodeManager;
       this.clusterController = clusterController;
       this.storageManager = storageManager;
+      this.commitInterval = commitInterval;
    }
 
    public long scaleDown(ClientSessionFactory sessionFactory,
@@ -214,6 +217,10 @@ public class ScaleDownHandler {
 
                   producer.send(address, message);
                   messageCount++;
+                  if (commitInterval > 0 && messageCount % commitInterval == 
0) {
+                     tx.commit();
+                     tx = new TransactionImpl(storageManager);
+                  }
 
                   messagesIterator.remove();
 
@@ -307,6 +314,10 @@ public class ScaleDownHandler {
                producer.send(message.getAddress(), message);
 
                messageCount++;
+               if (commitInterval > 0 && messageCount % commitInterval == 0) {
+                  tx.commit();
+                  tx = new TransactionImpl(storageManager);
+               }
 
                messagesIterator.remove();
 
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index db1e93eba0..4137a140f1 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3714,6 +3714,14 @@
                </xsd:complexType>
             </xsd:element>
          </xsd:choice>
+         <xsd:element name="commit-interval" type="xsd:int" default="-1" 
maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  How often (in messages moved) to commit when scaling down from one 
broker to another. Must be -1 or greater than 0.
+                  -1 means commit only after processing all the messages from 
a queue.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:sequence>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 6b9fef9cec..9e019323ae 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -953,6 +953,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       properties.put("HAPolicyConfiguration.scaleDownConfiguration.groupName", 
"g0");
       
properties.put("HAPolicyConfiguration.scaleDownConfiguration.clusterName", 
"c0");
       properties.put("HAPolicyConfiguration.scaleDownConfiguration.enabled", 
"false");
+      
properties.put("HAPolicyConfiguration.scaleDownConfiguration.commitInterval", 
"73");
    }
 
    private void checkScaleDownConfiguration(ScaleDownConfiguration 
scaleDownConfiguration) {
@@ -962,6 +963,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertEquals("g0", scaleDownConfiguration.getGroupName());
       assertEquals("c0", scaleDownConfiguration.getClusterName());
       assertFalse(scaleDownConfiguration.isEnabled());
+      assertEquals(73, scaleDownConfiguration.getCommitInterval());
    }
 
    @Test
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 2003207dc8..09bdc01aae 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -491,6 +491,7 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
                <scale-down>
                   <connectors>
                      <connector-ref>server0-connector</connector-ref>
+                     <commit-interval>33</commit-interval>
                   </connectors>
                </scale-down>
             </live-only>
@@ -509,6 +510,7 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
       assertEquals(1, connectors.size());
       String connector = connectors.get(0);
       assertEquals("server0-connector", connector);
+      assertEquals(33, scaledownCfg.getCommitInterval());
    }
 
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 5f82220ad6..c8ff820215 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -451,6 +451,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
       assertNotNull(lopc.getScaleDownConfiguration());
       assertEquals("boo!", lopc.getScaleDownConfiguration().getGroupName());
       assertEquals("dg1", 
lopc.getScaleDownConfiguration().getDiscoveryGroup());
+      assertEquals(33, lopc.getScaleDownConfiguration().getCommitInterval());
 
       for (ClusterConnectionConfiguration ccc : 
conf.getClusterConfigurations()) {
          if (ccc.getName().equals("cluster-connection3")) {
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index ad9e0b1a29..543b2e96ef 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -413,6 +413,7 @@
                <group-name>boo!</group-name>
                <!--either a discovery group-->
                <discovery-group-ref discovery-group-name="dg1"/>
+               <commit-interval>33</commit-interval>
             </scale-down>
          </primary-only>
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 99fac28fca..40977255a2 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -301,6 +301,7 @@
                <group-name>boo!</group-name>
                <!--either a discovery group-->
                <discovery-group-ref discovery-group-name="dg1"/>
+               <commit-interval>33</commit-interval>
             </scale-down>
          </primary-only>
 
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
index 5913f09197..5d8ae2b1de 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml
@@ -23,6 +23,7 @@
          <group-name>boo!</group-name>
          <!--either a discovery group-->
          <discovery-group-ref discovery-group-name="dg1"/>
+         <commit-interval>33</commit-interval>
       </scale-down>
    </primary-only>
 
diff --git a/docs/user-manual/ha.adoc b/docs/user-manual/ha.adoc
index 77d15c5927..7cae149fe8 100644
--- a/docs/user-manual/ha.adoc
+++ b/docs/user-manual/ha.adoc
@@ -836,6 +836,19 @@ It is also possible to use discovery to scale down, this 
would look like:
 </ha-policy>
 ----
 
+[NOTE]
+====
+Moving messages from one broker to another during scale-down involves an 
internal transaction.
+By default this transaction is only committed once per queue.
+However, as the number of messages in the queue grows so do the memory 
requirements for the transaction.
+At some point the memory requirements for the transaction will exceed the 
limits of the available heap.
+
+In order to deal with this you can configure the `commit-interval` in the 
`scale-down` element.
+This will commit the transaction after every `commit-interval` messages, which 
frees the memory held by the transaction.
+It must be either `-1` or greater than `0`.
+It is `-1` by default (i.e. don't commit until all the messages in the queue 
are scaled-down).
+====
+
 === Scale Down with groups
 
 It is also possible to configure servers to only scale down to servers that 
belong in the same group.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
new file mode 100644
index 0000000000..212ae65634
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ScaleDownCommitIntervalTest extends ClusterTestBase {
+   final int TEST_SIZE = 1000;
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+      setupPrimaryServer(0, isFileStorage(), true, true);
+      setupPrimaryServer(1, isFileStorage(), true, true);
+      startServers(0, 1);
+      setupSessionFactory(0, true);
+      setupSessionFactory(1, true);
+   }
+
+   @Test
+   public void testSmallCommitInterval() throws Exception {
+      testCommitInterval(1);
+   }
+
+   @Test
+   public void testMediumCommitInterval() throws Exception {
+      testCommitInterval((int) (TEST_SIZE * 0.33));
+   }
+
+   @Test
+   public void testLargeCommitInterval() throws Exception {
+      testCommitInterval((int) (TEST_SIZE * 0.66));
+   }
+
+   @Test
+   public void testMaxCommitInterval() throws Exception {
+      testCommitInterval(-1);
+   }
+
+   private void testCommitInterval(int commitInterval) throws Exception {
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+      final String queueName2 = "testQueue2";
+
+      // create 2 queues on each node mapped to the same address
+      createQueue(0, addressName, queueName1, null, true);
+      createQueue(0, addressName, queueName2, null, true);
+      createQueue(1, addressName, queueName1, null, true);
+      createQueue(1, addressName, queueName2, null, true);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, true, null);
+
+      // consume a message from queue 2
+      addConsumer(1, 0, queueName2, null, false);
+      ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
+      assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      consumers[1].getSession().commit();
+      removeConsumer(1);
+
+      Wait.assertEquals((long) TEST_SIZE, () ->  
servers[0].locateQueue(queueName1).getMessageCount(), 500, 20);
+      Wait.assertEquals((long) TEST_SIZE - 1, () ->  
servers[0].locateQueue(queueName2).getMessageCount(), 500, 20);
+
+      assertEquals((long) TEST_SIZE, performScaledown(commitInterval));
+
+      // trigger scaleDown from node 0 to node 1
+      servers[0].stop();
+
+      Wait.assertEquals((long) TEST_SIZE, () -> 
servers[1].locateQueue(queueName1).getMessageCount(), 500, 20);
+      Wait.assertEquals((long) TEST_SIZE - 1, () -> 
servers[1].locateQueue(queueName2).getMessageCount(), 500, 20);
+   }
+
+   private long performScaledown(int commitInterval) throws Exception {
+      ScaleDownHandler handler = new 
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), 
servers[0].getNodeManager(), 
servers[0].getClusterManager().getClusterController(), 
servers[0].getStorageManager(), commitInterval);
+
+      return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), 
servers[0].getConfiguration().getClusterUser(), 
servers[0].getConfiguration().getClusterPassword());
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
index ec81070ad0..000726b001 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java
@@ -374,7 +374,7 @@ public class ScaleDownDirectTest extends ClusterTestBase {
    }
 
    private long performScaledown() throws Exception {
-      ScaleDownHandler handler = new 
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), 
servers[0].getNodeManager(), 
servers[0].getClusterManager().getClusterController(), 
servers[0].getStorageManager());
+      ScaleDownHandler handler = new 
ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), 
servers[0].getNodeManager(), 
servers[0].getClusterManager().getClusterController(), 
servers[0].getStorageManager(), -1);
 
       return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), 
servers[0].getConfiguration().getClusterUser(), 
servers[0].getConfiguration().getClusterPassword());
    }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to