This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 397cef6  ARTEMIS-2462 Allow store-forward queue to be deleted after 
scaledown
     new 84c4c2a  This closes #2813
397cef6 is described below

commit 397cef699aea5a59cfc3970b7735aaf2068bf1ff
Author: Howard Gao <[email protected]>
AuthorDate: Wed Aug 28 21:16:02 2019 +0800

    ARTEMIS-2462 Allow store-forward queue to be deleted after scaledown
    
    After a node is scaled down to a target node, the sf queue in the
    target node is not deleted.
    
    Normally this is fine because the queue may be reused when the
    scaled-down node comes back up.
    
    However, in a cloud environment many drainer pods can be created and
    then shut down in order to drain the messages to a live node (pod).
    Each drainer pod will have a different node-id. Over time the number
    of sf queues in the target broker node grows, and those sf queues are
    no longer reused.
    
    Although users can use the management API/console to manually delete
    them, it would be nice to have an option to automatically delete
    those sf queue/address resources after scale down.
    
    This PR adds a boolean configuration parameter called
    cleanup-sf-queue to the scale-down policy. If the parameter
    is "true", the broker will send a message to the
    target broker signalling that the SF queue is no longer
    needed and should be deleted.
    
    If the parameter is not defined (default) or is "false"
    the scale down won't remove the sf queue.
---
 .../api/config/ActiveMQDefaultConfiguration.java   |   7 +
 .../core/protocol/core/impl/PacketImpl.java        |   1 +
 .../artemis/core/config/ConfigurationUtils.java    |   4 +-
 .../core/config/ScaleDownConfiguration.java        |  11 ++
 .../deployers/impl/FileConfigurationParser.java    |   4 +-
 .../artemis/core/protocol/ServerPacketDecoder.java |   6 +
 .../impl/wireformat/ScaleDownAnnounceMessage.java  |   8 +-
 ...essage.java => ScaleDownAnnounceMessageV2.java} |  35 +----
 .../apache/activemq/artemis/core/server/Queue.java |   1 +
 .../core/server/cluster/ClusterConnection.java     |   8 ++
 .../core/server/cluster/ClusterControl.java        |   7 +-
 .../core/server/cluster/ClusterController.java     |  10 +-
 .../core/server/cluster/ha/ScaleDownPolicy.java    |  12 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   4 +
 .../cluster/impl/ClusterConnectionBridge.java      |   5 +
 .../server/cluster/impl/ClusterConnectionImpl.java |  27 +++-
 .../server/impl/BackupRecoveryJournalLoader.java   |   9 +-
 .../core/server/impl/LiveOnlyActivation.java       |   3 +-
 .../artemis/core/server/impl/QueueImpl.java        |  26 ++++
 .../artemis/core/server/impl/ScaleDownHandler.java |   6 +-
 .../server/impl/SharedNothingBackupActivation.java |   2 +-
 .../server/impl/SharedStoreBackupActivation.java   |   2 +-
 .../resources/schema/artemis-configuration.xsd     |   7 +
 .../config/impl/FileConfigurationParserTest.java   |  33 +++++
 .../core/config/impl/FileConfigurationTest.java    |   1 +
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   5 +
 docs/user-manual/en/ha.md                          |  24 ++++
 .../integration/server/ScaleDownRemoveSFTest.java  | 148 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   5 +
 29 files changed, 372 insertions(+), 49 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index d41e8d3..a327084 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -430,6 +430,9 @@ public final class ActiveMQDefaultConfiguration {
    // its possible that you only want a server to partake in scale down as a 
receiver, via a group. In this case set scale-down to false
    private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
 
+   // will the target node delete the store-and-forward queue for the scaled 
down node.
+   private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;
+
    // How long to wait for a decision
    private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
 
@@ -1531,4 +1534,8 @@ public final class ActiveMQDefaultConfiguration {
    public static long getDefaultRetryReplicationWait() {
       return DEFAULT_RETRY_REPLICATION_WAIT;
    }
+
+   public static boolean isDefaultCleanupSfQueue() {
+      return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index a7a3253..ef2fae8 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -277,6 +277,7 @@ public class PacketImpl implements Packet {
 
    public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
 
+   public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;
 
    // Static --------------------------------------------------------
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
index a314947..5697460 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java
@@ -125,9 +125,9 @@ public final class ConfigurationUtils {
    public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration 
scaleDownConfiguration) {
       if (scaleDownConfiguration != null) {
          if (scaleDownConfiguration.getDiscoveryGroup() != null) {
-            return new 
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled());
+            return new 
ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
          } else {
-            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled());
+            return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), 
scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), 
scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
          }
       }
       return null;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
index 5f58e36..d0ea7d6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java
@@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {
 
    private boolean enabled = 
ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
 
+   private boolean cleanupSfQueue = 
ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();
+
    public List<String> getConnectors() {
       return connectors;
    }
@@ -83,4 +85,13 @@ public class ScaleDownConfiguration implements Serializable {
       this.enabled = enabled;
       return this;
    }
+
+   public Boolean isCleanupSfQueue() {
+      return this.cleanupSfQueue;
+   }
+
+   public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
+      this.cleanupSfQueue = cleanupSfQueue;
+      return this;
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 9b23c29..70fe47c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -1578,6 +1578,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
          Element scaleDownElement = (Element) scaleDownNode.item(0);
 
+         scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, 
"cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));
+
          scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, 
"enabled", scaleDownConfiguration.isEnabled()));
 
          NodeList discoveryGroupRef = 
scaleDownElement.getElementsByTagName("discovery-group-ref");
@@ -1791,8 +1793,6 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       int clusterNotificationAttempts = getInteger(e, "notification-attempts", 
ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), 
Validators.GT_ZERO);
 
-      String scaleDownConnector = e.getAttribute("scale-down-connector");
-
       String discoveryGroupName = null;
 
       List<String> staticConnectorNames = new ArrayList<>();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 0428abe..853b3ae 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -47,6 +47,7 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@@ -76,6 +77,7 @@ import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
+import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
 import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
@@ -252,6 +254,10 @@ public class ServerPacketDecoder extends 
ClientPacketDecoder {
             packet = new ScaleDownAnnounceMessage();
             break;
          }
+         case SCALEDOWN_ANNOUNCEMENT_V2: {
+            packet = new ScaleDownAnnounceMessageV2();
+            break;
+         }
          default: {
             packet = super.decode(packetType, connection);
          }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
index 7a6f147..3c18adb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
@@ -22,13 +22,17 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
 public class ScaleDownAnnounceMessage extends PacketImpl {
 
-   private SimpleString targetNodeId;
-   private SimpleString scaledDownNodeId;
+   protected SimpleString targetNodeId;
+   protected SimpleString scaledDownNodeId;
 
    public ScaleDownAnnounceMessage() {
       super(SCALEDOWN_ANNOUNCEMENT);
    }
 
+   public ScaleDownAnnounceMessage(byte type) {
+      super(type);
+   }
+
    public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString 
scaledDownNodeId) {
       super(SCALEDOWN_ANNOUNCEMENT);
       this.targetNodeId = targetNodeId;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java
similarity index 51%
copy from 
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
copy to 
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java
index 7a6f147..a5c09cd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java
@@ -16,42 +16,17 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 
-public class ScaleDownAnnounceMessage extends PacketImpl {
+public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {
 
-   private SimpleString targetNodeId;
-   private SimpleString scaledDownNodeId;
-
-   public ScaleDownAnnounceMessage() {
-      super(SCALEDOWN_ANNOUNCEMENT);
+   public ScaleDownAnnounceMessageV2() {
+      super(SCALEDOWN_ANNOUNCEMENT_V2);
    }
 
-   public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString 
scaledDownNodeId) {
-      super(SCALEDOWN_ANNOUNCEMENT);
+   public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString 
scaledDownNodeId) {
+      this();
       this.targetNodeId = targetNodeId;
       this.scaledDownNodeId = scaledDownNodeId;
    }
-
-   @Override
-   public void encodeRest(ActiveMQBuffer buffer) {
-      buffer.writeSimpleString(targetNodeId);
-      buffer.writeSimpleString(scaledDownNodeId);
-   }
-
-   @Override
-   public void decodeRest(ActiveMQBuffer buffer) {
-      targetNodeId = buffer.readSimpleString();
-      scaledDownNodeId = buffer.readSimpleString();
-   }
-
-   public SimpleString getTargetNodeId() {
-      return targetNodeId;
-   }
-
-   public SimpleString getScaledDownNodeId() {
-      return scaledDownNodeId;
-   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 8f344ff..3ff1b84 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -450,4 +450,5 @@ public interface Queue extends Bindable,CriticalComponent {
 
    }
 
+   boolean internalDelete();
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
index 6171476..9c4ea18 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java
@@ -25,6 +25,7 @@ import 
org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.core.client.impl.Topology;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
 
@@ -96,4 +97,11 @@ public interface ClusterConnection extends 
ActiveMQComponent, ClusterTopologyLis
     * @return
     */
    BridgeMetrics getBridgeMetrics(String nodeId);
+
+   /**
+    * Remove the store-and-forward queue after scale down
+    */
+   void removeSfQueue(SimpleString scaledDownNodeId);
+
+   void removeSfQueue(Queue queue);
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
index 07f0fc2..991b8b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java
@@ -35,6 +35,7 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -195,8 +196,10 @@ public class ClusterControl implements AutoCloseable {
       return requestBackup(backupRequestMessage);
    }
 
-   public void announceScaleDown(SimpleString targetNodeId, SimpleString 
scaledDownNodeId) {
-      ScaleDownAnnounceMessage announceMessage = new 
ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
+   public void announceScaleDown(SimpleString targetNodeId, SimpleString 
scaledDownNodeId, boolean isCleanupSfQueue) {
+
+      ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new 
ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new 
ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
+
       clusterChannel.send(announceMessage);
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 86cd0df..572a919 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -400,12 +401,19 @@ public class ClusterController implements 
ActiveMQComponent {
                Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), 
quorumVoteMessage.getVote());
                
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
                clusterChannel.send(new 
QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
-            } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
+            } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT 
|| packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
                ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) 
packet;
                //we don't really need to check as it should always be true
                if (server.getNodeID().equals(message.getTargetNodeId())) {
                   server.addScaledDownNode(message.getScaledDownNodeId());
                }
+               if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
+                  ClusterManager clusterManager = 
ClusterController.this.server.getClusterManager();
+                  Set<ClusterConnection> ccs = 
clusterManager.getClusterConnections();
+                  for (ClusterConnection cc : ccs) {
+                     cc.removeSfQueue(message.getScaledDownNodeId());
+                  }
+               }
             } else if (channelHandler != null) {
                channelHandler.handlePacket(packet);
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
index 0ef96d5..a7db3e6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java
@@ -41,21 +41,25 @@ public class ScaleDownPolicy {
 
    private boolean enabled;
 
+   private boolean isCleanupSfQueue;
+
    public ScaleDownPolicy() {
    }
 
-   public ScaleDownPolicy(List<String> connectors, String groupName, String 
clusterName, boolean enabled) {
+   public ScaleDownPolicy(List<String> connectors, String groupName, String 
clusterName, boolean enabled, boolean isCleanupSfQueue) {
       this.connectors = connectors;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
+      this.isCleanupSfQueue = isCleanupSfQueue;
    }
 
-   public ScaleDownPolicy(String discoveryGroup, String groupName, String 
clusterName, boolean enabled) {
+   public ScaleDownPolicy(String discoveryGroup, String groupName, String 
clusterName, boolean enabled, boolean isCleanupSfQueue) {
       this.discoveryGroup = discoveryGroup;
       this.groupName = groupName;
       this.clusterName = clusterName;
       this.enabled = enabled;
+      this.isCleanupSfQueue = isCleanupSfQueue;
    }
 
    public List<String> getConnectors() {
@@ -124,4 +128,8 @@ public class ScaleDownPolicy {
                                                                     
ActiveMQServer activeMQServer) {
       return 
activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
    }
+
+   public boolean isCleanupSfQueue() {
+      return this.isCleanupSfQueue;
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 1024118..6c692e7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -1113,6 +1113,9 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
 
    }
 
+   protected void postStop() {
+   }
+
 
    // Inner classes -------------------------------------------------
 
@@ -1229,6 +1232,7 @@ public class BridgeImpl implements Bridge, 
SessionFailureListener, SendAcknowled
             logger.trace("Removing consumer on stopRunnable " + this + " from 
queue " + queue);
          }
          ActiveMQServerLogger.LOGGER.bridgeStopped(name);
+         postStop();
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index 96caf46..5603dfd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -382,6 +382,11 @@ public class ClusterConnectionBridge extends BridgeImpl {
       super.nodeUP(member, last);
    }
 
+   @Override
+   protected void postStop() {
+      clusterConnection.removeSfQueue(queue);
+   }
+
 
    @Override
    protected void afterConnect() throws Exception {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 6ee2da4..a8bf90e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
 
                // New node - create a new flow record
 
-               final SimpleString queueName = new 
SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+               final SimpleString queueName = getSfQueueName(nodeID);
 
                Binding queueBinding = postOffice.getBinding(queueName);
 
@@ -741,6 +741,10 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
       }
    }
 
+   public SimpleString getSfQueueName(String nodeID) {
+      return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
+   }
+
    @Override
    public synchronized void informClusterOfBackup() {
       String nodeID = server.getNodeID().toString();
@@ -770,6 +774,27 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
       return record != null && record.getBridge() != null ? 
record.getBridge().getMetrics() : null;
    }
 
+   @Override
+   public void removeSfQueue(SimpleString scaledDownNodeId) {
+      SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString());
+      Binding binding = server.getPostOffice().getBinding(sfQName);
+
+      if (binding != null) {
+         removeSfQueue((Queue) binding.getBindable());
+      }
+   }
+
+   @Override
+   public void removeSfQueue(Queue queue) {
+      if (queue.internalDelete()) {
+         try {
+            server.removeAddressInfo(queue.getAddress(), null);
+         } catch (Exception e) {
+            logger.debug("Failed to remove sf address: " + queue.getAddress(), 
e);
+         }
+      }
+   }
+
    private void createNewRecord(final long eventUID,
                                 final String targetNodeID,
                                 final TransportConfiguration connector,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 77b9f3f..8d700ed 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import 
org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -49,6 +50,7 @@ public class BackupRecoveryJournalLoader extends 
PostOfficeJournalLoader {
    private ActiveMQServer parentServer;
    private ServerLocator locator;
    private final ClusterController clusterController;
+   private ScaleDownPolicy scaleDownPolicy;
 
    public BackupRecoveryJournalLoader(PostOffice postOffice,
                                       PagingManager pagingManager,
@@ -60,12 +62,14 @@ public class BackupRecoveryJournalLoader extends 
PostOfficeJournalLoader {
                                       Configuration configuration,
                                       ActiveMQServer parentServer,
                                       ServerLocatorInternal locator,
-                                      ClusterController clusterController) {
+                                      ClusterController clusterController,
+                                      ScaleDownPolicy scaleDownPolicy) {
 
       super(postOffice, pagingManager, storageManager, queueFactory, 
nodeManager, managementService, groupingHandler, configuration);
       this.parentServer = parentServer;
       this.locator = locator;
       this.clusterController = clusterController;
+      this.scaleDownPolicy = scaleDownPolicy;
    }
 
    @Override
@@ -87,11 +91,12 @@ public class BackupRecoveryJournalLoader extends 
PostOfficeJournalLoader {
    public void postLoad(Journal messageJournal,
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> 
duplicateIDMap) throws Exception {
+
       ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, 
postOffice, nodeManager, clusterController, parentServer.getStorageManager());
       
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
 
       try (ClientSessionFactory sessionFactory = 
locator.createSessionFactory()) {
-         scaleDownHandler.scaleDown(sessionFactory, resourceManager, 
duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), 
parentServer.getNodeID());
+         scaleDownHandler.scaleDown(sessionFactory, resourceManager, 
duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), 
parentServer.getNodeID(), this.scaleDownPolicy);
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index 8dd160d..bea060c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -179,6 +179,7 @@ public class LiveOnlyActivation extends Activation {
          DuplicateIDCache duplicateIDCache = 
activeMQServer.getPostOffice().getDuplicateIDCache(address);
          duplicateIDMap.put(address, duplicateIDCache.getMap());
       }
-      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, 
activeMQServer.getResourceManager(), duplicateIDMap, 
activeMQServer.getManagementService().getManagementAddress(), null);
+
+      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, 
activeMQServer.getResourceManager(), duplicateIDMap, 
activeMQServer.getManagementService().getManagementAddress(), null, 
this.liveOnlyPolicy.getScaleDownPolicy());
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a3b4809..04ca5d4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -313,6 +313,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private volatile long ringSize;
 
+   private Boolean removeSf;
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -2534,6 +2536,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    @Override
    public void setInternalQueue(boolean internalQueue) {
       this.internalQueue = internalQueue;
+      this.removeSf = null;
    }
 
    // Public
@@ -3461,6 +3464,29 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
    }
 
+   /**
+    * Delete the store and forward queue
+    * Only the second caller (if there is one) of this method does the actual 
deletion.
+    * The logic makes sure the sf queue is deleted only after bridge is 
stopped.
+    */
+   @Override
+   public synchronized boolean internalDelete() {
+      if (this.isInternalQueue()) {
+         if (removeSf == null) {
+            removeSf = false;
+         } else if (removeSf == false) {
+            try {
+               deleteQueue();
+               removeSf = true;
+               return true;
+            } catch (Exception e) {
+               logger.debug("Error removing sf queue " + getName(), e);
+            }
+         }
+      }
+      return false;
+   }
+
    private boolean checkExpired(final MessageReference reference) {
       try {
          if (reference.getMessage().isExpired()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index db51dcc..d84d199 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
 import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -91,14 +92,15 @@ public class ScaleDownHandler {
                          ResourceManager resourceManager,
                          Map<SimpleString, List<Pair<byte[], Long>>> 
duplicateIDMap,
                          SimpleString managementAddress,
-                         SimpleString targetNodeId) throws Exception {
+                         SimpleString targetNodeId,
+                         ScaleDownPolicy scaleDownPolicy) throws Exception {
       ClusterControl clusterControl = 
clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) 
sessionFactory);
       clusterControl.authorize();
       long num = scaleDownMessages(sessionFactory, targetNodeId, 
clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num);
       scaleDownTransactions(sessionFactory, resourceManager, 
clusterControl.getClusterUser(), clusterControl.getClusterPassword());
       scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, 
clusterControl.getClusterUser(), clusterControl.getClusterPassword());
-      clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), 
nodeManager.getNodeId());
+      clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), 
nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue());
       return num;
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 587b8f0..7adc190 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends 
Activation {
                                             Configuration configuration,
                                             ActiveMQServer parentServer) 
throws ActiveMQException {
       if (replicaPolicy.getScaleDownPolicy() != null && 
replicaPolicy.getScaleDownPolicy().isEnabled()) {
-         return new BackupRecoveryJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer, 
ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), 
activeMQServer), activeMQServer.getClusterManager().getClusterController());
+         return new BackupRecoveryJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer, 
ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), 
activeMQServer), activeMQServer.getClusterManager().getClusterController(), 
replicaPolicy.getScaleDownPolicy());
       } else {
          return super.createJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer);
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
index c978ff6..20ad7b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java
@@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends 
Activation {
                                             Configuration configuration,
                                             ActiveMQServer parentServer) 
throws ActiveMQException {
       if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && 
sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) {
-         return new BackupRecoveryJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer, 
ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(),
 activeMQServer), activeMQServer.getClusterManager().getClusterController());
+         return new BackupRecoveryJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer, 
ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(),
 activeMQServer), activeMQServer.getClusterManager().getClusterController(), 
sharedStoreSlavePolicy.getScaleDownPolicy());
       } else {
          return super.createJournalLoader(postOffice, pagingManager, 
storageManager, queueFactory, nodeManager, managementService, groupingHandler, 
configuration, parentServer);
       }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index fea5cc6..fc76b6f 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2881,6 +2881,13 @@
                </xsd:complexType>
             </xsd:element>
          </xsd:choice>
+         <xsd:element name="cleanup-sf-queue" type="xsd:boolean" 
default="false" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  Tells the target node whether to delete the store and forward 
queue after scale down.
+               </xsd:documentation>
+            </xsd:annotation>
+         </xsd:element>
       </xsd:sequence>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index df1ee08..4129ee6 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -28,7 +28,9 @@ import 
org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.FileDeploymentManager;
 import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import 
org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -272,6 +274,37 @@ public class FileConfigurationParserTest extends 
ActiveMQTestBase {
       testParsingOverFlow("<bridges> \n" + "  <bridge 
name=\"price-forward-bridge\"> \n" + "    
<queue-name>priceForwarding</queue-name>  \n" + "    
<forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + "    
<producer-window-size>2147483648</producer-window-size>\n" + "    
<static-connectors> \n" + "      <connector-ref>netty</connector-ref> \n" + "   
 </static-connectors> \n" + "  </bridge> \n" + "</bridges>\n");
    }
 
+   @Test
+   public void testParsingScaleDownConfig() throws Exception {
+      FileConfigurationParser parser = new FileConfigurationParser();
+
+      String configStr = firstPart + "<ha-policy>\n" +
+               "   <live-only>\n" +
+               "      <scale-down>\n" +
+               "         <connectors>\n" +
+               "            
<connector-ref>server0-connector</connector-ref>\n" +
+               "         </connectors>\n" +
+               "         <cleanup-sf-queue>true</cleanup-sf-queue>\n" +
+               "      </scale-down>\n" +
+               "   </live-only>\n" +
+               "</ha-policy>\n" + lastPart;
+      ByteArrayInputStream input = new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration config = parser.parseMainConfig(input);
+
+      HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
+      assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
+
+      LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) 
haConfig;
+      ScaleDownConfiguration scaledownCfg = 
liveOnlyCfg.getScaleDownConfiguration();
+      assertTrue(scaledownCfg.isCleanupSfQueue());
+      List<String> connectors = scaledownCfg.getConnectors();
+      assertEquals(1, connectors.size());
+      String connector = connectors.get(0);
+      assertEquals("server0-connector", connector);
+   }
+
+
    private void testParsingOverFlow(String config) throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
       String firstPartWithoutAddressSettings = firstPart.substring(0, 
firstPart.indexOf("<address-settings"));
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index e0f6372..352688d 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -299,6 +299,7 @@ public class FileConfigurationTest extends 
ConfigurationImplTest {
       assertNotNull(lopc.getScaleDownConfiguration());
       assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!");
       assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), 
"dg1");
+      assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue());
 
       for (ClusterConnectionConfiguration ccc : 
conf.getClusterConfigurations()) {
          if (ccc.getName().equals("cluster-connection1")) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 8b4869c..5646905 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -889,6 +889,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean internalDelete() {
+         return false;
+      }
+
+      @Override
       public void unproposed(SimpleString groupID) {
 
       }
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index 1b82426..6df4648 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -695,6 +695,30 @@ transactions are there for the client when it reconnects. 
The normal
 reconnect settings apply when the client is reconnecting so these should
 be high enough to deal with the time needed to scale down.
 
+#### Automatically Deleting the Store-and-Forward Queue after Scale Down
+
+By default after the node is scaled down to a target node the internal
+SF queue is not deleted. There is a boolean configuration parameter called 
+"cleanup-sf-queue" that can be used in case you want to delete it.
+
+To do so you need to add this parameter to the scale-down policy and
+set it to "true". For example:
+
+```xml
+<ha-policy>
+   <live-only>
+      <scale-down>
+         ...
+         <cleanup-sf-queue>true</cleanup-sf-queue>
+      </scale-down>
+   </live-only>
+</ha-policy>
+```
+
+With the above config in place, when the node being scaled down is
+stopped, it will send a message to the target node once the scale down
+is complete. The target node will then properly delete the SF queue and its 
address.
+
 ## Failover Modes
 
 Apache ActiveMQ Artemis defines two types of client failover:
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
new file mode 100644
index 0000000..92d2635
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(value = Parameterized.class)
+public class ScaleDownRemoveSFTest extends ClusterTestBase {
+
+   @Parameterized.Parameters(name = "RemoveOption={0}")
+   public static Collection getParameters() {
+      return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}});
+   }
+
+   public ScaleDownRemoveSFTest(String option) {
+      this.option = option;
+   }
+
+   private String option;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      ScaleDownConfiguration scaleDownConfiguration = new 
ScaleDownConfiguration();
+      if (!"default".equals(option)) {
+         scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option));
+      }
+      setupLiveServer(0, isFileStorage(), isNetty(), true);
+      setupLiveServer(1, isFileStorage(), isNetty(), true);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration0 = 
(LiveOnlyPolicyConfiguration) 
servers[0].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
+      LiveOnlyPolicyConfiguration haPolicyConfiguration1 = 
(LiveOnlyPolicyConfiguration) 
servers[1].getConfiguration().getHAPolicyConfiguration();
+      haPolicyConfiguration1.setScaleDownConfiguration(new 
ScaleDownConfiguration());
+
+      setupClusterConnection("cluster0", "testAddress", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster0", "testAddress", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+      
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
+      servers[0].getConfiguration().getAddressesSettings().put("#", new 
AddressSettings().setRedistributionDelay(0));
+      servers[1].getConfiguration().getAddressesSettings().put("#", new 
AddressSettings().setRedistributionDelay(0));
+      startServers(0, 1);
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      super.tearDown();
+   }
+
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testScaleDownCheckSF() throws Exception {
+      final int TEST_SIZE = 2;
+      final String addressName = "testAddress";
+      final String queueName1 = "testQueue1";
+
+      // create one queue on each node, both mapped to the same address
+      createQueue(0, addressName, queueName1, null, true);
+      createQueue(1, addressName, queueName1, null, true);
+
+      // send messages to node 0
+      send(0, addressName, TEST_SIZE, true, null);
+
+      // consume a message from queue 1
+      addConsumer(1, 0, queueName1, null, false);
+      ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+      consumers[1].getSession().commit();
+
+      Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) 
servers[0].getPostOffice().getBinding(new 
SimpleString(queueName1))).getQueue()));
+
+      //check sf queue on server1 exists
+      ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) 
servers[1].getClusterManager().getClusterConnection("cluster0");
+      SimpleString sfQueueName = 
clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
+
+      System.out.println("[sf queue on server 1]: " + sfQueueName);
+      QueueQueryResult result = servers[1].queueQuery(sfQueueName);
+      assertTrue(result.isExists());
+
+      // trigger scaleDown from node 0 to node 1
+      servers[0].stop();
+
+      addConsumer(0, 1, queueName1, null);
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      // ensure there are no more messages on queue 1
+      clientMessage = consumers[0].getConsumer().receive(250);
+      Assert.assertNull(clientMessage);
+      removeConsumer(0);
+
+      //check
+      result = servers[1].queueQuery(sfQueueName);
+      AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
+      if ("true".equals(option)) {
+         assertFalse(result.isExists());
+         assertFalse(result2.isExists());
+      } else {
+         assertTrue(result.isExists());
+         assertTrue(result2.isExists());
+      }
+
+   }
+
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 3f236c3..4cf5346 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -197,6 +197,11 @@ public class FakeQueue extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
+   public boolean internalDelete() {
+      return false;
+   }
+
+   @Override
    public boolean isPersistedPause() {
       return false;
    }

Reply via email to