This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 04f6424928 ARTEMIS-4694 Redistribution issues with Almost Large Header
04f6424928 is described below

commit 04f6424928a3d9cbc26f6b26f7c2e9b7f7bf7869
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 20 10:09:26 2024 -0400

    ARTEMIS-4694 Redistribution issues with Almost Large Header
    
    Redistribution would add data to the record, which would in turn make
    the record too large to redistribute.
    
    The Redistributor and Bridges should not be removed when that happens.
    
    Also, a warning should be logged to alert users to the situation.
---
 .../jdbc/store/journal/JDBCJournalImpl.java        |   5 +
 .../activemq/artemis/core/journal/Journal.java     |   2 +
 .../core/journal/impl/FileWrapperJournal.java      |   4 +
 .../artemis/core/journal/impl/JournalImpl.java     |  36 +--
 .../artemis/journal/ActiveMQJournalLogger.java     |   6 +
 .../artemis/core/persistence/StorageManager.java   |   5 +
 .../journal/AbstractJournalStorageManager.java     |   6 +-
 .../impl/journal/JournalStorageManager.java        |   1 +
 .../core/replication/ReplicatedJournal.java        |   5 +
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../core/server/cluster/impl/Redistributor.java    |  19 +-
 .../crossprotocol/LargeHeadersClusterTest.java     | 292 +++++++++++++++++++++
 .../integration/replication/ReplicationTest.java   |   5 +
 13 files changed, 366 insertions(+), 22 deletions(-)

diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 88782dad6c..9f48ab3d4f 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -154,6 +154,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       return sqlProvider.getMaxBlobSize();
    }
 
+   @Override
+   public long getWarningRecordSize() {
+      return sqlProvider.getMaxBlobSize() - 2048;
+   }
+
    @Override
    protected void createSchema() throws SQLException {
       createTable(sqlProvider.getCreateJournalTableSQL());
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 43d602a492..67915fc9da 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -388,4 +388,6 @@ public interface Journal extends ActiveMQComponent {
     * @return
     */
    long getMaxRecordSize();
+
+   long getWarningRecordSize();
 }
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 0c2d9dc2b7..798260572b 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -131,6 +131,10 @@ public final class FileWrapperJournal extends JournalBase {
       return journal.getMaxRecordSize();
    }
 
+   @Override
+   public long getWarningRecordSize() {
+      return journal.getWarningRecordSize();
+   }
    /**
     * Write the record to the current file.
     */
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 6a3abac2f8..a76f06950d 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -54,6 +54,7 @@ import io.netty.util.collection.ByteObjectHashMap;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.DummyCallback;
@@ -923,14 +924,10 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
          logger.trace("scheduling appendAddRecord::id={}, userRecordType={}, 
record = {}", id, recordType, record);
       }
 
-      final long maxRecordSize = getMaxRecordSize();
       final JournalInternalRecord addRecord = new JournalAddRecord(true, id, 
recordType, persister, record);
       final int addRecordEncodeSize = addRecord.getEncodeSize();
 
-      if (addRecordEncodeSize > maxRecordSize) {
-         //The record size should be larger than max record size only on the 
large messages case.
-         throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, 
maxRecordSize);
-      }
+      checkRecordSize(addRecordEncodeSize, record);
 
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(new Runnable() {
@@ -977,14 +974,9 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
          logger.trace("scheduling appendAddEvent::id={}, userRecordType={}, 
record = {}", id, recordType, record);
       }
 
-      final long maxRecordSize = getMaxRecordSize();
       final JournalInternalRecord addRecord = new 
JournalAddRecord(JournalImpl.EVENT_RECORD, id, recordType, persister, record);
-      final int addRecordEncodeSize = addRecord.getEncodeSize();
 
-      if (addRecordEncodeSize > maxRecordSize) {
-         //The record size should be larger than max record size only on the 
large messages case.
-         throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, 
maxRecordSize);
-      }
+      checkRecordSize(addRecord.getEncodeSize(), record);
 
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(() -> {
@@ -1012,6 +1004,18 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       result.get();
    }
 
+   private void checkRecordSize(int addRecordEncodeSize, Object record) throws 
ActiveMQIOErrorException {
+      if (addRecordEncodeSize > getWarningRecordSize()) {
+         long maxRecordSize = getMaxRecordSize();
+         ActiveMQJournalLogger.LOGGER.largeHeaderWarning(addRecordEncodeSize, 
maxRecordSize, record);
+
+         if (addRecordEncodeSize > maxRecordSize) {
+            //The record size should be larger than max record size only on 
the large messages case.
+            throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, 
maxRecordSize);
+         }
+      }
+   }
+
    @Override
    public void appendUpdateRecord(final long id,
                                   final byte recordType,
@@ -1271,10 +1275,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, 
recordType, persister, record);
       int encodeSize = addRecord.getEncodeSize();
 
-      if (encodeSize > getMaxRecordSize()) {
-         //The record size should be larger than max record size only on the 
large messages case.
-         throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(encodeSize, 
getMaxRecordSize());
-      }
+      checkRecordSize(encodeSize, record);
 
       appendExecutor.execute(new Runnable() {
 
@@ -2749,6 +2750,11 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       }
    }
 
+   @Override
+   public long getWarningRecordSize() {
+      return getMaxRecordSize() - 2048;
+   }
+
    private void flushExecutor(Executor executor) throws InterruptedException {
 
       if (executor != null) {
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
index c1af466a4b..7b770cbb58 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
@@ -203,4 +203,10 @@ public interface ActiveMQJournalLogger {
    // same as criticalIO but with the FileName associated (if there's a file 
available)
    @LogMessage(id = 144011, value = "Critical IO Exception happened: {} on 
{}", level = LogMessage.Level.WARN)
    void criticalIOFile(String message, String fileName, Throwable error);
+
+   @LogMessage(id = 144012, value = "Journal Record sized at {}, which is too 
close to the max record Size at {}. Record = {}. Internal broker operations 
such as redistribution and DLQ may be compromised. Move large headers into the 
body of messages.", level = LogMessage.Level.WARN)
+   void largeHeaderWarning(long recordSize, long maxRecordSize, Object 
originalData);
+
+
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 4644d6832d..76d94eac05 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -83,6 +83,11 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
       return Long.MAX_VALUE;
    }
 
+   default long getWarningRecordSize() {
+      /** Null journal is pretty much memory */
+      return Long.MAX_VALUE;
+   }
+
    default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception 
{
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 61b2db6150..b63b88a9e7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -276,12 +276,16 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
       idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
    }
 
-
    @Override
    public long getMaxRecordSize() {
       return messageJournal.getMaxRecordSize();
    }
 
+   @Override
+   public long getWarningRecordSize() {
+      return messageJournal.getWarningRecordSize();
+   }
+
 
    /**
     * Called during initialization.  Used by implementations to setup 
Journals, Stores etc...
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 85c1b4bd2d..665f13b0ff 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -539,6 +539,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
                
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(),
 logger.getName());
 
                logger.debug("Message header too large for {}", largeMessage);
+               new Exception("Trace").printStackTrace();
 
                throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, 
maxRecordSize);
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 32b3262d05..e8038ca8f4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -659,4 +659,9 @@ public class ReplicatedJournal implements Journal {
    public long getMaxRecordSize() {
       return localJournal.getMaxRecordSize();
    }
+
+   @Override
+   public long getWarningRecordSize() {
+      return localJournal.getWarningRecordSize();
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 9d97f2d408..19e7facfe3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1201,7 +1201,7 @@ public interface ActiveMQServerLogger {
    void failedToDealWithObjectProperty(SimpleString property, String 
exceptionMessage);
 
    @LogMessage(id = 222303, value = "Redistribution by {} of messageID = {} 
failed", level = LogMessage.Level.WARN)
-   void errorRedistributing(String queueName, long m, Throwable t);
+   void errorRedistributing(String queueName, String m, Throwable t);
 
    @LogMessage(id = 222304, value = "Unable to load message from journal", 
level = LogMessage.Level.WARN)
    void unableToLoadMessageFromJournal(Throwable t);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 8d223a5ba6..222ad1db01 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -127,13 +128,21 @@ public class Redistributor implements Consumer {
       RoutingContext context = routingInfo.getA();
       Message message = routingInfo.getB();
 
-      postOffice.processRoute(message, context, false);
+      try {
+         postOffice.processRoute(message, context, false);
 
-      if (RefCountMessage.isRefTraceEnabled()) {
-         RefCountMessage.deferredDebug(reference.getMessage(), 
"redistributing");
-      }
+         if (RefCountMessage.isRefTraceEnabled()) {
+            RefCountMessage.deferredDebug(reference.getMessage(), 
"redistributing");
+         }
 
-      ackRedistribution(reference, context.getTransaction());
+         ackRedistribution(reference, context.getTransaction());
+      } catch (Throwable e) {
+         if (context.getTransaction() != null) {
+            context.getTransaction().setAsync(true).rollback();
+         }
+         
ActiveMQServerLogger.LOGGER.errorRedistributing(String.valueOf(this.queue.getName()),
 String.valueOf(message), e);
+         return HandleStatus.NO_MATCH;
+      }
 
       return HandleStatus.HANDLED;
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
new file mode 100644
index 0000000000..b8192fa762
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/LargeHeadersClusterTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
+import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class LargeHeadersClusterTest extends ClusterTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final SimpleString queueName = 
SimpleString.toSimpleString("queues.0");
+
+   // I'm taking any number that /2 = Odd
+   // to avoid perfect roundings and making sure messages are evenly 
distributed
+   private static final int NUMBER_OF_MESSAGES = 77 * 2;
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection getParameters() {
+      return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}, {"OPENWIRE"}});
+   }
+
+   @Parameterized.Parameter(0)
+   public String protocol;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+   }
+   private void startServers(MessageLoadBalancingType loadBalancingType) 
throws Exception {
+      setupServers();
+
+      setRedistributionDelay(0);
+
+      setupCluster(loadBalancingType);
+
+      AddressSettings as = new 
AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry"));
+
+      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+
+      startServers(0);
+      startServers(1);
+
+      createQueue(SimpleString.toSimpleString("queues.expiry"));
+      createQueue(queueName);
+   }
+
+   private void createQueue(SimpleString queueName) throws Exception {
+      QueueConfiguration queueConfiguration = new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST);
+      servers[0].createQueue(queueConfiguration);
+      servers[1].createQueue(queueConfiguration);
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   private ConnectionFactory getJmsConnectionFactory(int node) {
+      if (protocol.equals("AMQP")) {
+         return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
+      } else if (protocol.equals("OPENWIRE")) {
+         return new 
org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:" + (61616 + 
node));
+      } else if (protocol.equals("CORE")) {
+         return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + 
node));
+      } else {
+         Assert.fail("Protocol " + protocol + " unknown");
+         return null;
+      }
+   }
+
+   @Test
+   public void testGrowingHeaders() throws Exception {
+      startServers(MessageLoadBalancingType.ON_DEMAND);
+
+      ConnectionFactory cf0 = getJmsConnectionFactory(0);
+      ConnectionFactory cf1 = getJmsConnectionFactory(1);
+      try (Connection cn = cf0.createConnection()) {
+         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer pd = 
sn.createProducer(sn.createQueue(queueName.toString()));
+
+         StringBuffer bufferString = new StringBuffer();
+         for (int i = 0; i < 9_500; i++) {
+            bufferString.append("-");
+         }
+
+         int i = 0;
+
+         try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+            try {
+               for (i = 0; i < 1_000; i++) {
+                  if (i % 100 == 0) {
+                     logger.info("Sent {} messages", i);
+                  }
+                  TextMessage message = sn.createTextMessage("hello " + i);
+                  message.setStringProperty("large", bufferString.toString());
+                  message.setBooleanProperty("newSender", false);
+                  // we need to send two, one for each server to exercise the 
load balancing
+                  pd.send(message);
+                  pd.send(message);
+                  bufferString.append("-"); // growing the header
+               }
+            } catch (Throwable e) {
+               logger.warn("error at {}", i, e);
+            }
+            if (!protocol.equals("AMQP")) {
+               Assert.assertTrue(loggerHandler.findText("AMQ144012"));
+            }
+         }
+      }
+
+      try (Connection connection1 = cf1.createConnection()) {
+         Session session = connection1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue("queues.0"));
+         connection1.start();
+         receiveAllMessages(consumer, 1, m -> logger.debug("received {}", m));
+      }
+
+      try (Connection cn = cf0.createConnection()) {
+         Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer pd = 
sn.createProducer(sn.createQueue(queueName.toString()));
+
+         try {
+            for (int i = 0; i < 1_000; i++) {
+               if (i % 100 == 0) {
+                  logger.info("Sent {} messages", i);
+               }
+               TextMessage message = sn.createTextMessage("newSender " + i);
+               message.setBooleanProperty("newSender", true);
+               // we need to send two, one for each server to exercise the 
load balancing
+               pd.send(message);
+               pd.send(message);
+            }
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+         }
+      }
+
+      AtomicBoolean newSenderFound = new AtomicBoolean(false);
+
+      try (Connection connection1 = cf1.createConnection()) {
+         Session session = connection1.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue("queues.0"));
+         connection1.start();
+
+         receiveAllMessages(consumer, 1000, m -> {
+            try {
+               if (m.getBooleanProperty("newSender")) {
+                  newSenderFound.set(true);
+               }
+            } catch (Exception ignored) {
+            }
+         });
+
+      }
+
+
+      // messages should still flow
+      Assert.assertTrue(newSenderFound.get());
+   }
+
+
+   private int receiveAllMessages(MessageConsumer messageConsume, int 
minMessages, Consumer<Message> messageProcessor) throws JMSException {
+
+      int msg = 0;
+
+      for (;;) {
+         Message message;
+
+         if (msg < minMessages) {
+            message = messageConsume.receive(10_000);
+         } else {
+            message = messageConsume.receive(1000);
+         }
+         if (message == null) {
+            break;
+         }
+
+         msg++;
+
+         if (messageProcessor != null) {
+            messageProcessor.accept(message);
+         }
+      }
+
+      return msg;
+   }
+
+   protected void setupCluster(final MessageLoadBalancingType 
messageLoadBalancingType) throws Exception {
+      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 
1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 
1, isNetty(), 1, 0);
+   }
+
+   protected void setRedistributionDelay(final long delay) {
+   }
+
+   protected void setupServers() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+      servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
+      servers[0].addProtocolManagerFactory(new 
OpenWireProtocolManagerFactory());
+      servers[1].addProtocolManagerFactory(new 
OpenWireProtocolManagerFactory());
+
+      servers[0].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
+      servers[0].getConfiguration().setJournalBufferSize_AIO(20 * 1024);
+      servers[1].getConfiguration().setJournalBufferSize_NIO(20 * 1024);
+      servers[1].getConfiguration().setJournalBufferSize_AIO(20 * 1024);
+
+      servers[0].getConfiguration().getAddressSettings().clear();
+      servers[0].getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(10));
+
+      servers[1].getConfiguration().getAddressSettings().clear();
+      servers[1].getConfiguration().addAddressSetting("#", new 
AddressSettings().setRedistributionDelay(10));
+   }
+
+   protected void stopServers() throws Exception {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      closeAllServerLocatorsFactories();
+
+      stopServers(0, 1);
+
+      clearServer(0, 1);
+   }
+
+   /**
+    * @param serverID
+    * @return
+    * @throws Exception
+    */
+   @Override
+   protected ConfigurationImpl createBasicConfig(final int serverID) {
+      ConfigurationImpl configuration = super.createBasicConfig(serverID);
+      configuration.setMessageExpiryScanPeriod(100);
+
+      return configuration;
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index dc23fa232c..3d2bb82ed3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -1068,6 +1068,11 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       public void replicationSyncFinished() {
          // no-op
       }
+
+      @Override
+      public long getWarningRecordSize() {
+         return getMaxRecordSize() - 2048;
+      }
    }
 
    private interface ExtraConfigurer {

Reply via email to