This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new e368dacc78 ARTEMIS-4207 Improving redistribution fix over large
messages
e368dacc78 is described below
commit e368dacc78ec61cf58dadc297edc51641337a4ad
Author: Clebert Suconic <[email protected]>
AuthorDate: Sat Apr 8 11:02:57 2023 -0400
ARTEMIS-4207 Improving redistribution fix over large messages
---
.../core/persistence/impl/nullpm/NullStorageManager.java | 5 +++++
.../activemq/artemis/core/postoffice/PostOffice.java | 3 +--
.../artemis/core/postoffice/impl/BindingsImpl.java | 15 ++++++++++-----
.../artemis/core/postoffice/impl/PostOfficeImpl.java | 5 ++---
.../artemis/core/server/cluster/impl/Redistributor.java | 13 ++++++-------
.../jmh/WildcardAddressManagerHeirarchyPerfTest.java | 4 ++--
.../performance/jmh/WildcardAddressManagerPerfTest.java | 4 ++--
.../tests/unit/core/postoffice/impl/BindingsImplTest.java | 10 +++++-----
.../postoffice/impl/WildcardAddressManagerPerfTest.java | 4 ++--
.../tests/unit/core/server/impl/fakes/FakePostOffice.java | 3 +--
10 files changed, 36 insertions(+), 30 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 3f2771af5b..e41f0dd63d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -101,6 +101,11 @@ public class NullStorageManager implements StorageManager {
});
}
+ public NullStorageManager(int nextId) {
+ this();
+ this.setNextId(nextId);
+ }
+
@Override
public void criticalError(Throwable error) {
ioCriticalErrorListener.onIOException(error, error.getMessage(), null);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 4c368be6b9..67e830d9da 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -190,8 +190,7 @@ public interface PostOffice extends ActiveMQComponent {
MessageReference reload(Message message, Queue queue, Transaction tx)
throws Exception;
Pair<RoutingContext, Message> redistribute(Message message,
- Queue originatingQueue,
- Transaction tx) throws
Exception;
+ Queue originatingQueue)
throws Exception;
void processRoute(Message message, RoutingContext context, boolean direct)
throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2698af3f1f..a7f279fbd8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@@ -45,8 +46,8 @@ import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.Proposal;
import org.apache.activemq.artemis.core.server.group.impl.Response;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.CompositeAddress;
-import org.apache.activemq.artemis.utils.IDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@@ -76,7 +77,7 @@ public final class BindingsImpl implements Bindings {
private final SimpleString name;
- private final IDGenerator idGenerator;
+ private final StorageManager storageManager;
private static final AtomicInteger sequenceVersion = new
AtomicInteger(Integer.MIN_VALUE);
@@ -85,9 +86,9 @@ public final class BindingsImpl implements Bindings {
*/
private final AtomicInteger version = new
AtomicInteger(sequenceVersion.incrementAndGet());
- public BindingsImpl(final SimpleString name, final GroupingHandler
groupingHandler, IDGenerator idGenerator) {
+ public BindingsImpl(final SimpleString name, final GroupingHandler
groupingHandler, StorageManager storageManager) {
this.groupingHandler = groupingHandler;
- this.idGenerator = idGenerator;
+ this.storageManager = storageManager;
this.name = name;
}
@@ -235,12 +236,16 @@ public final class BindingsImpl implements Bindings {
// The message needs a new ID during the redistribution
// We have to create the new ID only after we can guarantee it will be
routed
// otherwise we may leave large messages stranded in the folder
- final Message copyRedistribute = message.copy(idGenerator.generateID());
+ final Message copyRedistribute =
message.copy(storageManager.generateID());
if (logger.isDebugEnabled()) {
logger.debug("Message {} being copied as {}", message.getMessageID(),
copyRedistribute.getMessageID());
}
copyRedistribute.setAddress(message.getAddress());
+ if (context.getTransaction() == null) {
+ context.setTransaction(new TransactionImpl(storageManager));
+ }
+
bindingIndex.setIndex(nextPosition);
nextBinding.route(copyRedistribute, context);
return copyRedistribute;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c926bbc8b6..6bf72f9948 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1387,12 +1387,11 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
*/
@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
- final Queue
originatingQueue,
- final Transaction tx)
throws Exception {
+ final Queue
originatingQueue) throws Exception {
Bindings bindings =
addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
if (bindings != null && bindings.allowRedistribute()) {
- RoutingContext context = new RoutingContextImpl(tx);
+ RoutingContext context = new RoutingContextImpl(null);
// the redistributor will make a copy of the message if it can be
redistributed
Message redistributedMessage = bindings.redistribute(message,
originatingQueue, context);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 63724226bd..ac4992e18e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -32,7 +32,6 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,23 +113,23 @@ public class Redistributor implements Consumer {
return HandleStatus.NO_MATCH;
}
- final Transaction tx = new TransactionImpl(storageManager);
-
- final Pair<RoutingContext, Message> routingInfo =
postOffice.redistribute(reference.getMessage(), queue, tx);
+ final Pair<RoutingContext, Message> routingInfo =
postOffice.redistribute(reference.getMessage(), queue);
if (routingInfo == null) {
logger.debug("postOffice.redistribute return null for message {}",
reference);
- tx.rollback();
return HandleStatus.BUSY;
}
- postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);
+ RoutingContext context = routingInfo.getA();
+ Message message = routingInfo.getB();
+
+ postOffice.processRoute(message, context, false);
if (RefCountMessage.isRefTraceEnabled()) {
RefCountMessage.deferredDebug(reference.getMessage(),
"redistributing");
}
- ackRedistribution(reference, tx);
+ ackRedistribution(reference, context.getTransaction());
return HandleStatus.HANDLED;
}
diff --git
a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
index 829abe9db6..8b791146f7 100644
---
a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
+++
b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerHeirarchyPerfTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -31,7 +32,6 @@ import
org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
@@ -52,7 +52,7 @@ public class WildcardAddressManagerHeirarchyPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
- return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+ return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}
diff --git
a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
index a8ec84ec7a..3fe51c4b4b 100644
---
a/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
+++
b/tests/performance-jmh/src/main/java/org/apache/activemq/artemis/tests/performance/jmh/WildcardAddressManagerPerfTest.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -32,7 +33,6 @@ import
org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
@@ -54,7 +54,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) {
- return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+ return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
index 624a641375..26a37c866a 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -45,7 +46,6 @@ import
org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
import org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Test;
public class BindingsImplTest extends ActiveMQTestBase {
@@ -55,7 +55,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new
SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.ON_DEMAND;
- final Bindings bind = new BindingsImpl(null, null, new
SimpleIDGenerator(1000));
+ final Bindings bind = new BindingsImpl(null, null, new
NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new
FakeTransaction()));
assertEquals(1, fake.routedCount.get());
@@ -66,7 +66,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new
SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType = MessageLoadBalancingType.OFF;
- final Bindings bind = new BindingsImpl(null, null, new
SimpleIDGenerator(1000));
+ final Bindings bind = new BindingsImpl(null, null, new
NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new
FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@@ -77,7 +77,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
final FakeRemoteBinding fake = new FakeRemoteBinding(new
SimpleString("a"));
fake.filter = null; // such that it wil match all messages
fake.messageLoadBalancingType =
MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION;
- final Bindings bind = new BindingsImpl(null, null, new
SimpleIDGenerator(1000));
+ final Bindings bind = new BindingsImpl(null, null, new
NullStorageManager(1000));
bind.addBinding(fake);
bind.route(new CoreMessage(0, 100), new RoutingContextImpl(new
FakeTransaction()));
assertEquals(0, fake.routedCount.get());
@@ -102,7 +102,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
private void internalTest(final boolean route) throws Exception {
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final Bindings bind = new BindingsImpl(null, null, new
SimpleIDGenerator(1000));
+ final Bindings bind = new BindingsImpl(null, null, new
NullStorageManager(1000));
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
index 874cf6a8fb..b9858f788b 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerPerfTest.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
+import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -34,7 +35,6 @@ import
org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl;
import org.apache.activemq.artemis.core.postoffice.impl.WildcardAddressManager;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.RoutingContext;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.junit.Ignore;
import org.junit.Test;
@@ -133,7 +133,7 @@ public class WildcardAddressManagerPerfTest {
@Override
public Bindings createBindings(SimpleString address) throws Exception {
- return new BindingsImpl(address, null, new SimpleIDGenerator(1000));
+ return new BindingsImpl(address, null, new NullStorageManager(1000));
}
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index c42914dce5..591fabbb4b 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -251,8 +251,7 @@ public class FakePostOffice implements PostOffice {
@Override
public Pair<RoutingContext, Message> redistribute(final Message message,
- final Queue
originatingQueue,
- final Transaction tx)
throws Exception {
+ final Queue
originatingQueue) throws Exception {
return null;
}