This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new e368dacc78 ARTEMIS-4207 Improving redistribution fix over large messages
e368dacc78 is described below

commit e368dacc78ec61cf58dadc297edc51641337a4ad
Author: Clebert Suconic <[email protected]>
AuthorDate: Sat Apr 8 11:02:57 2023 -0400

    ARTEMIS-4207 Improving redistribution fix over large messages
---
 .../core/persistence/impl/nullpm/NullStorageManager.java  |  5 +++++
 .../activemq/artemis/core/postoffice/PostOffice.java      |  3 +--
 .../artemis/core/postoffice/impl/BindingsImpl.java        | 15 ++++++++++-----
 .../artemis/core/postoffice/impl/PostOfficeImpl.java      |  5 ++---
 .../artemis/core/server/cluster/impl/Redistributor.java   | 13 ++++++-------
 .../jmh/WildcardAddressManagerHeirarchyPerfTest.java      |  4 ++--
 .../performance/jmh/WildcardAddressManagerPerfTest.java   |  4 ++--
 .../tests/unit/core/postoffice/impl/BindingsImplTest.java | 10 +++++-----
 .../postoffice/impl/WildcardAddressManagerPerfTest.java   |  4 ++--
 .../tests/unit/core/server/impl/fakes/FakePostOffice.java |  3 +--
 10 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 3f2771af5b..e41f0dd63d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -101,6 +101,11 @@ public class NullStorageManager implements StorageManager {
       });
    }
 
+   public NullStorageManager(int nextId) {
+      this();
+      this.setNextId(nextId);
+   }
+
    @Override
    public void criticalError(Throwable error) {
       ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 4c368be6b9..67e830d9da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -190,8 +190,7 @@ public interface PostOffice extends ActiveMQComponent {
    MessageReference reload(Message message, Queue queue, Transaction tx) throws Exception;
 
    Pair<RoutingContext, Message> redistribute(Message message,
-                                                    Queue originatingQueue,
-                                                    Transaction tx) throws Exception;
+                                                    Queue originatingQueue) throws Exception;
 
    void processRoute(Message message, RoutingContext context, boolean direct) throws Exception;
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2698af3f1f..a7f279fbd8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -45,8 +46,8 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
 import org.apache.activemq.artemis.core.server.group.GroupingHandler;
 import org.apache.activemq.artemis.core.server.group.impl.Proposal;
 import org.apache.activemq.artemis.core.server.group.impl.Response;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.CompositeAddress;
-import org.apache.activemq.artemis.utils.IDGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
@@ -76,7 +77,7 @@ public final class BindingsImpl implements Bindings {
 
    private final SimpleString name;
 
-   private final IDGenerator idGenerator;
+   private final StorageManager storageManager;
 
    private static final AtomicInteger sequenceVersion = new AtomicInteger(Integer.MIN_VALUE);
 
@@ -85,9 +86,9 @@ public final class BindingsImpl implements Bindings {
     */
    private final AtomicInteger version = new AtomicInteger(sequenceVersion.incrementAndGet());
 
-   public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, IDGenerator idGenerator) {
+   public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, StorageManager storageManager) {
       this.groupingHandler = groupingHandler;
-      this.idGenerator = idGenerator;
+      this.storageManager = storageManager;
       this.name = name;
    }
 
@@ -235,12 +236,16 @@ public final class BindingsImpl implements Bindings {
       // The message needs a new ID during the redistribution
       // We have to create the new ID only after we can guarantee it will be routed
       // otherwise we may leave large messages stranded in the folder
-      final Message copyRedistribute = message.copy(idGenerator.generateID());
+      final Message copyRedistribute = message.copy(storageManager.generateID());
       if (logger.isDebugEnabled()) {
          logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
       }
       copyRedistribute.setAddress(message.getAddress());
 
+      if (context.getTransaction() == null) {
+         context.setTransaction(new TransactionImpl(storageManager));
+      }
+
       bindingIndex.setIndex(nextPosition);
       nextBinding.route(copyRedistribute, context);
       return copyRedistribute;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c926bbc8b6..6bf72f9948 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1387,12 +1387,11 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
     */
    @Override
    public Pair<RoutingContext, Message> redistribute(final Message message,
-                                                     final Queue originatingQueue,
-                                                     final Transaction tx) throws Exception {
+                                                     final Queue originatingQueue) throws Exception {
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
 
       if (bindings != null && bindings.allowRedistribute()) {
-         RoutingContext context = new RoutingContextImpl(tx);
+         RoutingContext context = new RoutingContextImpl(null);
 
          // the redistributor will make a copy of the message if it can be redistributed
          Message redistributedMessage = bindings.redistribute(message, originatingQueue, context);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 63724226bd..ac4992e18e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,23 +113,23 @@ public class Redistributor implements Consumer {
          return HandleStatus.NO_MATCH;
       }
 
-      final Transaction tx = new TransactionImpl(storageManager);
-
-      final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
+      final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue);
 
       if (routingInfo == null) {
          logger.debug("postOffice.redistribute return null for message {}", reference);
-         tx.rollback();
          return HandleStatus.BUSY;
       }
 
-      postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
+      RoutingContext context = routingInfo.getA();
+      Message message = routingInfo.getB();
+
+      postOffice.processRoute(message, context, false);
 
       if (RefCountMessage.isRefTraceEnabled()) {
          RefCountMessage.deferredDebug(reference.getMessage(), "redistributing");
       }
 
-      ackRedistribution(reference, tx);
+      ackRedistribution(reference, context.getTransaction());
 
       return HandleStatus.HANDLED;
    }
diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
index 829abe9db6..8b791146f7 100644
--- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
+++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -31,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
@@ -52,7 +52,7 @@ public class WildcardAddressManagerHeirarchyPerfTest {
 
       @Override
       public Bindings createBindings(SimpleString address) {
-         return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+         return new BindingsImpl(address, null, new NullStorageManager(1000));
       }
    }
 
diff --git a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
index a8ec84ec7a..3fe51c4b4b 100644
--- a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
+++ b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Group;
@@ -54,7 +54,7 @@ public class WildcardAddressManagerPerfTest {
 
       @Override
       public Bindings createBindings(SimpleString address) {
-         return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+         return new BindingsImpl(address, null, new NullStorageManager(1000));
       }
    }
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 624a641375..26a37c866a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -45,7 +46,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import org.apache.activemq.artemis.selector.filter.Filterable;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.junit.Test;
 
 public class BindingsImplTest extends ActiveMQTestBase {
@@ -55,7 +55,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
       final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
       fake.filter = null;  // such that it wil match all messages
       fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
-      final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
+      final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
       bind.addBinding(fake);
       bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
       assertEquals(1, fake.routedCount.get());
@@ -66,7 +66,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
       final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
       fake.filter = null;  // such that it wil match all messages
       fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
-      final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
+      final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
       bind.addBinding(fake);
       bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
       assertEquals(0, fake.routedCount.get());
@@ -77,7 +77,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
       final FakeRemoteBinding fake = new FakeRemoteBinding(new SimpleString("a"));
       fake.filter = null;  // such that it wil match all messages
       fake.messageLoadBalancingType = MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
-      final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
+      final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
       bind.addBinding(fake);
       bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new FakeTransaction()));
       assertEquals(0, fake.routedCount.get());
@@ -102,7 +102,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
    private void internalTest(final boolean route) throws Exception {
       final FakeBinding fake = new FakeBinding(new SimpleString("a"));
 
-      final Bindings bind = new BindingsImpl(null, null, new SimpleIDGenerator(1000));
+      final Bindings bind = new BindingsImpl(null, null, new NullStorageManager(1000));
       bind.addBinding(fake);
       bind.addBinding(new FakeBinding(new SimpleString("a")));
       bind.addBinding(new FakeBinding(new SimpleString("a")));
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
index 874cf6a8fb..b9858f788b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -34,7 +35,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
 import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -133,7 +133,7 @@ public class WildcardAddressManagerPerfTest {
 
       @Override
       public Bindings createBindings(SimpleString address) throws Exception {
-         return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+         return new BindingsImpl(address, null, new NullStorageManager(1000));
       }
    }
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index c42914dce5..591fabbb4b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -251,8 +251,7 @@ public class FakePostOffice implements PostOffice {
 
    @Override
    public Pair<RoutingContext, Message> redistribute(final Message message,
-                                                     final Queue originatingQueue,
-                                                     final Transaction tx) throws Exception {
+                                                     final Queue originatingQueue) throws Exception {
       return null;
    }
 

Reply via email to