This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a25650ca3e ARTEMIS-5428 Hardening of the broker around PagingWrites / 
Shutdown
a25650ca3e is described below

commit a25650ca3ec1ba524748f992bd2a5bf315136e49
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Apr 14 16:43:26 2025 -0400

    ARTEMIS-5428 Hardening of the broker around PagingWrites / Shutdown
    
    While the broker is being stopped, and if the system is paging, I
    recently found a few issues that I am addressing here:
    
    - When the server is stopped, the IDGenerator could be used after
    the snapshot is generated, which could lead to data loss in case of a
    regular shutdown while the system is under heavy load
    
    - A delayed Transaction could be completed after exceptions
    (markAsRollbackOnly or rollback should clear any pending state)
    
    - The Acceptor notification is being called after the IDGenerator is
    stopped (found this by accident after stopping the IDGenerator).
---
 .../artemis/utils/actors/ArtemisExecutor.java      |   4 +
 .../core/protocol/openwire/OpenWireConnection.java |   4 +
 .../core/management/impl/AcceptorControlImpl.java  |   1 +
 .../cursor/impl/PageSubscriptionCounterImpl.java   |   3 +
 .../artemis/core/paging/impl/PageTimedWriter.java  | 122 +++++--
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  13 +-
 .../artemis/core/persistence/StorageManager.java   |  14 +
 .../journal/AbstractJournalStorageManager.java     |  14 +-
 .../impl/journal/BatchingIDGenerator.java          |  59 +++-
 .../impl/journal/JournalStorageManager.java        |  33 +-
 .../core/postoffice/impl/PostOfficeImpl.java       |   1 +
 .../core/remoting/impl/invm/InVMAcceptor.java      |  11 +-
 .../core/remoting/impl/netty/NettyAcceptor.java    |  75 ++--
 .../core/remoting/server/RemotingService.java      |   5 +
 .../remoting/server/impl/RemotingServiceImpl.java  |  55 ++-
 .../artemis/core/server/ActiveMQMessageBundle.java |   4 +
 .../core/server/impl/ActiveMQServerImpl.java       |  46 ++-
 .../core/transaction/impl/TransactionImpl.java     |  31 +-
 .../artemis/spi/core/remoting/Acceptor.java        |   2 +
 .../paging/impl/PagingManagerImplAccessor.java     |  11 +
 .../paging/impl/PagingManagerTestAccessor.java     |  38 --
 ...lAccessor.java => PagingStoreImplAccessor.java} |  18 +-
 .../integration/bridge/BridgeSimulationTest.java   | 194 ++++++++++
 .../integration/management/QueueControlTest.java   |   4 +-
 .../integration/paging/MaxMessagesPagingTest.java  |   6 +-
 .../transaction/impl/TransactionImplAccessor.java} |  16 +-
 .../core/paging/impl/PageTimedWriterUnitTest.java  | 392 ++++++++++++++++++---
 .../persistence/impl/BatchIDGeneratorUnitTest.java |  12 +-
 .../server/impl/fake/FakeAcceptorFactory.java      |   5 +
 29 files changed, 964 insertions(+), 229 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 43bbd9cf0c..f878af49a0 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -41,6 +41,10 @@ public interface ArtemisExecutor extends Executor {
       return 0;
    }
 
+   default boolean inHandler() {
+      return false;
+   }
+
    /**
     * To be used to flush an executor from a different thread.
     * <b>WARNING</b>: Do not call this within the executor. That would be 
stoopid ;)
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index cd46df13bc..732606594b 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -690,6 +690,10 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    }
 
    private void disconnect(ActiveMQException me, String reason, boolean fail) {
+      ThresholdActor<Command> localActor = openWireActor;
+      if (localActor != null) {
+         localActor.shutdown();
+      }
 
       if (context == null || destroyed) {
          return;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
index dd1ebd5045..8c1648cd88 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AcceptorControlImpl.java
@@ -136,6 +136,7 @@ public class AcceptorControlImpl extends AbstractControl 
implements AcceptorCont
       }
       clearIO();
       try {
+         acceptor.notifyStop();
          acceptor.stop();
       } finally {
          blockOnIO();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 807bd811ae..b267f4232e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -248,6 +248,9 @@ public class PageSubscriptionCounterImpl extends 
BasePagingCounter {
       try (ArtemisCloseable lock = storage.closeableReadLock()) {
          synchronized (this) {
             if (recordID >= 0) {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Deleting page counter with recordID={}, using 
TX={}", this.recordID, tx.getID());
+               }
                storage.deletePageCounter(tx.getID(), this.recordID);
                tx.setContainsPersistent();
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
index 4725c2d7ae..a6f584f677 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java
@@ -25,7 +25,9 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -34,6 +36,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +59,7 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
    private final int maxCredits;
 
    private static final AtomicIntegerFieldUpdater<PageTimedWriter> 
pendingTasksUpdater = 
AtomicIntegerFieldUpdater.newUpdater(PageTimedWriter.class, "pendingTasks");
+   private final ReusableLatch pendingProcessings = new ReusableLatch(0);
 
    public boolean hasPendingIO() {
       return pendingTasksUpdater.get(this) > 0;
@@ -94,9 +98,15 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
    }
 
    @Override
-   public synchronized void stop() {
-      super.stop();
-      processMessages();
+   public void stop() {
+      synchronized (this) {
+         super.stop();
+      }
+      try {
+         pendingProcessings.await(30, TimeUnit.SECONDS);
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
    }
 
    /**
@@ -113,16 +123,21 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
                                     Transaction tx,
                                     RouteContextList listCtx) {
 
-      if (!isStarted()) {
-         throw new IllegalStateException("PageWriter Service is stopped");
-      }
+      logger.trace("Adding paged message {} to paged writer", message);
+
       int credits = Math.min(message.getEncodeSize() + 
PageReadWriter.SIZE_RECORD, maxCredits);
       writeCredits.acquireUninterruptibly(credits);
-      if (tx != null) {
-         // this will delay the commit record until the portion of this task 
has been completed
-         tx.delay();
-      }
       synchronized (this) {
+         if (!isStarted()) {
+            writeCredits.release(credits);
+            throw new IllegalStateException("PageWriter Service is stopped");
+         }
+
+         if (tx != null) {
+            // this will delay the commit record until the portion of this 
task has been completed
+            tx.delay();
+         }
+
          final boolean replicated = storageManager.isReplicated();
          PageEvent event = new PageEvent(context, message, tx, listCtx, 
credits, replicated);
          context.storeLineUp();
@@ -132,13 +147,16 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
          this.pageEvents.add(event);
          delay();
       }
-
    }
 
-   private synchronized  PageEvent[] extractPendingEvents() {
+   private synchronized PageEvent[] extractPendingEvents() {
+      if (!isStarted()) {
+         return null;
+      }
       if (pageEvents.isEmpty()) {
          return null;
       }
+      pendingProcessings.countUp();
       PageEvent[] pendingsWrites = new PageEvent[pageEvents.size()];
       pendingsWrites = pageEvents.toArray(pendingsWrites);
       pageEvents.clear();
@@ -147,6 +165,10 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
 
    @Override
    public void run() {
+      if (!isStarted()) {
+         return;
+      }
+
       ArtemisCloseable closeable = storageManager.closeableReadLock(true);
       if (closeable == null) {
          logger.trace("Delaying PagedTimedWriter as it's currently locked");
@@ -161,51 +183,87 @@ public class PageTimedWriter extends 
ActiveMQScheduledComponent {
    }
 
    protected void processMessages() {
-      PageEvent[] pendingEvents = extractPendingEvents();
-      if (pendingEvents == null) {
-         return;
+      PageEvent[] pendingEvents;
+      boolean wasStarted;
+      synchronized (this) {
+         pendingEvents = extractPendingEvents();
+         if (pendingEvents == null) {
+            return;
+         }
+         wasStarted = isStarted();
       }
+
       OperationContext beforeContext = OperationContextImpl.getContext();
 
       try {
-         boolean requireSync = false;
-         for (PageEvent event : pendingEvents) {
-            OperationContextImpl.setContext(event.context);
-            pagingStore.directWritePage(event.message, false, 
event.replicated);
+         if (wasStarted) {
+            boolean requireSync = false;
+            for (PageEvent event : pendingEvents) {
+               OperationContextImpl.setContext(event.context);
+               logger.trace("writing message {}", event.message);
+               pagingStore.directWritePage(event.message, false, 
event.replicated);
 
-            if (event.tx != null || syncNonTX) {
-               requireSync = true;
+               if (event.tx != null || syncNonTX) {
+                  requireSync = true;
+               }
+            }
+            if (requireSync) {
+               logger.trace("performing sync");
+               performSync();
+            }
+            for (PageEvent event : pendingEvents) {
+               if (event.tx != null) {
+                  event.tx.delayDone();
+               }
+            }
+            logger.trace("Completing events");
+            for (PageEvent event : pendingEvents) {
+               event.context.done();
             }
          }
-         if (requireSync) {
-            performSync();
-         }
+      } catch (Throwable e) {
+         logger.warn("Captured Exception {}", e.getMessage(), e);
+         ActiveMQException amqException = new 
ActiveMQIllegalStateException(e.getMessage());
+         amqException.initCause(e);
+
          for (PageEvent event : pendingEvents) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Error processing Message {}, tx={} ", 
event.message, event.tx);
+            }
             if (event.tx != null) {
-               event.tx.delayDone();
+               if (logger.isTraceEnabled()) {
+                  logger.trace("tx.markRollbackOnly on TX {}", 
event.tx.getID());
+               }
+               event.tx.markAsRollbackOnly(amqException);
             }
          }
 
-      } catch (Exception e) {
-         logger.warn(e.getMessage(), e);
          // In case of failure, The context should propagate an exception to 
the client
          // We send an exception to the client even on the case of a failure
          // to avoid possible locks and the client not getting the exception 
back
          executor.execute(() -> {
+            logger.trace("onError processing for callback", e);
             // The onError has to be called from a separate executor
             // because this PagedWriter will be holding the lock on the 
storage manager
             // and this might lead to a deadlock
             for (PageEvent event : pendingEvents) {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("onError {}, error={}", event.message, 
e.getMessage());
+               }
                event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), 
e.getClass() + " during ioSync for paging on " + pagingStore.getStoreName() + 
": " + e.getMessage());
             }
          });
       } finally {
-         for (PageEvent event : pendingEvents) {
-            event.context.done();
-            pendingTasksUpdater.decrementAndGet(this);
-            writeCredits.release(event.credits);
+         try {
+            for (PageEvent event : pendingEvents) {
+               pendingTasksUpdater.decrementAndGet(this);
+               writeCredits.release(event.credits);
+            }
+            OperationContextImpl.setContext(beforeContext);
+         } catch (Throwable t) {
+            logger.debug(t.getMessage(), t);
          }
-         OperationContextImpl.setContext(beforeContext);
+         pendingProcessings.countDown();
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 0ff67d3244..3507fd3759 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -97,7 +97,7 @@ public class PagingStoreImpl implements PagingStore {
    private final PagingStoreFactory storeFactory;
 
    // this is used to batch and sync into paging asynchronously
-   private final PageTimedWriter timedWriter;
+   private PageTimedWriter timedWriter;
 
    private long maxSize;
 
@@ -232,6 +232,11 @@ public class PagingStoreImpl implements PagingStore {
       return localWriter;
    }
 
+   // for tests, used through an accessor
+   protected void replacePagedTimedWriter(PageTimedWriter writer) {
+      this.timedWriter = writer;
+   }
+
    private void overSized() {
       full = true;
    }
@@ -1383,7 +1388,7 @@ public class PagingStoreImpl implements PagingStore {
       return true;
    }
 
-   void directWritePage(PagedMessage pagedMessage, boolean lineUp, boolean 
originalReplicated) throws Exception {
+   protected void directWritePage(PagedMessage pagedMessage, boolean lineUp, 
boolean originalReplicated) throws Exception {
       int bytesToWrite = pagedMessage.getEncodeSize() + 
PageReadWriter.SIZE_RECORD;
 
       currentPageSize += bytesToWrite;
@@ -1398,6 +1403,10 @@ public class PagingStoreImpl implements PagingStore {
       // doing this will give us a possibility of recovering the page counters
       final Page page = currentPage;
 
+      if (!page.isOpen()) {
+         page.open(false);
+      }
+
       page.write(pagedMessage, lineUp, originalReplicated);
 
       if (logger.isTraceEnabled()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 53ff9c92ff..fd8404f3b7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence;
 
 import javax.transaction.xa.Xid;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,6 +66,7 @@ import 
org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
 import org.apache.activemq.artemis.utils.IDGenerator;
 
@@ -96,6 +98,14 @@ public interface StorageManager extends MapStorageManager, 
IDGenerator, ActiveMQ
    default void recoverLargeMessagesOnFolder(Set<Long> files) throws Exception 
{
    }
 
+   default void writeLock() {
+
+   }
+
+   default void writeUnlock() {
+
+   }
+
    default SequentialFileFactory getJournalSequentialFileFactory() {
       return null;
    }
@@ -129,6 +139,10 @@ public interface StorageManager extends MapStorageManager, 
IDGenerator, ActiveMQ
     */
    void stop(boolean ioCriticalError, boolean sendFailover) throws Exception;
 
+   default Set<RemotingConnection> getUsedConnections() {
+      return Collections.emptySet();
+   }
+
    // Message related operations
 
    void pageClosed(SimpleString address, long pageNumber);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 7eeb0e0613..32aae12d74 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -455,14 +455,22 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
       }
    }
 
+   @Override
+   public void writeLock() {
+      storageManagerLock.writeLock().lock();
+   }
+
+   @Override
+   public void writeUnlock() {
+      storageManagerLock.writeLock().unlock();
+   }
+
    @Override
    public ArtemisCloseable closeableReadLock(boolean tryLock) {
       if (reentrant.get()) {
          return dummyCloseable;
       }
 
-      reentrant.set(true);
-
       CriticalCloseable measure = measureCritical(CRITICAL_STORE);
 
       if (tryLock) {
@@ -473,6 +481,8 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
          storageManagerLock.readLock().lock();
       }
 
+      reentrant.set(true);
+
       if (CriticalMeasure.isDummy(measure)) {
          // The next statement could have been called like this:
          // return storageManagerLock.readLock()::unlock;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
index 3e7a2bb30d..553cb4050d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/BatchingIDGenerator.java
@@ -24,12 +24,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * An ID generator that allocates a batch of IDs of size {@link 
#checkpointSize} and records the ID in the journal only
@@ -47,10 +50,14 @@ public final class BatchingIDGenerator implements 
IDGenerator {
 
    private volatile long nextID;
 
+   private boolean started = true;
+
    private final StorageManager storageManager;
 
    private List<Long> cleanupRecords = null;
 
+   private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
    public BatchingIDGenerator(final long start, final long checkpointSize, 
final StorageManager storageManager) {
       counter = new AtomicLong(start);
 
@@ -62,6 +69,19 @@ public final class BatchingIDGenerator implements 
IDGenerator {
       this.storageManager = storageManager;
    }
 
+   public void stop() {
+      lock.writeLock().lock();
+      try {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Stopping generator", new Exception("Trace"));
+         }
+         persistCurrentID();
+         started = false;
+      } finally {
+         lock.writeLock().unlock();
+      }
+   }
+
    public void persistCurrentID() {
       final long recordID = counter.incrementAndGet();
       storeID(recordID, recordID);
@@ -86,15 +106,23 @@ public final class BatchingIDGenerator implements 
IDGenerator {
    }
 
    public void loadState(final long journalID, final ActiveMQBuffer buffer) {
-      addCleanupRecord(journalID);
-      IDCounterEncoding encoding = new IDCounterEncoding();
+      lock.writeLock().lock();
+      try {
+         addCleanupRecord(journalID);
+         IDCounterEncoding encoding = new IDCounterEncoding();
 
-      encoding.decode(buffer);
+         encoding.decode(buffer);
 
-      // Keep nextID and counter the same, the next generateID will update the 
checkpoint
-      nextID = encoding.id + 1;
+         // Keep nextID and counter the same, the next generateID will update 
the checkpoint
+         nextID = encoding.id + 1;
 
-      counter.set(nextID);
+         counter.set(nextID);
+
+         // if we are loading we are restarting it
+         started = true;
+      } finally {
+         lock.writeLock().unlock();
+      }
    }
 
    // for testcases
@@ -107,12 +135,23 @@ public final class BatchingIDGenerator implements 
IDGenerator {
 
    @Override
    public long generateID() {
-      long id = counter.getAndIncrement();
+      lock.readLock().lock();
+      try {
+         if (!started) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("BatchIDGenerator is not supposed to be used", new 
Exception("trace"));
+            }
+            throw ActiveMQMessageBundle.BUNDLE.idGeneratorStopped();
+         }
+         long id = counter.getAndIncrement();
 
-      if (id >= nextID) {
-         saveCheckPoint(id);
+         if (id >= nextID) {
+            saveCheckPoint(id);
+         }
+         return id;
+      } finally {
+         lock.readLock().unlock();
       }
-      return id;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 54d0624c93..d863ed310e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -20,8 +20,10 @@ package 
org.apache.activemq.artemis.core.persistence.impl.journal;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +66,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
@@ -119,6 +122,17 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
       super(config, analyzer, executorFactory, null, ioExecutors, 
criticalErrorListener);
    }
 
+   @Override
+   public Set<RemotingConnection> getUsedConnections() {
+      if (replicator == null) {
+         return Collections.emptySet();
+      } else {
+         HashSet<RemotingConnection> usedConnections = new HashSet<>();
+         usedConnections.add(replicator.getBackupTransportConnection());
+         return usedConnections;
+      }
+   }
+
    @Override
    public SequentialFileFactory getJournalSequentialFileFactory() {
       return journalFF;
@@ -271,9 +285,9 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
       if (!ioCriticalError) {
          performCachedLargeMessageDeletes();
-         // Must call close to make sure last id is persisted
+         // Must call stop to make sure last id is persisted
          if (journalLoaded && idGenerator != null)
-            idGenerator.persistCurrentID();
+            idGenerator.stop();
       }
 
       final CountDownLatch latch = new CountDownLatch(1);
@@ -637,16 +651,11 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
                originalBindingsJournal.replicationSyncPreserveOldFiles();
                originalMessageJournal.replicationSyncPreserveOldFiles();
 
-               pagingManager.lock();
-               try {
-                  pagingManager.disableCleanup();
-                  messageFiles = prepareJournalForCopy(originalMessageJournal, 
JournalContent.MESSAGES, nodeID, autoFailBack);
-                  bindingsFiles = 
prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID, 
autoFailBack);
-                  pageFilesToSync = getPageInformationForSync(pagingManager);
-                  pendingLargeMessages = recoverPendingLargeMessages();
-               } finally {
-                  pagingManager.unlock();
-               }
+               pagingManager.disableCleanup();
+               messageFiles = prepareJournalForCopy(originalMessageJournal, 
JournalContent.MESSAGES, nodeID, autoFailBack);
+               bindingsFiles = prepareJournalForCopy(originalBindingsJournal, 
JournalContent.BINDINGS, nodeID, autoFailBack);
+               pageFilesToSync = getPageInformationForSync(pagingManager);
+               pendingLargeMessages = recoverPendingLargeMessages();
             } finally {
                originalMessageJournal.synchronizationUnlock();
                originalBindingsJournal.synchronizationUnlock();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c442501bed..e45e9dfbf9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1906,6 +1906,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       // if the message is being sent from the bridge, we just ignore the 
duplicate id, and use the internal one
       final DuplicateIDCache cacheBridge = 
getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));
       if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) 
{
+         logger.trace("Message {} hit a bridge duplicate", message);
          context.getTransaction().rollback();
          message.usageDown(); // this will cause large message delete
          return DuplicateCheckResult.DuplicateNotStartedTX;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
index 4e7279777b..0d258081af 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java
@@ -165,6 +165,13 @@ public final class InVMAcceptor extends AbstractAcceptor {
 
       connections.clear();
 
+      started = false;
+
+      paused = false;
+   }
+
+   @Override
+   public void notifyStop() {
       if (notificationService != null) {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(SimpleString.of("factory"), 
SimpleString.of(InVMAcceptorFactory.class.getName()));
@@ -176,10 +183,6 @@ public final class InVMAcceptor extends AbstractAcceptor {
             ActiveMQServerLogger.LOGGER.failedToSendNotification(e);
          }
       }
-
-      started = false;
-
-      paused = false;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 80879c2484..6a089813ec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -748,47 +748,63 @@ public class NettyAcceptor extends AbstractAcceptor {
          callback.run();
          return;
       }
+      try {
 
-      if (protocolHandler != null) {
-         protocolHandler.close();
-      }
+         if (protocolHandler != null) {
+            protocolHandler.close();
+         }
 
-      if (batchFlusherFuture != null) {
-         batchFlusherFuture.cancel(false);
+         if (batchFlusherFuture != null) {
+            batchFlusherFuture.cancel(false);
 
-         flusher.cancel();
+            flusher.cancel();
 
-         flusher = null;
+            flusher = null;
 
-         batchFlusherFuture = null;
-      }
+            batchFlusherFuture = null;
+         }
 
-      // serverChannelGroup has been unbound in pause()
-      if (serverChannelGroup != null) {
-         serverChannelGroup.close().awaitUninterruptibly();
-      }
+         // serverChannelGroup has been unbound in pause()
+         if (serverChannelGroup != null) {
+            serverChannelGroup.close().awaitUninterruptibly();
+         }
 
-      if (channelGroup != null) {
-         ChannelGroupFuture future = 
channelGroup.close().awaitUninterruptibly();
+         if (channelGroup != null) {
+            ChannelGroupFuture future = 
channelGroup.close().awaitUninterruptibly();
 
-         if (!future.isSuccess()) {
-            ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
-            for (Channel channel : future.group()) {
-               if (channel.isActive()) {
-                  ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, 
channel.remoteAddress());
+            if (!future.isSuccess()) {
+               ActiveMQServerLogger.LOGGER.nettyChannelGroupError();
+               for (Channel channel : future.group()) {
+                  if (channel.isActive()) {
+                     
ActiveMQServerLogger.LOGGER.nettyChannelStillOpen(channel, 
channel.remoteAddress());
+                  }
                }
             }
          }
-      }
 
-      channelClazz = null;
+         channelClazz = null;
 
-      for (Connection connection : connections.values()) {
-         listener.connectionDestroyed(connection.getID(), true);
-      }
+         for (Connection connection : connections.values()) {
+            listener.connectionDestroyed(connection.getID(), true);
+         }
 
-      connections.clear();
+         connections.clear();
+
+         paused = false;
+      } finally {
+         if (eventLoopGroup == null) {
+            callback.run();
+         } else {
+            // Shutdown the EventLoopGroup if no new task was added for 100ms 
or if
+            // 3000ms elapsed.
+            eventLoopGroup.shutdownGracefully(quietPeriod, shutdownTimeout, 
TimeUnit.MILLISECONDS).addListener(f -> callback.run());
+            eventLoopGroup = null;
+         }
+      }
+   }
 
+   @Override
+   public void notifyStop() {
       if (notificationService != null) {
          TypedProperties props = new TypedProperties();
          props.putSimpleStringProperty(SimpleString.of("factory"), 
SimpleString.of(NettyAcceptorFactory.class.getName()));
@@ -801,13 +817,6 @@ public class NettyAcceptor extends AbstractAcceptor {
             ActiveMQServerLogger.LOGGER.failedToSendNotification(e);
          }
       }
-
-      paused = false;
-
-      // Shutdown the EventLoopGroup if no new task was added for 100ms or if
-      // 3000ms elapsed.
-      eventLoopGroup.shutdownGracefully(quietPeriod, shutdownTimeout, 
TimeUnit.MILLISECONDS).addListener(f -> callback.run());
-      eventLoopGroup = null;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 3ffae9576a..652c89fd86 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -75,6 +75,11 @@ public interface RemotingService {
 
    boolean removeOutgoingInterceptor(BaseInterceptor interceptor);
 
+   void notifyStop();
+
+   /** Prepare stop will close all the connections except the ones listed in 
ignoreConnections (e.g. those used by the storage manager) */
+   void prepareStop(boolean criticalError, Set<RemotingConnection> 
ignoreConnections) throws Exception;
+
    void stop(boolean criticalError) throws Exception;
 
    void start() throws Exception;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index a2ec85ddcd..0ac17307c7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -372,17 +372,22 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
    }
 
    @Override
-   public void stop(final boolean criticalError) throws Exception {
-      if (!started) {
-         return;
-      }
-      SSLContextFactoryProvider.getSSLContextFactory().clearSSLContexts();
-      OpenSSLContextFactory openSSLContextFactory = 
OpenSSLContextFactoryProvider.getOpenSSLContextFactory();
-      if (openSSLContextFactory != null) {
-         openSSLContextFactory.clearSslContexts();
+   public void notifyStop() {
+
+      // Ask every acceptor to send its stop notification; a failure on one 
acceptor is logged and does not prevent notifying the others
+      for (Acceptor acceptor : acceptors.values()) {
+         logger.debug("send stop notifications on acceptor {}", acceptor);
+
+         try {
+            acceptor.notifyStop();
+         } catch (Throwable t) {
+            
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
+         }
       }
+   }
 
-      failureCheckAndFlushThread.close(criticalError);
+   @Override
+   public void prepareStop(boolean criticalError, Set<RemotingConnection> 
ignoreList) throws Exception {
 
       // We need to stop them accepting first so no new connections are 
accepted after we send the disconnect message
       for (Acceptor acceptor : acceptors.values()) {
@@ -396,7 +401,7 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
 
       }
 
-      logger.debug("Sending disconnect on client connections");
+      logger.info("Sending disconnect on client connections");
 
       Set<ConnectionEntry> connectionEntries = new 
HashSet<>(connections.values());
 
@@ -405,10 +410,34 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
       for (ConnectionEntry entry : connectionEntries) {
          RemotingConnection conn = entry.connection;
 
-         logger.trace("Sending connection.disconnection packet to {}", conn);
+         if (ignoreList.contains(conn)) {
+            logger.debug("ignoring connection {} during the close", conn);
+         } else {
+            logger.debug("Sending disconnect on connection {} from server {}", 
conn.getID(), server);
+            conn.disconnect(criticalError);
+         }
+      }
 
-         conn.disconnect(criticalError);
+   }
+
+   @Override
+   public void stop(final boolean criticalError) throws Exception {
+      if (!started) {
+         return;
       }
+      SSLContextFactoryProvider.getSSLContextFactory().clearSSLContexts();
+      OpenSSLContextFactory openSSLContextFactory = 
OpenSSLContextFactoryProvider.getOpenSSLContextFactory();
+      if (openSSLContextFactory != null) {
+         openSSLContextFactory.clearSslContexts();
+      }
+
+      failureCheckAndFlushThread.close(criticalError);
+
+      // ActiveMQServerImpl already calls prepareStop
+      // however we call this again here for two reasons:
+      // I - ActiveMQServer might have ignored the one connection for 
Replication
+      // II - this method could be called in other places for Embedding or 
testing and the semantic must be kept the same
+      prepareStop(criticalError, Collections.emptySet());
 
       CountDownLatch acceptorCountDownLatch = new 
CountDownLatch(acceptors.size());
       for (Acceptor acceptor : acceptors.values()) {
@@ -586,7 +615,7 @@ public class RemotingServiceImpl implements 
RemotingService, ServerConnectionLif
          AuditLogger.createdConnection(connection.getProtocolConnection() == 
null ? null : connection.getProtocolConnection().getProtocolName(), 
connection.getID(), connection.getRemoteAddress());
       }
       if (logger.isDebugEnabled()) {
-         logger.debug("Adding connection {}, we now have {}", 
connection.getID(), connections.size());
+         logger.debug("Adding connection {}, we now have {} on server {}", 
connection.getID(), connections.size(), server);
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 10783a770f..014616d8fa 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -531,4 +531,8 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229256, value = "{} must be a positive power of 2 (actual 
value: {})")
    IllegalArgumentException positivePowerOfTwo(String name, Number val);
+
+   @Message(id = 229257, value = "IDGenerator has been stopped")
+   RuntimeException idGeneratorStopped();
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 082adebc7a..1ca6ffe6aa 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1341,37 +1341,71 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
          }
       }
 
+      final RemotingService remotingService = this.remotingService;
+
+      if (remotingService != null) {
+         // The notification must be sent with the StorageManager still up
+         // this is because NotificationService will use 
storageManager.generateID
+         remotingService.notifyStop();
+      }
+
+      // We start the preparation for the broker shutdown by first stopping 
acceptors, and removing connections
+      // this is to avoid Exceptions while the broker is stopping as much as 
possible
+      // by first stopping any pending connections before stopping the storage.
+      // if we stopped the journal first before remoting we could have more 
exceptions being sent for the client.
+      try {
+         if (remotingService != null) {
+            // it will close all connections except the ones used by 
replication
+            remotingService.prepareStop(criticalIOError,  storageManager != 
null ? storageManager.getUsedConnections() : Collections.emptySet());
+         }
+      } catch (Throwable t) {
+         
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
 t);
+      }
+
+      stopComponent(pagingManager);
+
       if (!criticalIOError && pagingManager != null) {
          pagingManager.counterSnapshot();
       }
 
-      stopComponent(pagingManager);
+      final ManagementService managementService = this.managementService;
 
-      if (storageManager != null)
+      // we have to disable management notifications before stopping the storage
+      // otherwise management would eventually send notifications
+      if (managementService != null) {
+         try {
+            managementService.enableNotifications(false);
+         } catch (Throwable t) {
+            
ActiveMQServerLogger.LOGGER.errorStoppingComponent(managementService.getClass().getName(),
 t);
+         }
+      }
+
+      if (storageManager != null) {
          try {
             storageManager.stop(criticalIOError, failoverOnServerShutdown);
          } catch (Throwable t) {
             
ActiveMQServerLogger.LOGGER.errorStoppingComponent(storageManager.getClass().getName(),
 t);
          }
+      }
 
       // We stop remotingService before otherwise we may lock the system in 
case of a critical IO
       // error shutdown
-      final RemotingService remotingService = this.remotingService;
-      if (remotingService != null)
+      if (remotingService != null) {
          try {
             remotingService.stop(criticalIOError);
          } catch (Throwable t) {
             
ActiveMQServerLogger.LOGGER.errorStoppingComponent(remotingService.getClass().getName(),
 t);
          }
+      }
 
       // Stop the management service after the remoting service to ensure all 
acceptors are deregistered with JMX
-      final ManagementService managementService = this.managementService;
-      if (managementService != null)
+      if (managementService != null) {
          try {
             managementService.unregisterServer();
          } catch (Throwable t) {
             
ActiveMQServerLogger.LOGGER.errorStoppingComponent(managementService.getClass().getName(),
 t);
          }
+      }
 
       stopComponent(managementService);
       stopComponent(resourceManager);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 83bf5616c2..a245865fc7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -399,6 +399,19 @@ public class TransactionImpl implements Transaction {
       }
    }
 
+   class NonPersistentDelay extends DelayedRunnable {
+      NonPersistentDelay(long id) {
+         super(id);
+      }
+
+      // A non-persistent delay means a transaction with non-persistent data 
was delayed:
+      // no commit was generated, but we still need to wait for completions and 
delays before we move on
+      @Override
+      protected void actualRun() throws Exception {
+      }
+   }
+
+
    class DelayedCommit extends DelayedRunnable {
       DelayedCommit(long id) {
          super(id);
@@ -414,7 +427,6 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-
    class DelayedPrepare extends DelayedRunnable {
       long id;
       Xid xid;
@@ -445,6 +457,10 @@ public class TransactionImpl implements Transaction {
                storageManager.commit(id);
             }
          }
+      } else {
+         if (delayed > 0) {
+            delayedRunnable = new NonPersistentDelay(id);
+         }
       }
 
       state = State.COMMITTED;
@@ -478,6 +494,8 @@ public class TransactionImpl implements Transaction {
       logger.trace("TransactionImpl::rollback::{}", this);
 
       synchronized (timeoutLock) {
+         // if it was marked as prepare/commit while delayed, it needs to be 
cancelled
+         delayedRunnable = null;
          if (state == State.ROLLEDBACK) {
             // I don't think this could happen, but just in case
             logger.debug("TransactionImpl::rollback::{} is being ignored", 
this);
@@ -500,6 +518,10 @@ public class TransactionImpl implements Transaction {
    private void internalRollback() throws Exception {
       logger.trace("TransactionImpl::internalRollback {}", this);
 
+      // even though rollback already sets this as null
+      // I'm setting this again for other cases where internalRollback is 
called
+      delayedRunnable = null;
+
       beforeRollback();
 
       try {
@@ -591,6 +613,9 @@ public class TransactionImpl implements Transaction {
             logger.trace("TransactionImpl::{} marking rollbackOnly for {}, 
msg={}", this, exception.toString(), exception.getMessage());
          }
 
+         // cancelling any delayed commit or prepare
+         delayedRunnable = null;
+
          if (isEffective()) {
             if (logger.isDebugEnabled()) {
                logger.debug("Trying to mark transaction {} xid={} as 
rollbackOnly but it was already effective (prepared, committed or 
rolledback!)", id, xid);
@@ -629,6 +654,10 @@ public class TransactionImpl implements Transaction {
       }
    }
 
+   public int getPendingDelay() {
+      return delayed;
+   }
+
 
    @Override
    public synchronized void addOperation(final TransactionOperation operation) 
{
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
index 0446cbaac8..2f0b08e7de 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java
@@ -42,6 +42,8 @@ public interface Acceptor extends ActiveMQComponent {
     */
    void pause();
 
+   void notifyStop();
+
    /**
     * This will update the list of interceptors for each ProtocolManager 
inside the acceptor.
     */
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
index 4951a0f214..f8fda0ca7d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
@@ -16,8 +16,19 @@
  */
 package org.apache.activemq.artemis.core.paging.impl;
 
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.utils.SizeAwareMetric;
+
 public class PagingManagerImplAccessor {
    public static void setDiskFull(PagingManagerImpl pagingManager, boolean 
diskFull) {
       pagingManager.setDiskFull(diskFull);
    }
+
+   public static void resetMaxSize(PagingManager pagingManager, long maxSize, 
long maxElements) {
+      ((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements);
+   }
+
+   public static SizeAwareMetric globalSizeAwareMetric(PagingManager 
pagingManager) {
+      return ((PagingManagerImpl)pagingManager).getSizeAwareMetric();
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
deleted file mode 100644
index b0f213f316..0000000000
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerTestAccessor.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.paging.impl;
-
-import org.apache.activemq.artemis.core.paging.PagingManager;
-import org.apache.activemq.artemis.utils.SizeAwareMetric;
-
-/**
- * Use this class to access things that are meant on test only
- */
-public class PagingManagerTestAccessor {
-
-   public static void resetMaxSize(PagingManager pagingManager, long maxSize, 
long maxElements) {
-      ((PagingManagerImpl)pagingManager).resetMaxSize(maxSize, maxElements);
-   }
-
-   public static SizeAwareMetric globalSizeAwareMetric(PagingManager 
pagingManager) {
-      return ((PagingManagerImpl)pagingManager).getSizeAwareMetric();
-   }
-
-   public static String debugMessages(Page page) throws Exception {
-      return page.debugMessages();
-   }
-}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
similarity index 58%
copy from 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
copy to 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
index 4951a0f214..3265e3e242 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImplAccessor.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.artemis.core.paging.impl;
 
-public class PagingManagerImplAccessor {
-   public static void setDiskFull(PagingManagerImpl pagingManager, boolean 
diskFull) {
-      pagingManager.setDiskFull(diskFull);
+import org.apache.activemq.artemis.core.paging.PagingStore;
+
+public class PagingStoreImplAccessor {
+
+   public static void replacePageTimedWriter(PagingStore store, 
PageTimedWriter newWriter) {
+      final PagingStoreImpl storeImpl = (PagingStoreImpl) store;
+      storeImpl.replacePagedTimedWriter(newWriter);
    }
+
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
new file mode 100644
index 0000000000..613253a218
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/bridge/BridgeSimulationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.bridge;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.paging.impl.PageTimedWriter;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImplAccessor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * this test will simulate what a bridge sender would do with auto completes
+ */
+public class BridgeSimulationTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private ActiveMQServer server;
+
+   private final SimpleString address = SimpleString.of("address");
+
+   private final SimpleString queueName = SimpleString.of("queue");
+
+   @Override
+   @BeforeEach
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer(true);
+      server.start();
+   }
+
+   @Test
+   public void testSendAcknowledgements() throws Exception {
+      ServerLocator locator = createInVMNonHALocator();
+
+      locator.setConfirmationWindowSize(100 * 1024);
+
+      try (ClientSessionFactory csf = createSessionFactory(locator); 
ClientSession session = csf.createSession(null, null, false, true, true, false, 
1)) {
+
+         Queue queue = 
server.createQueue(QueueConfiguration.of(queueName).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+         ClientProducer prod = session.createProducer(address);
+
+         PagingStoreImpl pagingStore = (PagingStoreImpl) 
queue.getPagingStore();
+
+         CountDownLatch allowRunning = new CountDownLatch(1);
+         CountDownLatch enteredSync = new CountDownLatch(1);
+         CountDownLatch allowSync = new CountDownLatch(1);
+         AtomicInteger doneSync = new AtomicInteger(0);
+
+         runAfter(allowRunning::countDown);
+         runAfter(allowSync::countDown);
+
+         PageTimedWriter newWriter = new PageTimedWriter(100_000, 
server.getStorageManager(), pagingStore, server.getScheduledPool(), 
server.getExecutorFactory().getExecutor(), true, 100) {
+            @Override
+            public void run() {
+               logger.info("newWriter waiting to run");
+               try {
+                  allowRunning.await();
+               } catch (InterruptedException e) {
+                  logger.warn(e.getMessage(), e);
+                  Thread.currentThread().interrupt();
+
+               }
+               logger.info("newWriter running");
+               super.run();
+               logger.info("newWriter ran");
+            }
+
+            @Override
+            protected void performSync() throws Exception {
+               logger.info("newWriter waiting to perform sync");
+               enteredSync.countDown();
+               super.performSync();
+               try {
+                  allowSync.await();
+               } catch (InterruptedException e) {
+                  logger.warn(e.getMessage(), e);
+               }
+               doneSync.incrementAndGet();
+               logger.info("newWriter done with sync");
+            }
+         };
+
+         runAfter(newWriter::stop);
+
+         PageTimedWriter olderWriter = pagingStore.getPageTimedWriter();
+         olderWriter.stop();
+
+         newWriter.start();
+
+         PagingStoreImplAccessor.replacePageTimedWriter(pagingStore, 
newWriter);
+
+         CountDownLatch ackDone = new CountDownLatch(1);
+
+         pagingStore.startPaging();
+
+         SendAcknowledgementHandler handler = new SendAcknowledgementHandler() 
{
+            @Override
+            public void sendAcknowledged(Message message) {
+               logger.info("ACK Confirmation from {}", message);
+               ackDone.countDown();
+            }
+         };
+
+         session.setSendAcknowledgementHandler(handler);
+
+         ClientMessage message = session.createMessage(true);
+         message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, 
ByteBuffer.allocate(8).putLong(queue.getID()).array());
+         message.putExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, 
RandomUtil.randomBytes(10));
+         prod.send(message);
+
+         logger.info("Sent..");
+
+         assertEquals(0, queue.getMessageCount());
+         assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+         allowRunning.countDown();
+         assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+         assertEquals(0, queue.getMessageCount());
+         assertTrue(enteredSync.await(10, TimeUnit.SECONDS));
+         assertFalse(ackDone.await(20, TimeUnit.MILLISECONDS));
+         assertEquals(0, queue.getMessageCount());
+         verifyReceive(locator, false);
+         allowSync.countDown();
+         assertTrue(ackDone.await(1000, TimeUnit.MILLISECONDS));
+         Wait.assertEquals(1L, queue::getMessageCount, 5000, 100);
+         verifyReceive(locator, true);
+      }
+
+   }
+
+   private void verifyReceive(ServerLocator locator, boolean receive) throws 
Exception {
+      try (ClientSessionFactory factory = locator.createSessionFactory(); 
ClientSession session = factory.createSession(false, true)) {
+         ClientConsumer consumer = session.createConsumer(queueName);
+         session.start();
+         if (receive) {
+            if (consumer.receive(5_000) == null) {
+               logger.warn("No message received");
+               fail("no message received");
+            }
+         } else {
+            if (consumer.receiveImmediate() != null) {
+               logger.warn("message was received");
+               fail("message was received");
+            }
+         }
+      }
+
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 149fd6e27c..012b169bee 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -78,7 +78,7 @@ import 
org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
 import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
@@ -3151,7 +3151,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       final int MESSAGE_SIZE = 1024 * 3; // 3k
 
-      PagingManagerTestAccessor.resetMaxSize(server.getPagingManager(), 10240, 
0);
+      PagingManagerImplAccessor.resetMaxSize(server.getPagingManager(), 10240, 
0);
       clearDataRecreateServerDirs();
 
       SimpleString address = RandomUtil.randomUUIDSimpleString();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
index e2d0c9a404..6a0dcdd264 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/MaxMessagesPagingTest.java
@@ -46,7 +46,7 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
+import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
@@ -150,7 +150,7 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase 
{
 
       Wait.assertTrue(queue.getPagingStore()::isPaging);
 
-      SizeAwareMetric globalSizeMetric = 
PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
+      SizeAwareMetric globalSizeMetric = 
PagingManagerImplAccessor.globalSizeAwareMetric(server.getPagingManager());
 
       // this is validating the test is actually validating paging after over 
elements
       assertTrue(globalSizeMetric.isOverElements());
@@ -213,7 +213,7 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase 
{
          }
       }
 
-      SizeAwareMetric globalSizeMetric = 
PagingManagerTestAccessor.globalSizeAwareMetric(server.getPagingManager());
+      SizeAwareMetric globalSizeMetric = 
PagingManagerImplAccessor.globalSizeAwareMetric(server.getPagingManager());
 
       // this is validating the test is actually validating paging after over 
elements
       assertTrue(globalSizeMetric.isOverElements());
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
similarity index 60%
copy from 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
copy to 
tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
index 4951a0f214..9d9d66f88c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImplAccessor.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplAccessor.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,10 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.artemis.core.paging.impl;
 
-public class PagingManagerImplAccessor {
-   public static void setDiskFull(PagingManagerImpl pagingManager, boolean 
diskFull) {
-      pagingManager.setDiskFull(diskFull);
+package org.apache.activemq.artemis.core.transaction.impl;
+
+/**
+ * Test helper that writes the {@code containsPersistent} field of a {@link TransactionImpl}
+ * directly. It is declared in the same package as {@code TransactionImpl}; presumably the
+ * field is not public and this is how tests in other packages reach it.
+ */
+public class TransactionImplAccessor {
+
+   /**
+    * Sets {@code tx.containsPersistent} to the given value.
+    *
+    * @param tx         the transaction to modify
+    * @param persistent the value to store in the field
+    */
+   public static void setContainsPersistent(TransactionImpl tx, boolean 
persistent) {
+      tx.containsPersistent = persistent;
    }
 }
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index 9067ef2d57..be8ce5531a 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -19,15 +19,20 @@ package 
org.apache.activemq.artemis.tests.unit.core.paging.impl;
 
 import javax.transaction.xa.Xid;
 import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@@ -49,14 +54,17 @@ import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
 import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import 
org.apache.activemq.artemis.core.transaction.impl.TransactionImplAccessor;
 import 
org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.apache.activemq.artemis.tests.util.ArtemisTestCase;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -119,14 +127,16 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
    AtomicBoolean returnSynchronizing = new AtomicBoolean(false);
    ReplicationManager mockReplicationManager;
 
+   Consumer<PagedMessage> directWriteInterceptor = null;
+   Runnable ioSyncInterceptor = null;
 
    private class MockableJournalStorageManager extends JournalStorageManager {
 
       MockableJournalStorageManager(Configuration config,
-                                           Journal bindingsJournal,
-                                           Journal messagesJournal,
-                                           ExecutorFactory executorFactory,
-                                           ExecutorFactory ioExecutors) {
+                                    Journal bindingsJournal,
+                                    Journal messagesJournal,
+                                    ExecutorFactory executorFactory,
+                                    ExecutorFactory ioExecutors) {
          super(config, Mockito.mock(CriticalAnalyzer.class), executorFactory, 
ioExecutors);
          this.bindingsJournal = bindingsJournal;
          this.messageJournal = messagesJournal;
@@ -144,7 +154,11 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       }
 
       @Override
-      public void pageWrite(final SimpleString address, final PagedMessage 
message, final long pageNumber, boolean storageUp, boolean 
originallyReplicated) {
+      public void pageWrite(final SimpleString address,
+                            final PagedMessage message,
+                            final long pageNumber,
+                            boolean storageUp,
+                            boolean originallyReplicated) {
          super.pageWrite(address, message, pageNumber, storageUp, 
originallyReplicated);
          pageWrites.incrementAndGet();
       }
@@ -173,7 +187,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       return numberOfPreparesMessageJournal.incrementAndGet();
    }
 
-
    @BeforeEach
    public void setupMocks() throws Exception {
       configuration = new ConfigurationImpl();
@@ -227,12 +240,11 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
       AddressSettings settings = new 
AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100, 
mockPagingManager, realJournalStorageManager, inMemoryFileFactory,
-                                      mockPageStoreFactory, ADDRESS, settings, 
executorFactory.getExecutor(), false) {
+      pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100, 
mockPagingManager, realJournalStorageManager, inMemoryFileFactory, 
mockPageStoreFactory, ADDRESS, settings, executorFactory.getExecutor(), false) {
          @Override
          protected PageTimedWriter 
createPageTimedWriter(ScheduledExecutorService scheduledExecutor, long 
syncTimeout) {
 
-            PageTimedWriter timer = new PageTimedWriter(100_000, 
realJournalStorageManager, this, scheduledExecutorService, 
executorFactory.getExecutor(), true, 100) {
+            PageTimedWriter timer = new PageTimedWriter(100_000, 
realJournalStorageManager, this, scheduledExecutorService, getExecutor(), true, 
100) {
                @Override
                public void run() {
                   try {
@@ -260,7 +272,28 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
             timer.start();
             return timer;
+         }
+
+         /**
+          * Test hook around the real page write. Runs {@code directWriteInterceptor} first
+          * (when a test has set one), warns if the call is not made from inside the page
+          * store executor, and then delegates to the real implementation.
+          *
+          * @throws Exception whatever the interceptor or the real write throws
+          */
+         @Override
+         protected void directWritePage(PagedMessage pagedMessage,
+                                        boolean lineUp,
+                                        boolean originalReplicated) throws Exception {
+            if (directWriteInterceptor != null) {
+               directWriteInterceptor.accept(pagedMessage);
+            }
+
+            if (!pageStore.getExecutor().inHandler()) {
+               // the exception is only created for its stack trace, so the log shows the offending caller
+               logger.warn("directWritePage called outside of the page store executor", new Exception("trace"));
+            }
+            super.directWritePage(pagedMessage, lineUp, originalReplicated);
+         }
 
+         // Test hook: runs ioSyncInterceptor first (when a test has set one), then performs the real ioSync.
+         @Override
+         public void ioSync() throws Exception {
+            if (ioSyncInterceptor != null) {
+               ioSyncInterceptor.run();
+            }
+            super.ioSync();
          }
       };
 
@@ -273,6 +306,9 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       Queue mockQueue = Mockito.mock(Queue.class);
       Mockito.when(mockQueue.getID()).thenReturn(1L);
       routeContextList.addAckedQueue(mockQueue);
+
+      OperationContextImpl.clearContext();
+      runAfter(OperationContextImpl::clearContext);
    }
 
    // a test to validate if the Mocks are correctly setup
@@ -291,7 +327,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       tx.commit();
       assertEquals(1, count.get(), "tx.commit is not correctly wired on 
mocking");
 
-
       realJournalStorageManager.afterCompleteOperations(new IOCallback() {
          @Override
          public void done() {
@@ -325,7 +360,7 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
    }
 
    PagedMessage createPagedMessage() {
-      return new PagedMessageImpl(createMessage(), new long[] {1}, -1);
+      return new PagedMessageImpl(createMessage(), new long[]{1}, -1);
    }
 
    @Test
@@ -438,39 +473,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       assertEquals(numberOfMessages, pageWrites.get());
    }
 
-   @Test
-   public void testVerifyWritesAfterStop() throws Exception {
-      int numberOfMessages = 100;
-      CountDownLatch latch = new CountDownLatch(numberOfMessages);
-
-      allowRunning.countDown();
-      useReplication.set(false);
-      allowSync.setCount(1);
-
-      for (int i = 0; i < numberOfMessages; i++) {
-         TransactionImpl newTX = new 
TransactionImpl(realJournalStorageManager);
-         newTX.setContainsPersistent();
-         newTX.addOperation(new TransactionOperationAbstract() {
-            @Override
-            public void afterCommit(Transaction tx) {
-               super.afterCommit(tx);
-               latch.countDown();
-            }
-         });
-         pageStore.page(createMessage(), newTX, routeContextList);
-         newTX.commit();
-      }
-
-      assertFalse(latch.await(10, TimeUnit.MILLISECONDS));
-      allowSync.countDown();
-      // issuing a stop should finish whatever was pending before
-      // instead of sending it to the limbo
-      pageStore.stop();
-      assertTrue(latch.await(10, TimeUnit.SECONDS));
-
-      assertEquals(numberOfMessages, pageWrites.get());
-   }
-
    @Test
    public void testVerifyTimedWriterIsStopped() throws Exception {
       allowRunning.countDown();
@@ -507,7 +509,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       Wait.assertEquals(1, numberOfCommitsMessageJournal::get, 5000, 100);
    }
 
-
    @Test
    public void testTXCompletionPrepare() throws Exception {
       CountDownLatch latch = new CountDownLatch(1);
@@ -538,7 +539,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
       Wait.assertEquals(1, numberOfPreparesMessageJournal::get, 5000, 100);
    }
 
-
    // add a task while replicating, process it when no longer replicating 
(disconnect a node scenario)
    @Test
    public void testDisableReplica() throws Exception {
@@ -594,14 +594,6 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
    @Test
    public void testTXCompletion() throws Exception {
-      ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(10);
-      ExecutorService executorService = Executors.newFixedThreadPool(10);
-      runAfter(scheduledExecutorService::shutdownNow);
-      runAfter(executorService::shutdownNow);
-      runAfter(() -> OperationContextImpl.clearContext());
-
-      OrderedExecutorFactory executorFactory = new 
OrderedExecutorFactory(executorService);
-
       OperationContextImpl.clearContext();
 
       OperationContext context = 
OperationContextImpl.getContext(executorFactory);
@@ -628,4 +620,294 @@ public class PageTimedWriterUnitTest extends 
ArtemisTestCase {
 
    }
 
-}
+   /** Runs the delayed-commit scenario with the transaction's containsPersistent flag set to false. */
+   @Test
+   public void testDelayNonPersistent() throws Exception {
+      internalDelay(false);
+   }
+
+   /** Runs the delayed-commit scenario with the transaction's containsPersistent flag set to true. */
+   @Test
+   public void testDelayPersistent() throws Exception {
+      internalDelay(true);
+   }
+
+   /**
+    * Commits a transaction that has one pending delay and verifies that {@code afterCommit}
+    * is not invoked by {@code commit()} itself, but only after {@code delayDone()} is called,
+    * and that {@code afterRollback} is never invoked.
+    *
+    * @param persistent value written to the transaction's containsPersistent flag
+    * @throws Exception if the transaction fails or the expected callback does not arrive in time
+    */
+   private void internalDelay(boolean persistent) throws Exception {
+      TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+
+      final AtomicInteger afterRollback = new AtomicInteger(0);
+      final AtomicInteger afterCommit = new AtomicInteger(0);
+
+      // only afterCommit and afterRollback are counted; the other callbacks are intentionally empty
+      tx.addOperation(new TransactionOperation() {
+         @Override
+         public void beforePrepare(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterPrepare(Transaction tx) {
+
+         }
+
+         @Override
+         public void beforeCommit(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterCommit(Transaction tx) {
+            afterCommit.incrementAndGet();
+         }
+
+         @Override
+         public void beforeRollback(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterRollback(Transaction tx) {
+            afterRollback.incrementAndGet();
+         }
+
+         @Override
+         public List<MessageReference> getRelatedMessageReferences() {
+            return null;
+         }
+
+         @Override
+         public List<MessageReference> getListOnConsumer(long consumerID) {
+            return null;
+         }
+      });
+      TransactionImplAccessor.setContainsPersistent(tx, persistent);
+      tx.delay();
+      tx.commit();
+      // the commit was requested while a delay is pending: no completion callback may have fired yet
+      assertEquals(0, afterCommit.get());
+      assertEquals(0, afterRollback.get());
+      tx.delayDone();
+      Wait.assertEquals(1, afterCommit::get, 5000, 100);
+      // a successful delayed commit must never be reported as a rollback
+      assertEquals(0, afterRollback.get());
+   }
+
+   /**
+    * Runs the cancel-delay scenario implemented by {@code testRollback}.
+    * <p>
+    * NOTE(review): this passes the same argument as {@code testMarkRollbackCancelDelay}, so both
+    * tests take the {@code markAsRollbackOnly} branch of {@code testRollback} and its
+    * {@code tx.rollback()} branch is not run by either of them. Confirm whether {@code false}
+    * was intended here.
+    */
+   @Test
+   public void testRollbackCancelDelay() throws Exception {
+      testRollback(true);
+   }
+
+   /**
+    * Runs the cancel-delay scenario with the argument that makes {@code testRollback} call
+    * {@code markAsRollbackOnly} on the transaction before the delay is completed.
+    */
+   @Test
+   public void testMarkRollbackCancelDelay() throws Exception {
+      testRollback(true);
+   }
+
+   /**
+    * Commits a transaction that has one pending delay, invalidates the transaction before the
+    * delay completes, then completes the delay and verifies that neither {@code afterCommit}
+    * nor {@code afterRollback} was invoked.
+    *
+    * @param markRollbackOnly {@code true} to invalidate the transaction with
+    *                         {@code markAsRollbackOnly}; {@code false} to call {@code rollback()}
+    *                         instead (before and again after {@code delayDone()})
+    * @throws Exception if any transaction call fails
+    */
+   private void testRollback(boolean markRollbackOnly) throws Exception {
+      TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+
+      final AtomicInteger afterRollback = new AtomicInteger(0);
+      final AtomicInteger afterCommit = new AtomicInteger(0);
+
+      // only afterCommit and afterRollback are counted; the other callbacks are intentionally empty
+      tx.addOperation(new TransactionOperation() {
+         @Override
+         public void beforePrepare(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterPrepare(Transaction tx) {
+
+         }
+
+         @Override
+         public void beforeCommit(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterCommit(Transaction tx) {
+            afterCommit.incrementAndGet();
+         }
+
+         @Override
+         public void beforeRollback(Transaction tx) throws Exception {
+
+         }
+
+         @Override
+         public void afterRollback(Transaction tx) {
+            afterRollback.incrementAndGet();
+         }
+
+         @Override
+         public List<MessageReference> getRelatedMessageReferences() {
+            return null;
+         }
+
+         @Override
+         public List<MessageReference> getListOnConsumer(long consumerID) {
+            return null;
+         }
+      });
+      TransactionImplAccessor.setContainsPersistent(tx, true);
+      tx.delay();
+      tx.commit();
+      // the commit was requested while a delay is pending: no completion callback may have fired yet
+      assertEquals(0, afterCommit.get());
+      assertEquals(0, afterRollback.get());
+      if (markRollbackOnly) {
+         tx.markAsRollbackOnly(new ActiveMQException("duh!"));
+      } else {
+         tx.rollback();
+      }
+      tx.delayDone();
+      // NOTE(review): the counter is already 0, so this wait presumably returns on its first check
+      // and may not catch a callback fired later; confirm against Wait.assertEquals
+      Wait.assertEquals(0, afterCommit::get, 5000, 100);
+
+      if (!markRollbackOnly) {
+         tx.rollback();
+      }
+
+      Wait.assertEquals(0, afterCommit::get, 5000, 100);
+      Wait.assertEquals(0, afterRollback::get, 5000, 100);
+   }
+
+   /**
+    * Concurrency test for the timed writer. The test thread keeps adding paged messages to the
+    * timer inside a single transaction for about {@code totalTime} milliseconds, while a second
+    * thread repeatedly forces a new page under the storage write lock and stops/restarts the
+    * timer. While the timer is stopped, the direct-write and ioSync hooks throw.
+    * <p>
+    * At the end every message accepted by {@code addTask} must have reached
+    * {@code directWritePage}, no write may have happened while the storage write lock was held,
+    * and the transaction must commit.
+    */
+   @Test
+   public void testSimulateFlow() throws Exception {
+      AtomicBoolean notSupposedToWrite = new AtomicBoolean(false);
+
+      AtomicInteger errors = new AtomicInteger(0);
+
+      int sleepTime = 1;
+      int totalTime = 500;
+      AtomicBoolean pageStoreThrowsExceptions = new AtomicBoolean(false);
+
+      // testIds seen by directWritePage, and testIds accepted by addTask
+      LinkedHashSet<String> interceptedWrite = new LinkedHashSet<>();
+      LinkedHashSet<String> sentWrite = new LinkedHashSet<>();
+
+      directWriteInterceptor = m -> {
+         if (pageStoreThrowsExceptions.get()) {
+            throw new NullPointerException("simulating a NPE on directWrite");
+         }
+         String messageID = m.getMessage().getStringProperty("testId");
+         if (messageID == null) {
+            logger.warn("no messageID defined on message");
+            errors.incrementAndGet();
+         }
+         if (notSupposedToWrite.get()) {
+            logger.warn("Not supposed to write message {}", messageID);
+            errors.incrementAndGet();
+         }
+         interceptedWrite.add(messageID);
+      };
+      ioSyncInterceptor = () -> {
+         if (pageStoreThrowsExceptions.get()) {
+            throw new NullPointerException("simulating a NPE on ioSync");
+         }
+      };
+
+      allowRunning.countDown();
+      // dedicated single-thread executor for the stop/start loop below, created here so the loop
+      // does not run on the executors handed to the page store
+      ExecutorService testExecutor = Executors.newFixedThreadPool(1);
+
+      AtomicBoolean running = new AtomicBoolean(true);
+      runAfter(() -> running.set(false));
+      runAfter(testExecutor::shutdownNow);
+
+      CountDownLatch runLatch = new CountDownLatch(1);
+      // makes the sender and the stop/start loop begin at the same time
+      CyclicBarrier flagStart = new CyclicBarrier(2);
+
+      // stop.. start.. stop ... start..
+      testExecutor.execute(() -> {
+         try {
+            flagStart.await(10, TimeUnit.SECONDS);
+            while (running.get()) {
+               Thread.sleep(sleepTime);
+               logger.debug("Forcing page");
+
+               // this is simulating what would happen on replication, we lock the storage
+               // and force another page
+               // also no writing should happen while we hold the lock
+               realJournalStorageManager.writeLock();
+               try {
+                  notSupposedToWrite.set(true);
+                  Thread.sleep(sleepTime);
+                  pageStore.forceAnotherPage(false);
+                  notSupposedToWrite.set(false);
+               } finally {
+                  realJournalStorageManager.writeUnlock();
+               }
+               Thread.sleep(sleepTime);
+               logger.debug("stopping");
+               timer.stop();
+               // while the timer is stopped, any direct write or ioSync fails with a NPE
+               pageStoreThrowsExceptions.set(true);
+               Thread.sleep(sleepTime);
+               pageStoreThrowsExceptions.set(false);
+               logger.debug("starting");
+               timer.start();
+            }
+         } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+            errors.incrementAndGet();
+         } finally {
+            runLatch.countDown();
+         }
+      });
+
+      flagStart.await(10, TimeUnit.SECONDS);
+
+      long deadline = System.currentTimeMillis() + totalTime;
+
+      RouteContextList routeContextListMocked = Mockito.mock(RouteContextList.class);
+
+      CountDownLatch committed = new CountDownLatch(1);
+
+      TransactionImpl tx = new TransactionImpl(realJournalStorageManager);
+      tx.afterStore(new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            committed.countDown();
+         }
+      });
+      TransactionImplAccessor.setContainsPersistent(tx, true);
+
+      int sentNumber = 0;
+      while (deadline > System.currentTimeMillis()) {
+         try {
+            PagedMessage message = createPagedMessage();
+            message.getMessage().putStringProperty("testId", String.valueOf(sentNumber));
+            timer.addTask(context, message, tx, routeContextListMocked);
+            // only reached when addTask accepted the message
+            sentWrite.add(String.valueOf(sentNumber));
+            sentNumber++;
+            if (sentNumber % 1000 == 0) {
+               logger.info("Sent {}", sentNumber);
+            }
+         } catch (IllegalStateException notStarted) {
+            // addTask rejected the message (presumably because the timer is stopped right now);
+            // sentNumber was not incremented, so the same id is tried again
+            logger.debug("Retrying {}", sentNumber);
+         }
+      }
+
+      running.set(false);
+      assertTrue(runLatch.await(10, TimeUnit.SECONDS));
+      assertTrue(timer.isStarted());
+      timer.delay(); // calling one more delay as the last one done could still be missing
+      assertEquals(0, errors.get());
+
+      // not supposed to throw any exception
+      Wait.assertEquals(0, tx::getPendingDelay, 5000, 100);
+      tx.commit();
+      assertTrue(committed.await(10, TimeUnit.SECONDS));
+
+      int interceptorOriginalSize = interceptedWrite.size();
+      int sentOriginalSize = sentWrite.size();
+
+      // whatever is left in sentWrite was accepted by addTask but never written
+      sentWrite.removeAll(interceptedWrite);
+      sentWrite.forEach(m -> {
+         logger.warn("message {} missed", m);
+      });
+
+      assertEquals(sentOriginalSize, interceptorOriginalSize);
+
+      assertEquals(0, sentWrite.size());
+
+      assertEquals(sentNumber, interceptorOriginalSize);
+
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
index bb232ded08..c889c46075 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
@@ -18,6 +18,7 @@ package 
org.apache.activemq.artemis.tests.unit.core.persistence.impl;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
@@ -67,9 +68,9 @@ public class BatchIDGeneratorUnitTest extends 
ActiveMQTestBase {
 
       assertTrue(id4 > id3 && id4 < 2000);
 
-      batch.persistCurrentID();
-
+      batch.stop();
       journal.stop();
+      validateStoppedGenerator(batch);
       batch = new BatchingIDGenerator(0, 1000, 
getJournalStorageManager(journal));
       loadIDs(journal, batch);
 
@@ -100,8 +101,9 @@ public class BatchIDGeneratorUnitTest extends 
ActiveMQTestBase {
          lastId = id;
       }
 
-      batch.persistCurrentID();
+      batch.stop();
       journal.stop();
+      validateStoppedGenerator(batch);
       batch = new BatchingIDGenerator(0, 1000, 
getJournalStorageManager(journal));
       loadIDs(journal, batch);
 
@@ -117,6 +119,10 @@ public class BatchIDGeneratorUnitTest extends 
ActiveMQTestBase {
 
    }
 
+   /**
+    * Asserts that calling {@code generateID()} on the given generator throws a
+    * {@link RuntimeException} (exactly that class, as checked by {@code assertThrowsExactly}).
+    * The tests in this class call it on a generator after {@code stop()}.
+    */
+   private void validateStoppedGenerator(BatchingIDGenerator stoppedGenerator) 
{
+      assertThrowsExactly(RuntimeException.class, 
stoppedGenerator::generateID);
+   }
+
    protected void loadIDs(final Journal journal, final BatchingIDGenerator 
batch) throws Exception {
       List<RecordInfo> records = new ArrayList<>();
       List<PreparedTransactionInfo> tx = new ArrayList<>();
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
index b2305ec1f8..07a7a50c00 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java
@@ -59,6 +59,11 @@ public class FakeAcceptorFactory implements AcceptorFactory {
 
       }
 
+      @Override
+      public void notifyStop() {
+         // intentionally empty: this fake acceptor does nothing on the stop notification
+
+      }
+
       @Override
       public void updateInterceptors(List<BaseInterceptor> 
incomingInterceptors,
                                      List<BaseInterceptor> 
outgoingInterceptors) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to