This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new ca50b34  ARTEMIS-1975 Removing ThreadLocal for StorageManager
     new 3d5bd38  This closes #3051
ca50b34 is described below

commit ca50b3449e29129ecc2095074aec44ceafdf4eb2
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Wed Mar 25 13:54:34 2020 -0400

    ARTEMIS-1975 Removing ThreadLocal for StorageManager
---
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  8 +++++-
 .../amqp/broker/AMQPLargeMessagePersister.java     |  3 +-
 .../core/paging/cursor/PagedReferenceImpl.java     |  1 -
 .../activemq/artemis/core/paging/impl/Page.java    |  2 ++
 .../artemis/core/paging/impl/PagedMessageImpl.java | 10 ++++++-
 .../journal/AbstractJournalStorageManager.java     | 32 +++++-----------------
 .../persistence/impl/journal/DescribeJournal.java  |  2 +-
 .../impl/journal/JournalStorageManager.java        |  2 ++
 .../core/persistence/impl/journal/LargeBody.java   |  6 +++-
 .../impl/journal/LargeServerMessageImpl.java       |  5 ++++
 .../impl/nullpm/NullStorageLargeServerMessage.java |  5 ++++
 .../artemis/core/protocol/ServerPacketDecoder.java | 19 +++++++++++++
 .../protocol/core/ServerSessionPacketHandler.java  |  1 +
 .../protocol/core/impl/CoreProtocolManager.java    |  2 +-
 .../wireformat/ReplicationPageWriteMessage.java    | 14 +++++++++-
 .../artemis/core/server/LargeServerMessage.java    |  2 ++
 .../ActiveMQServerSideProtocolManagerFactory.java  | 16 +++++++----
 .../artemis/core/server/cluster/BackupManager.java |  4 +--
 .../core/server/cluster/ClusterController.java     |  4 +--
 .../cluster/impl/ClusterConnectionBridge.java      | 10 +++++--
 .../server/cluster/impl/ClusterConnectionImpl.java |  6 ++--
 .../server/impl/BackupRecoveryJournalLoader.java   |  4 ++-
 .../core/server/impl/LiveOnlyActivation.java       |  2 +-
 .../spi/core/protocol/EmbedMessageUtil.java        | 11 +++-----
 .../spi/core/protocol/MessagePersister.java        | 13 ++++++++-
 .../integration/cluster/ClusterControllerTest.java |  4 +--
 .../cluster/distribution/ClusterTestBase.java      |  2 +-
 tests/smoke-tests/pom.xml                          |  8 ++++++
 .../smoke/replicationflow/SoakPagingTest.java      | 13 ++++++++-
 29 files changed, 148 insertions(+), 63 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index f310fb3..fe51f15 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -91,7 +91,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
 
    private volatile AmqpReadableBuffer parsingData;
 
-   private final StorageManager storageManager;
+   private StorageManager storageManager;
 
    public AMQPLargeMessage(long id,
                            long messageFormat,
@@ -184,6 +184,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
    }
 
    @Override
+   public void setStorageManager(StorageManager storageManager) {
+      largeBody.setStorageManager(storageManager);
+      this.storageManager = storageManager;
+   }
+
+   @Override
    public final boolean isDurable() {
       if (fileDurable != null) {
          return fileDurable.booleanValue();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
index f573e6b..61b41c5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
@@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
-import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -116,7 +115,7 @@ public class AMQPLargeMessagePersister extends MessagePersister {
          properties = null;
       }
 
-      AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, AbstractJournalStorageManager.getThreadLocal());
+      AMQPLargeMessage largeMessage = new AMQPLargeMessage(id, format, properties, null, null);
 
       largeMessage.setFileDurable(durable);
       if (address != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 7582eca..76f5a05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -100,7 +100,6 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public void onDelivery(Consumer<? super MessageReference> onDelivery) {
-      assert this.onDelivery == null;
       this.onDelivery = onDelivery;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 315d566..b0e8419 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
@@ -381,6 +382,7 @@ public final class Page implements Comparable<Page> {
                         fileBuffer.position(endPosition + 1);
                         assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
                         msg.initMessage(storage);
+                        assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage);
                         if (logger.isTraceEnabled()) {
                            logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
                         }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index 67d902f..f137af5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -89,10 +89,17 @@ public class PagedMessageImpl implements PagedMessage {
 
          ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(largeMessageLazyData);
          lgMessage = LargeMessagePersister.getInstance().decode(buffer, lgMessage, null);
+         if (lgMessage.toMessage() instanceof LargeServerMessage) {
+            ((LargeServerMessage)lgMessage.toMessage()).setStorageManager(storage);
+         }
          lgMessage.toMessage().usageUp();
          lgMessage.setPaged();
          this.message = lgMessage.toMessage();
          largeMessageLazyData = null;
+      } else {
+         if (message != null && message instanceof LargeServerMessage) {
+            ((LargeServerMessage)message).setStorageManager(storageManager);
+         }
       }
    }
 
@@ -123,10 +130,11 @@ public class PagedMessageImpl implements PagedMessage {
          } else {
             this.message = storageManager.createLargeMessage().toMessage();
             LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null);
+            ((LargeServerMessage) message).setStorageManager(storageManager);
             ((LargeServerMessage) message).toMessage().usageUp();
          }
       } else {
-         this.message = MessagePersister.getInstance().decode(buffer, null, null);
+         this.message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
       }
 
       int queueIDsSize = buffer.readInt();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index aac727d..fdb1e8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -135,18 +135,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
    protected static final int CRITICAL_STOP_2 = 2;
 
 
-   public static ThreadLocal<StorageManager> storageManagerThreadLocal = new ThreadLocal<>();
-
-   /** Persisters may need to access this on reloading of the journal,
-    *  for large message processing */
-   public static void setupThreadLocal(StorageManager manager) {
-      storageManagerThreadLocal.set(manager);
-   }
-
-   public static StorageManager getThreadLocal() {
-      return storageManagerThreadLocal.get();
-   }
-
    private static final Logger logger = Logger.getLogger(AbstractJournalStorageManager.class);
 
    public enum JournalContent {
@@ -857,7 +845,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       Map<Long, Message> messages = new HashMap<>();
       readLock();
-      setupThreadLocal(this);
       try {
 
          JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this));
@@ -935,15 +922,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                   case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
 
-                     Message message = MessagePersister.getInstance().decode(buff, null, pools);
-
-                     /* if (message instanceof LargeServerMessage) {
-                        try {
-                           ((LargeServerMessage) message).finishParse();
-                        } catch (Exception e) {
-                           logger.warn(e.getMessage(), e);
-                        }
-                     } */
+                     Message message = decodeMessage(pools, buff);
 
                      messages.put(record.id, message);
 
@@ -1240,11 +1219,14 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
          return info;
       } finally {
          readUnLock();
-         // need to clear it, otherwise we may have a permanent leak
-         setupThreadLocal(null);
       }
    }
 
+   private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) {
+      Message message = MessagePersister.getInstance().decode(buff, null, pools, this);
+      return message;
+   }
+
    public void checkInvalidPageTransactions(PagingManager pagingManager,
                                             Set<PageTransactionInfo> invalidPageTransactions) {
       if (invalidPageTransactions != null && !invalidPageTransactions.isEmpty()) {
@@ -1795,7 +1777,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
                   if (pools == null) {
                      pools = new CoreMessageObjectPools();
                   }
-                  Message message = MessagePersister.getInstance().decode(buff, null, pools);
+                  Message message = decodeMessage(pools, buff);
 
                   messages.put(record.id, message);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 9ee72ea..3ac704e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -569,7 +569,7 @@ public final class DescribeJournal {
             return "ADD-MESSAGE is not supported any longer, use export/import";
          }
          case ADD_MESSAGE_PROTOCOL: {
-            Message message = MessagePersister.getInstance().decode(buffer, null, null);
+            Message message = MessagePersister.getInstance().decode(buffer, null, null, storageManager);
             return new MessageDescribe(message);
          }
          case ADD_REF: {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 567ee03..71a7f45 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -359,6 +359,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
 
       LargeMessagePersister.getInstance().decode(buff, largeMessage, null);
 
+      largeMessage.setStorageManager(this);
+
       if (largeMessage.toMessage().containsProperty(Message.HDR_ORIG_MESSAGE_ID)) {
          // for compatibility: couple with old behaviour, copying the old file to avoid message loss
          long originalMessageID = largeMessage.toMessage().getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
index 9d17f6e..dec2c2a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java
@@ -44,7 +44,7 @@ public class LargeBody {
 
    private long pendingRecordID = NO_PENDING_ID;
 
-   final StorageManager storageManager;
+   StorageManager storageManager;
 
    private long messageID = -1;
 
@@ -69,6 +69,10 @@ public class LargeBody {
       return storageManager;
    }
 
+   public void setStorageManager(StorageManager storageManager) {
+      this.storageManager = storageManager;
+   }
+
    public ByteBuffer map() throws Exception {
       ensureFileExists(true);
       if (!file.isOpen()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 3ab3f21..c8a82be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -253,6 +253,11 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
    }
 
    @Override
+   public void setStorageManager(StorageManager storageManager) {
+      this.largeBody.setStorageManager(storageManager);
+   }
+
+   @Override
    public Message copy() {
       SequentialFile newfile = storageManager.createFileForLargeMessage(messageID, durable);
       LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newfile, messageID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 76165e8..8252f34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -71,6 +71,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ
    }
 
    @Override
+   public void setStorageManager(StorageManager storageManager) {
+
+   }
+
+   @Override
    public synchronized void addBytes(ActiveMQBuffer bytes) {
       final int readableBytes = bytes.readableBytes();
       if (buffer == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
index 311cd30..2c40da3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java
@@ -17,7 +17,9 @@
 package org.apache.activemq.artemis.core.protocol;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -28,6 +30,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
@@ -55,6 +58,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSen
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
@@ -88,6 +92,13 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
 
    private static final long serialVersionUID = 3348673114388400766L;
 
+   private final StorageManager storageManager;
+
+   public ServerPacketDecoder(StorageManager storageManager) {
+      assert storageManager != null;
+      this.storageManager = storageManager;
+   }
+
    private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) {
       final SessionSendMessage sendMessage;
 
@@ -265,6 +276,14 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
 
       packet.decode(in);
 
+      if (packet instanceof MessagePacketI) {
+         Message message = ((MessagePacketI)packet).getMessage();
+         if (message instanceof LargeServerMessage) {
+            assert storageManager != null;
+            ((LargeServerMessage) message).setStorageManager(storageManager);
+         }
+      }
+
       return packet;
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 058d636..46702cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1031,6 +1031,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             }
 
             LargeServerMessage message = currentLargeMessage;
+            currentLargeMessage.setStorageManager(storageManager);
             currentLargeMessage = null;
             session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index c596c6e..2e60d63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -127,7 +127,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
 
       Executor connectionExecutor = server.getExecutorFactory().getExecutor();
 
-      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(),
+      final CoreRemotingConnection rc = new RemotingConnectionImpl(new ServerPacketDecoder(server.getStorageManager()),
                                                                    connection, incomingInterceptors, outgoingInterceptors, server.getNodeID(),
                                                                    connectionExecutor);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
index 7aa9f8a..df43c14 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java
@@ -17,12 +17,13 @@
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.impl.PagedMessageImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.utils.DataConstants;
 
-public class ReplicationPageWriteMessage extends PacketImpl {
+public class ReplicationPageWriteMessage extends PacketImpl implements MessagePacketI {
 
    private int pageNumber;
 
@@ -84,6 +85,17 @@ public class ReplicationPageWriteMessage extends PacketImpl {
    }
 
    @Override
+   public Message getMessage() {
+      return pagedMessage.getMessage();
+   }
+
+   @Override
+   public ReplicationPageWriteMessage replaceMessage(Message message) {
+      // nothing to be done
+      return this;
+   }
+
+   @Override
    public boolean equals(Object obj) {
       if (this == obj)
          return true;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 9175781..2dcf404 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -64,5 +64,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage {
 
    LargeBody getLargeBody();
 
+   void setStorageManager(StorageManager storageManager);
+
    void finishParse() throws Exception;
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
index 209f68f..eae6c36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ActiveMQServerSideProtocolManagerFactory.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.protocol.ServerPacketDecoder;
 import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
@@ -31,6 +32,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
 
    ServerLocator locator;
 
+   final StorageManager storageManager;
+
+   private ActiveMQServerSideProtocolManagerFactory(StorageManager storageManager) {
+      this.storageManager = storageManager;
+   }
+
    @Override
    public ServerLocator getLocator() {
       return locator;
@@ -41,15 +48,12 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
       this.locator = locator;
    }
 
-   public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator) {
-      ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory();
+   public static ActiveMQServerSideProtocolManagerFactory getInstance(ServerLocator locator, StorageManager storageManager) {
+      ActiveMQServerSideProtocolManagerFactory instance = new ActiveMQServerSideProtocolManagerFactory(storageManager);
       instance.setLocator(locator);
       return instance;
    }
 
-   private ActiveMQServerSideProtocolManagerFactory() {
-   }
-
    private static final long serialVersionUID = 1;
 
    @Override
@@ -66,7 +70,7 @@ public class ActiveMQServerSideProtocolManagerFactory implements ClientProtocolM
 
       @Override
       protected PacketDecoder createPacketDecoder() {
-         return new ServerPacketDecoder();
+         return new ServerPacketDecoder(storageManager);
       }
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
index 5aef7ac..25456b4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
@@ -227,7 +227,7 @@ public class BackupManager implements ActiveMQComponent {
             backupServerLocator.setIdentity("backupLocatorFor='" + server + "'");
             backupServerLocator.setReconnectAttempts(-1);
             backupServerLocator.setInitialConnectAttempts(-1);
-            backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator));
+            backupServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(backupServerLocator, server.getStorageManager()));
          }
       }
 
@@ -359,7 +359,7 @@ public class BackupManager implements ActiveMQComponent {
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
             locator.setClusterConnection(true);
             locator.setRetryInterval(retryInterval);
-            locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
+            locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, server.getStorageManager()));
             return locator;
          }
          return null;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 86cd0df..42fcf75 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -191,7 +191,7 @@ public class ClusterController implements ActiveMQComponent {
       serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier());
       serverLocator.setMaxRetryInterval(config.getMaxRetryInterval());
       //this is used for replication so need to use the server packet decoder
-      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
+      serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator, server.getStorageManager()));
       serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool());
       try {
          serverLocator.initialize();
@@ -254,7 +254,7 @@ public class ClusterController implements ActiveMQComponent {
     * @return the Cluster Control
     */
    public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) {
-      sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator()));
+      sf.getServerLocator().setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(sf.getServerLocator(), server.getStorageManager()));
       return new ClusterControl(sf, server);
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index ade5d0c..77bc936 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -79,6 +80,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
 
    private final long targetNodeEventUID;
 
+   private final StorageManager storageManager;
+
    private final ServerLocatorInternal discoveryLocator;
 
    private final String storeAndForwardPrefix;
@@ -111,7 +114,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                   final SimpleString 
managementNotificationAddress,
                                   final MessageFlowRecord flowRecord,
                                   final TransportConfiguration connector,
-                                  final String storeAndForwardPrefix) {
+                                  final String storeAndForwardPrefix,
+                                  final StorageManager storageManager) {
       super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // 
reconnectAttemptsOnSameNode means nothing on the clustering bridge since we 
always try the same
             retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, 
queue, executor, filterString, forwardingAddress, scheduledExecutor, 
transformer, useDuplicateDetection, user, password, server, 
ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultBridgeRoutingType()));
 
@@ -134,11 +138,13 @@ public class ClusterConnectionBridge extends BridgeImpl {
       }
 
       this.storeAndForwardPrefix = storeAndForwardPrefix;
+
+      this.storageManager = storageManager;
    }
 
    @Override
    protected ClientSessionFactoryInternal createSessionFactory() throws 
Exception {
-      
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
+      
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator,
 storageManager));
       ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) 
serverLocator.createSessionFactory(targetNodeID);
       //if it is null then its possible the broker was removed after a 
disconnect so lets try the original connectors
       if (factory == null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index d82b2ea..b223fc1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -628,7 +628,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
          serverLocator.setAfterConnectionInternalListener(this);
 
-         
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
+         
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator,
 server.getStorageManager()));
 
          serverLocator.start(server.getExecutorFactory().getExecutor());
       }
@@ -816,7 +816,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
 
       targetLocator.setAfterConnectionInternalListener(this);
 
-      
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator));
+      
serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator,
 server.getStorageManager()));
 
       targetLocator.setNodeID(nodeId);
 
@@ -830,7 +830,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
       targetLocator.addIncomingInterceptor(new 
IncomingInterceptorLookingForExceptionMessage(manager, 
executorFactory.getExecutor()));
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, 
eventUID, targetNodeID, connector, queueName, queue);
 
-      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, 
manager, targetLocator, serverLocator, initialConnectAttempts, 
reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, 
nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), 
record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, 
null, scheduledExecutor, null, useDuplicateDetection, clusterUser, 
clusterPassword, server, managementService.getManagementAddres [...]
+      ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, 
manager, targetLocator, serverLocator, initialConnectAttempts, 
reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, 
nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), 
record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, 
null, scheduledExecutor, null, useDuplicateDetection, clusterUser, 
clusterPassword, server, managementService.getManagementAddres [...]
 
       targetLocator.setIdentity("(Cluster-connection-bridge::" + 
bridge.toString() + "::" + this.toString() + ")");
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
index 77b9f3f..300ee70 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java
@@ -49,6 +49,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
    private ActiveMQServer parentServer;
    private ServerLocator locator;
    private final ClusterController clusterController;
+   private final StorageManager storageManager;
 
    public BackupRecoveryJournalLoader(PostOffice postOffice,
                                       PagingManager pagingManager,
@@ -66,6 +67,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
       this.parentServer = parentServer;
       this.locator = locator;
       this.clusterController = clusterController;
+      this.storageManager = storageManager;
    }
 
    @Override
@@ -88,7 +90,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
                         ResourceManager resourceManager,
                         Map<SimpleString, List<Pair<byte[], Long>>> 
duplicateIDMap) throws Exception {
       ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, 
postOffice, nodeManager, clusterController, parentServer.getStorageManager());
-      
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
+      
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator,
 storageManager));
 
       try (ClientSessionFactory sessionFactory = 
locator.createSessionFactory()) {
          scaleDownHandler.scaleDown(sessionFactory, resourceManager, 
duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), 
parentServer.getNodeID());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index 4c7d4fb..cea644f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -139,7 +139,7 @@ public class LiveOnlyActivation extends Activation {
       try {
          scaleDownServerLocator = 
ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, activeMQServer);
          //use a Node Locator to connect to the cluster
-         
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator));
+         
scaleDownServerLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(scaleDownServerLocator,
 activeMQServer.getStorageManager()));
          LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null 
? new AnyLiveNodeLocatorForScaleDown(activeMQServer) : new 
NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), 
activeMQServer);
          scaleDownServerLocator.addClusterTopologyListener(nodeLocator);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
index 310971f..279816e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
@@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import 
org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.jboss.logging.Logger;
@@ -95,18 +94,16 @@ public class EmbedMessageUtil {
    }
 
    private static Message readEncoded(ICoreMessage message, StorageManager 
storageManager, ActiveMQBuffer buffer) {
-
-
-      AbstractJournalStorageManager.setupThreadLocal(storageManager);
       try {
-         Message returnMessage = MessagePersister.getInstance().decode(buffer, 
null, null);
+         Message returnMessage = MessagePersister.getInstance().decode(buffer, 
null, null, storageManager);
+         if (returnMessage instanceof LargeServerMessage) {
+            
((LargeServerMessage)returnMessage).setStorageManager(storageManager);
+         }
          returnMessage.setMessageID(message.getMessageID());
          return returnMessage;
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
          return message;
-      } finally {
-         AbstractJournalStorageManager.setupThreadLocal(null);
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
index 808d094..d4d47d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessagePersister.java
@@ -24,6 +24,8 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister;
 import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.jboss.logging.Logger;
 
 import static 
org.apache.activemq.artemis.core.persistence.PersisterIDs.MAX_PERSISTERS;
@@ -105,11 +107,20 @@ public class MessagePersister implements Persister<Message> {
 
    @Override
    public Message decode(ActiveMQBuffer buffer, Message record, 
CoreMessageObjectPools pools) {
+      return decode(buffer, record, pools, null);
+   }
+
+
+   public Message decode(ActiveMQBuffer buffer, Message record, 
CoreMessageObjectPools pools, StorageManager storageManager) {
       byte protocol = buffer.readByte();
       Persister<Message> persister = getPersister(protocol);
       if (persister == null) {
          throw new NullPointerException("couldn't find factory for type=" + 
protocol);
       }
-      return persister.decode(buffer, record, pools);
+      Message message = persister.decode(buffer, record, pools);
+      if (message instanceof LargeServerMessage) {
+         ((LargeServerMessage) message).setStorageManager(storageManager);
+      }
+      return message;
    }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
index f7cbd62..50f0a62 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java
@@ -126,7 +126,7 @@ public class ClusterControllerTest extends ClusterTestBase {
    @Test
    public void controlWithDifferentConnector() throws Exception {
       try (ServerLocatorImpl locator = (ServerLocatorImpl) 
createInVMNonHALocator()) {
-         
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
+         
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator,
 servers[0].getStorageManager()));
          ClusterController controller = new ClusterController(getServer(0), 
getServer(0).getScheduledPool());
          ClusterControl clusterControl = 
controller.connectToNodeInCluster((ClientSessionFactoryInternal) 
locator.createSessionFactory());
          clusterControl.authorize();
@@ -136,7 +136,7 @@ public class ClusterControllerTest extends ClusterTestBase {
    @Test
    public void controlWithDifferentPassword() throws Exception {
       try (ServerLocatorImpl locator = (ServerLocatorImpl) 
createInVMNonHALocator()) {
-         
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
+         
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator,
 servers[0].getStorageManager()));
          ClusterController controller = new ClusterController(getServer(1), 
getServer(1).getScheduledPool());
          ClusterControl clusterControl = 
controller.connectToNodeInCluster((ClientSessionFactoryInternal) 
locator.createSessionFactory());
          try {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 4f92cfd..32d5d29 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1396,7 +1396,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
 
       setSessionFactoryCreateLocator(node, ha, serverTotc);
 
-      
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node]));
+      
locators[node].setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locators[node],
 servers[node].getStorageManager()));
 
       
locators[node].setBlockOnNonDurableSend(true).setBlockOnDurableSend(true);
       addServerLocator(locators[node]);
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 5efbfcb..2482a8d 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -147,6 +147,10 @@
                      <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
                      <instance>${basedir}/target/replicated-static0</instance>
                      
<configuration>${basedir}/target/classes/servers/replicated-static0</configuration>
+                     <args>
+                        <arg>--java-options</arg>
+                        <arg>-ea</arg>
+                     </args>
                   </configuration>
                </execution>
                <execution>
@@ -160,6 +164,10 @@
                      <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
                      <instance>${basedir}/target/replicated-static1</instance>
                      
<configuration>${basedir}/target/classes/servers/replicated-static1</configuration>
+                     <args>
+                        <arg>--java-options</arg>
+                        <arg>-ea</arg>
+                     </args>
                   </configuration>
                </execution>
                <execution>
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
index f72990d..d89fd4a 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
@@ -115,6 +115,11 @@ public class SoakPagingTest extends SmokeTestBase {
 
    public void produce(ConnectionFactory factory) {
       try {
+
+         StringBuffer bufferlarge = new StringBuffer();
+         while (bufferlarge.length() < 110000) {
+            bufferlarge.append("asdflkajdhsf akljsdfh akljsdfh alksjdfh 
alkdjsf ");
+         }
          Connection connection = factory.createConnection("admin", "admin");
 
          connection.start();
@@ -125,7 +130,13 @@ public class SoakPagingTest extends SmokeTestBase {
 
          int i = 0;
          while (true) {
-            Message message = 
session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
+
+            Message message;
+            if (i % 100 == 0) {
+               message = session.createTextMessage(bufferlarge.toString());
+            } else {
+               message = 
session.createTextMessage("fkjdslkfjdskljf;lkdsjf;kdsajf;lkjdf;kdsajf;kjdsa;flkjdsa;lfkjdsa;flkj;dsakjf;dsajf;askjd;fkj;dsajflaskfja;fdlkajs;lfdkja;kfj;dsakfj;akdsjf;dsakjf;akfj;lakdsjf;lkasjdf;ksajf;kjdsa;fkj;adskjf;akdsjf;kja;sdkfj;akdsjf;akjdsf;adskjf;akdsjf;askfj;aksjfkdjafndmnfmdsnfjadshfjdsalkfjads;fkjdsa;kfja;skfj;akjfd;akjfd;ksaj;fkja;kfj;dsakjf;dsakjf;dksjf;akdsjf;kdsajf");
+            }
 
             messageProducer.send(message);
             produced.incrementAndGet();

Reply via email to