This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 04f6424928 ARTEMIS-4694 Redistribution issues with Almost Large Header
04f6424928 is described below
commit 04f6424928a3d9cbc26f6b26f7c2e9b7f7bf7869
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 20 10:09:26 2024 -0400
ARTEMIS-4694 Redistribution issues with Almost Large Header
Redistribution would add data to the record, which would in turn make
the record too large to redistribute.
The Redistributor and Bridges should not be removed when this failure occurs.
A warning should also be logged to alert users to the situation.
---
.../jdbc/store/journal/JDBCJournalImpl.java | 5 +
.../activemq/artemis/core/journal/Journal.java | 2 +
.../core/journal/impl/FileWrapperJournal.java | 4 +
.../artemis/core/journal/impl/JournalImpl.java | 36 +--
.../artemis/journal/ActiveMQJournalLogger.java | 6 +
.../artemis/core/persistence/StorageManager.java | 5 +
.../journal/AbstractJournalStorageManager.java | 6 +-
.../impl/journal/JournalStorageManager.java | 1 +
.../core/replication/ReplicatedJournal.java | 5 +
.../artemis/core/server/ActiveMQServerLogger.java | 2 +-
.../core/server/cluster/impl/Redistributor.java | 19 +-
.../crossprotocol/LargeHeadersClusterTest.java | 292 +++++++++++++++++++++
.../integration/replication/ReplicationTest.java | 5 +
13 files changed, 366 insertions(+), 22 deletions(-)
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 88782dad6c..9f48ab3d4f 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -154,6 +154,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
return sqlProvider.getMaxBlobSize();
}
+ @Override
+ public long getWarningRecordSize() {
+ return sqlProvider.getMaxBlobSize() - 2048;
+ }
+
@Override
protected void createSchema() throws SQLException {
createTable(sqlProvider.getCreateJournalTableSQL());
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 43d602a492..67915fc9da 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -388,4 +388,6 @@ public interface Journal extends ActiveMQComponent {
* @return
*/
long getMaxRecordSize();
+
+ long getWarningRecordSize();
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0c2d9dc2b7..798260572b 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -131,6 +131,10 @@ public final class FileWrapperJournal extends JournalBase {
return journal.getMaxRecordSize();
}
+ @Override
+ public long getWarningRecordSize() {
+ return journal.getWarningRecordSize();
+ }
/**
* Write the record to the current file.
*/
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 6a3abac2f8..a76f06950d 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -54,6 +54,7 @@ import io.netty.util.collection.ByteObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.DummyCallback;
@@ -923,14 +924,10 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
logger.trace("scheduling appendAddRecord::id={}, userRecordType={},
record = {}", id, recordType, record);
}
- final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new JournalAddRecord(true, id,
recordType, persister, record);
final int addRecordEncodeSize = addRecord.getEncodeSize();
- if (addRecordEncodeSize > maxRecordSize) {
- //The record size should be larger than max record size only on the
large messages case.
- throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize,
maxRecordSize);
- }
+ checkRecordSize(addRecordEncodeSize, record);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(new Runnable() {
@@ -977,14 +974,9 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
logger.trace("scheduling appendAddEvent::id={}, userRecordType={},
record = {}", id, recordType, record);
}
- final long maxRecordSize = getMaxRecordSize();
final JournalInternalRecord addRecord = new
JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record);
- final int addRecordEncodeSize = addRecord.getEncodeSize();
- if (addRecordEncodeSize > maxRecordSize) {
- //The record size should be larger than max record size only on the
large messages case.
- throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize,
maxRecordSize);
- }
+ checkRecordSize(addRecord.getEncodeSize(), record);
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(() -> {
@@ -1012,6 +1004,18 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
result.get();
}
+ private void checkRecordSize(int addRecordEncodeSize, Object record) throws
ActiveMQIOErrorException {
+ if (addRecordEncodeSize > getWarningRecordSize()) {
+ long maxRecordSize = getMaxRecordSize();
+ ActiveMQJournalLogger.LOGGER.largeHeaderWarning(addRecordEncodeSize,
maxRecordSize, record);
+
+ if (addRecordEncodeSize > maxRecordSize) {
+ //The record size should be larger than max record size only on
the large messages case.
+ throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize,
maxRecordSize);
+ }
+ }
+ }
+
@Override
public void appendUpdateRecord(final long id,
final byte recordType,
@@ -1271,10 +1275,7 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id,
recordType, persister, record);
int encodeSize = addRecord.getEncodeSize();
- if (encodeSize > getMaxRecordSize()) {
- //The record size should be larger than max record size only on the
large messages case.
- throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize,
getMaxRecordSize());
- }
+ checkRecordSize(encodeSize, record);
appendExecutor.execute(new Runnable() {
@@ -2749,6 +2750,11 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
}
}
+ @Override
+ public long getWarningRecordSize() {
+ return getMaxRecordSize() - 2048;
+ }
+
private void flushExecutor(Executor executor) throws InterruptedException {
if (executor != null) {
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
index c1af466a4b..7b770cbb58 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
@@ -203,4 +203,10 @@ public interface ActiveMQJournalLogger {
// same as criticalIO but with the FileName associated (if there's a file
available)
@LogMessage(id = 144011, value = "Critical IO Exception happened: {} on
{}", level = LogMessage.Level.WARN)
void criticalIOFile(String message, String fileName, Throwable error);
+
+ @LogMessage(id = 144012, value = "Journal Record sized at {}, which is too
close to the max record Size at {}. Record = {}. Internal broker operations
such as redistribution and DLQ may be compromised. Move large headers into the
body of messages.", level = LogMessage.Level.WARN)
+ void largeHeaderWarning(long recordSize, long maxRecordSize, Object
originalData);
+
+
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 4644d6832d..76d94eac05 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -83,6 +83,11 @@ public interface StorageManager extends IDGenerator,
ActiveMQComponent {
return Long.MAX_VALUE;
}
+ default long getWarningRecordSize() {
+ /** Null journal is pretty much memory */
+ return Long.MAX_VALUE;
+ }
+
default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception
{
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 61b2db6150..b63b88a9e7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -276,12 +276,16 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
}
-
@Override
public long getMaxRecordSize() {
return messageJournal.getMaxRecordSize();
}
+ @Override
+ public long getWarningRecordSize() {
+ return messageJournal.getWarningRecordSize();
+ }
+
/**
* Called during initialization. Used by implementations to setup
Journals, Stores etc...
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 85c1b4bd2d..665f13b0ff 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -539,6 +539,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(),
logger.getName());
logger.debug("Message header too large for {}", largeMessage);
+ new Exception("Trace").printStackTrace();
throw
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize,
maxRecordSize);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 32b3262d05..e8038ca8f4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -659,4 +659,9 @@ public class ReplicatedJournal implements Journal {
public long getMaxRecordSize() {
return localJournal.getMaxRecordSize();
}
+
+ @Override
+ public long getWarningRecordSize() {
+ return localJournal.getWarningRecordSize();
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 9d97f2d408..19e7facfe3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1201,7 +1201,7 @@ public interface ActiveMQServerLogger {
void failedToDealWithObjectProperty(SimpleString property, String
exceptionMessage);
@LogMessage(id = 222303, value = "Redistribution by {} of messageID = {}
failed", level = LogMessage.Level.WARN)
- void errorRedistributing(String queueName, long m, Throwable t);
+ void errorRedistributing(String queueName, String m, Throwable t);
@LogMessage(id = 222304, value = "Unable to load message from journal",
level = LogMessage.Level.WARN)
void unableToLoadMessageFromJournal(Throwable t);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 8d223a5ba6..222ad1db01 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -127,13 +128,21 @@ public class Redistributor implements Consumer {
RoutingContext context = routingInfo.getA();
Message message = routingInfo.getB();
- postOffice.processRoute(message, context, false);
+ try {
+ postOffice.processRoute(message, context, false);
- if (RefCountMessage.isRefTraceEnabled()) {
- RefCountMessage.deferredDebug(reference.getMessage(),
"redistributing");
- }
+ if (RefCountMessage.isRefTraceEnabled()) {
+ RefCountMessage.deferredDebug(reference.getMessage(),
"redistributing");
+ }
- ackRedistribution(reference, context.getTransaction());
+ ackRedistribution(reference, context.getTransaction());
+ } catch (Throwable e) {
+ if (context.getTransaction() != null) {
+ context.getTransaction().setAsync(true).rollback();
+ }
+
ActiveMQServerLogger.LOGGER.errorRedistributing(String.valueOf(this.queue.getName()),
String.valueOf(message), e);
+ return HandleStatus.NO_MATCH;
+ }
return HandleStatus.HANDLED;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
new file mode 100644
index 0000000000..b8192fa762
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
+import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class LargeHeadersClusterTest extends ClusterTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final SimpleString queueName =
SimpleString.toSimpleString("queues.0");
+
+ // I'm taking any number that /2 = Odd
+ // to avoid perfect roundings and making sure messages are evenly
distributed
+ private static final int NUMBER_OF_MESSAGES = 77 * 2;
+
+ @Parameterized.Parameters(name = "protocol={0}")
+ public static Collection getParameters() {
+ return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
+ }
+
+ @Parameterized.Parameter(0)
+ public String protocol;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ }
+ private void startServers(MessageLoadBalancingType loadBalancingType)
throws Exception {
+ setupServers();
+
+ setRedistributionDelay(0);
+
+ setupCluster(loadBalancingType);
+
+ AddressSettings as = new
AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry"));
+
+ getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+
+ startServers(0);
+ startServers(1);
+
+ createQueue(SimpleString.toSimpleString("queues.expiry"));
+ createQueue(queueName);
+ }
+
+ private void createQueue(SimpleString queueName) throws Exception {
+ QueueConfiguration queueConfiguration = new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST);
+ servers[0].createQueue(queueConfiguration);
+ servers[1].createQueue(queueConfiguration);
+ }
+
+ protected boolean isNetty() {
+ return true;
+ }
+
+ private ConnectionFactory getJmsConnectionFactory(int node) {
+ if (protocol.equals("AMQP")) {
+ return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
+ } else if (protocol.equals("OPENWIRE")) {
+ return new
org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616 +
node));
+ } else if (protocol.equals("CORE")) {
+ return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 +
node));
+ } else {
+ Assert.fail("Protocol " + protocol + " unknown");
+ return null;
+ }
+ }
+
+ @Test
+ public void testGrowingHeaders() throws Exception {
+ startServers(MessageLoadBalancingType.ON_DEMAND);
+
+ ConnectionFactory cf0 = getJmsConnectionFactory(0);
+ ConnectionFactory cf1 = getJmsConnectionFactory(1);
+ try (Connection cn = cf0.createConnection()) {
+ Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer pd =
sn.createProducer(sn.createQueue(queueName.toString()));
+
+ StringBuffer bufferString = new StringBuffer();
+ for (int i = 0; i < 9_500; i++) {
+ bufferString.append("-");
+ }
+
+ int i = 0;
+
+ try (AssertionLoggerHandler loggerHandler = new
AssertionLoggerHandler()) {
+ try {
+ for (i = 0; i < 1_000; i++) {
+ if (i % 100 == 0) {
+ logger.info("Sent {} messages", i);
+ }
+ TextMessage message = sn.createTextMessage("hello " + i);
+ message.setStringProperty("large", bufferString.toString());
+ message.setBooleanProperty("newSender", false);
+ // we need to send two, one for each server to exercise the
load balancing
+ pd.send(message);
+ pd.send(message);
+ bufferString.append("-"); // growing the header
+ }
+ } catch (Throwable e) {
+ logger.warn("error at {}", i, e);
+ }
+ if (!protocol.equals("AMQP")) {
+ Assert.assertTrue(loggerHandler.findText("AMQ144012"));
+ }
+ }
+ }
+
+ try (Connection connection1 = cf1.createConnection()) {
+ Session session = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("queues.0"));
+ connection1.start();
+ receiveAllMessages(consumer, 1, m -> logger.debug("received {}", m));
+ }
+
+ try (Connection cn = cf0.createConnection()) {
+ Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer pd =
sn.createProducer(sn.createQueue(queueName.toString()));
+
+ try {
+ for (int i = 0; i < 1_000; i++) {
+ if (i % 100 == 0) {
+ logger.info("Sent {} messages", i);
+ }
+ TextMessage message = sn.createTextMessage("newSender " + i);
+ message.setBooleanProperty("newSender", true);
+ // we need to send two, one for each server to exercise the
load balancing
+ pd.send(message);
+ pd.send(message);
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ AtomicBoolean newSenderFound = new AtomicBoolean(false);
+
+ try (Connection connection1 = cf1.createConnection()) {
+ Session session = connection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("queues.0"));
+ connection1.start();
+
+ receiveAllMessages(consumer, 1000, m -> {
+ try {
+ if (m.getBooleanProperty("newSender")) {
+ newSenderFound.set(true);
+ }
+ } catch (Exception ignored) {
+ }
+ });
+
+ }
+
+
+ // messages should still flow
+ Assert.assertTrue(newSenderFound.get());
+ }
+
+
+ private int receiveAllMessages(MessageConsumer messageConsume, int
minMessages, Consumer<Message> messageProcessor) throws JMSException {
+
+ int msg = 0;
+
+ for (;;) {
+ Message message;
+
+ if (msg < minMessages) {
+ message = messageConsume.receive(10_000);
+ } else {
+ message = messageConsume.receive(1000);
+ }
+ if (message == null) {
+ break;
+ }
+
+ msg++;
+
+ if (messageProcessor != null) {
+ messageProcessor.accept(message);
+ }
+ }
+
+ return msg;
+ }
+
+ protected void setupCluster(final MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
+ setupClusterConnection("cluster0", "queues", messageLoadBalancingType,
1, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues", messageLoadBalancingType,
1, isNetty(), 1, 0);
+ }
+
+ protected void setRedistributionDelay(final long delay) {
+ }
+
+ protected void setupServers() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+ servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+ servers[0].addProtocolManagerFactory(new
OpenWireProtocolManagerFactory());
+ servers[1].addProtocolManagerFactory(new
OpenWireProtocolManagerFactory());
+
+ servers[0].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
+ servers[0].getConfiguration().setJournalBufferSize_AIO(20 * 1024);
+ servers[1].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
+ servers[1].getConfiguration().setJournalBufferSize_AIO(20 * 1024);
+
+ servers[0].getConfiguration().getAddressSettings().clear();
+ servers[0].getConfiguration().addAddressSetting("#", new
AddressSettings().setRedistributionDelay(10));
+
+ servers[1].getConfiguration().getAddressSettings().clear();
+ servers[1].getConfiguration().addAddressSetting("#", new
AddressSettings().setRedistributionDelay(10));
+ }
+
+ protected void stopServers() throws Exception {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
+
+ stopServers(0, 1);
+
+ clearServer(0, 1);
+ }
+
+ /**
+ * @param serverID
+ * @return
+ * @throws Exception
+ */
+ @Override
+ protected ConfigurationImpl createBasicConfig(final int serverID) {
+ ConfigurationImpl configuration = super.createBasicConfig(serverID);
+ configuration.setMessageExpiryScanPeriod(100);
+
+ return configuration;
+ }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index dc23fa232c..3d2bb82ed3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -1068,6 +1068,11 @@ public final class ReplicationTest extends
ActiveMQTestBase {
public void replicationSyncFinished() {
// no-op
}
+
+ @Override
+ public long getWarningRecordSize() {
+ return getMaxRecordSize() - 2048;
+ }
}
private interface ExtraConfigurer {