ARTEMIS-1791 Large message files are not removed after redistribution across a 
cluster


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/de5c0d51
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/de5c0d51
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/de5c0d51

Branch: refs/heads/master
Commit: de5c0d51b976a2a3c60235da56cfd854418007a7
Parents: c69d6b0
Author: Howard Gao <howard....@gmail.com>
Authored: Mon Apr 9 11:07:49 2018 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Mon Apr 9 11:06:27 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/postoffice/Bindings.java       |  2 ++
 .../core/postoffice/impl/BindingsImpl.java      |  5 ++++
 .../core/postoffice/impl/PostOfficeImpl.java    | 25 ++++++++++++++++----
 .../core/server/cluster/impl/Redistributor.java |  1 +
 .../impl/WildcardAddressManagerUnitTest.java    |  5 ++++
 5 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
index f3592c4..30a2680 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java
@@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener {
    boolean redistribute(Message message, Queue originatingQueue, 
RoutingContext context) throws Exception;
 
    void route(Message message, RoutingContext context) throws Exception;
+
+   boolean allowRedistribute();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2e2b31c..478c700 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -155,6 +155,11 @@ public final class BindingsImpl implements Bindings {
    }
 
    @Override
+   public boolean allowRedistribute() {
+      return 
messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
+   }
+
+   @Override
    public boolean redistribute(final Message message,
                                final Queue originatingQueue,
                                final RoutingContext context) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b2bfe37..f1f7a38 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
    public Pair<RoutingContext, Message> redistribute(final Message message,
                                                      final Queue 
originatingQueue,
                                                      final Transaction tx) 
throws Exception {
-      // We have to copy the message and store it separately, otherwise we may 
lose remote bindings in case of restart before the message
-      // arrived the target node
-      // as described on https://issues.jboss.org/browse/JBPAPP-6130
-      Message copyRedistribute = message.copy(storageManager.generateID());
 
       Bindings bindings = 
addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
 
-      if (bindings != null) {
+      if (bindings != null && bindings.allowRedistribute()) {
+         // We have to copy the message and store it separately, otherwise we 
may lose remote bindings in case of restart before the message
+         // arrived the target node
+         // as described on https://issues.jboss.org/browse/JBPAPP-6130
+         Message copyRedistribute = message.copy(storageManager.generateID());
+         if (tx != null) {
+            tx.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterRollback(Transaction tx) {
+                  try {
+                     //this will cause large message file to be
+                     //cleaned up
+                     copyRedistribute.decrementRefCount();
+                  } catch (Exception e) {
+                     logger.warn("Failed to clean up message: " + 
copyRedistribute);
+                  }
+               }
+            });
+         }
+
          RoutingContext context = new RoutingContextImpl(tx);
 
          boolean routed = bindings.redistribute(copyRedistribute, 
originatingQueue, context);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index cfb9eee..7982018 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -150,6 +150,7 @@ public class Redistributor implements Consumer {
       final Pair<RoutingContext, Message> routingInfo = 
postOffice.redistribute(reference.getMessage(), queue, tx);
 
       if (routingInfo == null) {
+         tx.rollback();
          return HandleStatus.BUSY;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
index 1c13cbd..40fadf9 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java
@@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends 
ActiveMQTestBase {
       public void route(Message message, RoutingContext context) throws 
Exception {
          System.out.println("routing message: " + message);
       }
+
+      @Override
+      public boolean allowRedistribute() {
+         return false;
+      }
    }
 
 }

Reply via email to