This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new af796d5  ARTEMIS-2701 Improving DLQ/check over previously removed records
     new b2342b6  This closes #3067
af796d5 is described below

commit af796d5ce4129b9c0359d9868d434dadd3f30beb
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Apr 2 17:38:12 2020 -0400

    ARTEMIS-2701 Improving DLQ/check over previously removed records
---
 .../jdbc/store/journal/JDBCJournalImpl.java        |  37 +++++
 .../activemq/artemis/core/journal/Journal.java     |  35 ++++-
 .../core/journal/impl/FileWrapperJournal.java      |  19 +++
 .../artemis/core/journal/impl/JournalBase.java     |  43 +++++-
 .../artemis/core/journal/impl/JournalImpl.java     |  85 ++++++++++-
 .../core/protocol/openwire/OpenWireConnection.java |   6 +
 .../artemis/core/persistence/StorageManager.java   |   6 +-
 .../journal/AbstractJournalStorageManager.java     |  14 +-
 .../impl/nullpm/NullStorageManager.java            |   9 +-
 .../core/replication/ReplicatedJournal.java        |  61 ++++++++
 .../artemis/core/server/impl/QueueImpl.java        |   9 +-
 .../core/transaction/impl/TransactionImplTest.java |  12 +-
 .../tests/integration/client/ForceDeleteQueue.java | 155 +++++++++++++++++++++
 .../tests/integration/client/SendAckFailTest.java  |  12 +-
 .../persistence/DeleteMessagesOnStartupTest.java   |   4 +-
 .../integration/replication/ReplicationTest.java   |  34 +++++
 .../integration/server/FakeStorageManager.java     |   3 +-
 .../servers/replicated-static0/broker.xml          |   5 +
 .../servers/replicated-static1/broker.xml          |   5 +
 .../smoke/replicationflow/SoakPagingTest.java      |  27 ++--
 .../core/journal/impl/JournalImplTestBase.java     |  28 ++++
 .../core/journal/impl/JournalImplTestUnit.java     |  32 +++++
 22 files changed, 581 insertions(+), 60 deletions(-)

diff --git 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 181a61c..f87d7a7 100644
--- 
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ 
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -502,6 +502,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] 
record, boolean sync) throws Exception {
+      appendUpdateRecord(id, recordType, record, sync);
+      return true;
+   }
+
+   @Override
    public void appendUpdateRecord(long id, byte recordType, Persister 
persister, Object record, boolean sync) throws Exception {
       JDBCJournalRecord r = new JDBCJournalRecord(id, 
JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
@@ -517,6 +523,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(long id, byte recordType, Persister 
persister, Object record, boolean sync) throws Exception {
+      appendUpdateRecord(id, recordType, persister, record, sync);
+      return true;
+   }
+
+   @Override
    public void appendUpdateRecord(long id,
                                   byte recordType,
                                   Persister persister,
@@ -539,6 +551,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
       appendRecord(r);
    }
 
+
+   @Override
+   public boolean tryAppendUpdateRecord(long id,
+                                  byte recordType,
+                                  Persister persister,
+                                  Object record,
+                                  boolean sync,
+                                  IOCompletion completionCallback) throws 
Exception {
+      appendUpdateRecord(id, recordType, persister, record, sync, 
completionCallback);
+      return true;
+   }
+
+
    @Override
    public void appendDeleteRecord(long id, boolean sync) throws Exception {
       checkStatus();
@@ -554,6 +579,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    @Override
+   public boolean tryAppendDeleteRecord(long id, boolean sync) throws 
Exception {
+      appendDeleteRecord(id, sync);
+      return true;
+   }
+
+   @Override
    public void appendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception {
       checkStatus(completionCallback);
 
@@ -570,6 +601,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver 
implements Journal {
    }
 
    @Override
+   public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception {
+      appendDeleteRecord(id, sync, completionCallback);
+      return true;
+   }
+
+   @Override
    public void appendAddRecordTransactional(long txID, long id, byte 
recordType, byte[] record) throws Exception {
       checkStatus();
 
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 15ba4d3..473ea1c 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -85,20 +85,36 @@ public interface Journal extends ActiveMQComponent {
 
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean 
sync) throws Exception;
 
+   boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, 
boolean sync) throws Exception;
+
    default void appendUpdateRecord(long id, byte recordType, EncodingSupport 
record, boolean sync) throws Exception {
       appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), 
record, sync);
    }
 
+   default boolean tryAppendUpdateRecord(long id, byte recordType, 
EncodingSupport record, boolean sync) throws Exception {
+      return tryAppendUpdateRecord(id, recordType, 
EncoderPersister.getInstance(), record, sync);
+   }
+
    void appendUpdateRecord(long id, byte recordType, Persister persister, 
Object record, boolean sync) throws Exception;
 
+   boolean tryAppendUpdateRecord(long id, byte recordType, Persister 
persister, Object record, boolean sync) throws Exception;
+
    default void appendUpdateRecord(long id,
-                           byte recordType,
-                           EncodingSupport record,
-                           boolean sync,
-                           IOCompletion completionCallback) throws Exception {
+                                   byte recordType,
+                                   EncodingSupport record,
+                                   boolean sync,
+                                   IOCompletion completionCallback) throws 
Exception {
       appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), 
record, sync, completionCallback);
    }
 
+   default boolean tryAppendUpdateRecord(long id,
+                                   byte recordType,
+                                   EncodingSupport record,
+                                   boolean sync,
+                                   IOCompletion completionCallback) throws 
Exception {
+      return tryAppendUpdateRecord(id, recordType, 
EncoderPersister.getInstance(), record, sync, completionCallback);
+   }
+
    void appendUpdateRecord(long id,
                            byte recordType,
                            Persister persister,
@@ -106,10 +122,21 @@ public interface Journal extends ActiveMQComponent {
                            boolean sync,
                            IOCompletion callback) throws Exception;
 
+   boolean tryAppendUpdateRecord(long id,
+                           byte recordType,
+                           Persister persister,
+                           Object record,
+                           boolean sync,
+                           IOCompletion callback) throws Exception;
+
    void appendDeleteRecord(long id, boolean sync) throws Exception;
 
+   boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception;
+
    void appendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception;
 
+   boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception;
+
    // Transactional operations
 
    void appendAddRecordTransactional(long txID, long id, byte recordType, 
byte[] record) throws Exception;
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 85f55dc..c76ed68 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -172,6 +172,13 @@ public final class FileWrapperJournal extends JournalBase {
       writeRecord(deleteRecord, false, -1, false, callback);
    }
 
+
+   @Override
+   public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion 
callback) throws Exception {
+      appendDeleteRecord(id, sync, callback);
+      return true;
+   }
+
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, 
EncodingSupport record) throws Exception {
       JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, 
id, record);
@@ -200,6 +207,18 @@ public final class FileWrapperJournal extends JournalBase {
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(long id,
+                                     byte recordType,
+                                     Persister persister,
+                                     Object record,
+                                     boolean sync,
+                                     IOCompletion callback) throws Exception {
+      JournalInternalRecord updateRecord = new JournalAddRecord(false, id, 
recordType, persister, record);
+      writeRecord(updateRecord, false, -1, false, callback);
+      return true;
+   }
+
+   @Override
    public void appendUpdateRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index 2c03f92..8c7a89b 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -71,13 +71,21 @@ abstract class JournalBase implements Journal {
 
    @Override
    public void appendUpdateRecord(final long id,
-                                  final byte recordType,
-                                  final byte[] record,
-                                  final boolean sync) throws Exception {
+                                     final byte recordType,
+                                     final byte[] record,
+                                     final boolean sync) throws Exception {
       appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                     final byte recordType,
+                                     final byte[] record,
+                                     final boolean sync) throws Exception {
+      return tryAppendUpdateRecord(id, recordType, new 
ByteArrayEncoding(record), sync);
+   }
+
+   @Override
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
@@ -137,6 +145,23 @@ abstract class JournalBase implements Journal {
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                     final byte recordType,
+                                     final Persister persister,
+                                     final Object record,
+                                     final boolean sync) throws Exception {
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      boolean append = tryAppendUpdateRecord(id, recordType, persister, 
record, sync, callback);
+
+      if (callback != null) {
+         callback.waitCompletion();
+      }
+
+      return append;
+   }
+
+   @Override
    public void appendRollbackRecord(final long txID, final boolean sync) 
throws Exception {
       SyncIOCompletion syncCompletion = getSyncCallback(sync);
 
@@ -159,6 +184,18 @@ abstract class JournalBase implements Journal {
       }
    }
 
+   @Override
+   public boolean tryAppendDeleteRecord(final long id, final boolean sync) 
throws Exception {
+      SyncIOCompletion callback = getSyncCallback(sync);
+
+      boolean result = tryAppendDeleteRecord(id, sync, callback);
+
+      if (callback != null) {
+         callback.waitCompletion();
+      }
+
+      return result;
+   }
    abstract void scheduleReclaim();
 
    protected SyncIOCompletion getSyncCallback(final boolean sync) {
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 1161756..d5d4a81 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -884,7 +884,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                   final IOCompletion callback) throws 
Exception {
       checkJournalIsLoaded();
       lineUpContext(callback);
-      checkKnownRecordID(id);
+      checkKnownRecordID(id, true);
 
       if (logger.isTraceEnabled()) {
          logger.trace("scheduling appendUpdateRecord::id=" + id +
@@ -892,8 +892,47 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                          recordType);
       }
 
-      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
+      internalAppendUpdateRecord(id, recordType, persister, record, sync, 
callback);
+   }
+
+
+   @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                  final byte recordType,
+                                  final Persister persister,
+                                  final Object record,
+                                  final boolean sync,
+                                  final IOCompletion callback) throws 
Exception {
+      checkJournalIsLoaded();
+      lineUpContext(callback);
+
+      if (!checkKnownRecordID(id, false)) {
+         if (callback != null) {
+            callback.done();
+         }
+         return false;
+      }
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendUpdateRecord::id=" + id +
+                         ", userRecordType=" +
+                         recordType);
+      }
+
+
+      internalAppendUpdateRecord(id, recordType, persister, record, sync, 
callback);
+
+      return true;
+   }
+
 
+   private void internalAppendUpdateRecord(long id,
+                                           byte recordType,
+                                           Persister persister,
+                                           Object record,
+                                           boolean sync,
+                                           IOCompletion callback) throws 
InterruptedException, java.util.concurrent.ExecutionException {
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
@@ -946,8 +985,37 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
 
       checkJournalIsLoaded();
       lineUpContext(callback);
-      checkKnownRecordID(id);
+      checkKnownRecordID(id, true);
+
+      internalAppendDeleteRecord(id, sync, callback);
+      return;
+   }
+
+
+   @Override
+   public boolean tryAppendDeleteRecord(final long id, final boolean sync, 
final IOCompletion callback) throws Exception {
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling appendDeleteRecord::id=" + id);
+      }
+
+
+      checkJournalIsLoaded();
+      lineUpContext(callback);
+      if (!checkKnownRecordID(id, false)) {
+         if (callback != null) {
+            callback.done();
+         }
+         return false;
+      }
+
+      internalAppendDeleteRecord(id, sync, callback);
+      return true;
+   }
 
+   private void internalAppendDeleteRecord(long id,
+                                           boolean sync,
+                                           IOCompletion callback) throws 
InterruptedException, java.util.concurrent.ExecutionException {
       final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, 
callback);
       appendExecutor.execute(new Runnable() {
          @Override
@@ -1055,9 +1123,9 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       });
    }
 
-   private void checkKnownRecordID(final long id) throws Exception {
+   private boolean checkKnownRecordID(final long id, boolean strict) throws 
Exception {
       if (records.containsKey(id) || pendingRecords.contains(id) || (compactor 
!= null && compactor.containsRecord(id))) {
-         return;
+         return true;
       }
 
       final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
@@ -1079,7 +1147,12 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       });
 
       if (!known.get()) {
-         throw new IllegalStateException("Cannot find add info " + id + " on 
compactor or current records");
+         if (strict) {
+            throw new IllegalStateException("Cannot find add info " + id + " 
on compactor or current records");
+         }
+         return false;
+      } else {
+         return true;
       }
    }
 
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 4509210..3a106cc 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1048,6 +1048,12 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    public void removeDestination(ActiveMQDestination dest) throws Exception {
       if (dest.isQueue()) {
+
+         if (!dest.isTemporary()) {
+            // this should not really happen,
+            // so I'm not creating a Logger for this
+            logger.warn("OpenWire client sending a queue remove towards " + 
dest.getPhysicalName());
+         }
          try {
             server.destroyQueue(new SimpleString(dest.getPhysicalName()), 
getRemotingConnection());
          } catch (ActiveMQNonExistentQueueException neq) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 39183cd..5bb91c0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -194,15 +194,15 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
 
    void storeReference(long queueID, long messageID, boolean last) throws 
Exception;
 
-   void deleteMessage(long messageID) throws Exception;
+   boolean deleteMessage(long messageID) throws Exception;
 
    void storeAcknowledge(long queueID, long messageID) throws Exception;
 
    void storeCursorAcknowledge(long queueID, PagePosition position) throws 
Exception;
 
-   void updateDeliveryCount(MessageReference ref) throws Exception;
+   boolean updateDeliveryCount(MessageReference ref) throws Exception;
 
-   void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
+   boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception;
 
    void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) 
throws Exception;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index fdb1e8c..33fc719 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -425,25 +425,25 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
-   public void deleteMessage(final long messageID) throws Exception {
+   public boolean deleteMessage(final long messageID) throws Exception {
       readLock();
       try {
          // Messages are deleted on postACK, one after another.
          // If these deletes are synchronized, we would build up messages on 
the Executor
          // increasing chances of losing deletes.
          // The StorageManager should verify messages without references
-         messageJournal.appendDeleteRecord(messageID, false, 
getContext(false));
+         return messageJournal.tryAppendDeleteRecord(messageID, false, 
getContext(false));
       } finally {
          readUnLock();
       }
    }
 
    @Override
-   public void updateScheduledDeliveryTime(final MessageReference ref) throws 
Exception {
+   public boolean updateScheduledDeliveryTime(final MessageReference ref) 
throws Exception {
       ScheduledDeliveryEncoding encoding = new 
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), 
ref.getQueue().getID());
       readLock();
       try {
-         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), 
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, 
getContext(syncNonTransactional));
+         return 
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), 
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, 
getContext(syncNonTransactional));
       } finally {
          readUnLock();
       }
@@ -725,11 +725,11 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    // Other operations
 
    @Override
-   public void updateDeliveryCount(final MessageReference ref) throws 
Exception {
+   public boolean updateDeliveryCount(final MessageReference ref) throws 
Exception {
       // no need to store if it's the same value
       // otherwise the journal will get OME in case of lots of redeliveries
       if (ref.getDeliveryCount() == ref.getPersistedCount()) {
-         return;
+         return true;
       }
 
       ref.setPersistedCount(ref.getDeliveryCount());
@@ -737,7 +737,7 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
 
       readLock();
       try {
-         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), 
JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, 
getContext(syncNonTransactional));
+         return 
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), 
JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, 
getContext(syncNonTransactional));
       } finally {
          readUnLock();
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 72953fa..a4a3f8a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -226,7 +226,8 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void deleteMessage(final long messageID) throws Exception {
+   public boolean deleteMessage(final long messageID) throws Exception {
+      return true;
    }
 
    @Override
@@ -238,7 +239,8 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void updateScheduledDeliveryTime(final MessageReference ref) throws 
Exception {
+   public boolean updateScheduledDeliveryTime(final MessageReference ref) 
throws Exception {
+      return true;
    }
 
    @Override
@@ -250,7 +252,8 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void updateDeliveryCount(final MessageReference ref) throws 
Exception {
+   public boolean updateDeliveryCount(final MessageReference ref) throws 
Exception {
+      return true;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index f81a31b..e66e9b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -207,6 +207,21 @@ public class ReplicatedJournal implements Journal {
       localJournal.appendDeleteRecord(id, sync);
    }
 
+   /**
+    * @param id
+    * @param sync
+    * @throws Exception
+    * @see 
org.apache.activemq.artemis.core.journal.Journal#appendDeleteRecord(long, 
boolean)
+    */
+   @Override
+   public boolean tryAppendDeleteRecord(final long id, final boolean sync) 
throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("AppendDelete " + id);
+      }
+      replicationManager.appendDeleteRecord(journalID, id);
+      return localJournal.tryAppendDeleteRecord(id, sync);
+   }
+
    @Override
    public void appendDeleteRecord(final long id,
                                   final boolean sync,
@@ -218,6 +233,16 @@ public class ReplicatedJournal implements Journal {
       localJournal.appendDeleteRecord(id, sync, completionCallback);
    }
 
+   @Override
+   public boolean tryAppendDeleteRecord(final long id,
+                                  final boolean sync,
+                                  final IOCompletion completionCallback) 
throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("AppendDelete " + id);
+      }
+      replicationManager.appendDeleteRecord(journalID, id);
+      return localJournal.tryAppendDeleteRecord(id, sync, completionCallback);
+   }
    /**
     * @param txID
     * @param id
@@ -345,6 +370,15 @@ public class ReplicatedJournal implements Journal {
       this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), 
sync);
    }
 
+   @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                     final byte recordType,
+                                     final byte[] record,
+                                     final boolean sync) throws Exception {
+
+      return this.tryAppendUpdateRecord(id, recordType, new 
ByteArrayEncoding(record), sync);
+   }
+
    /**
     * @param id
     * @param recordType
@@ -367,6 +401,19 @@ public class ReplicatedJournal implements Journal {
    }
 
    @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                  final byte recordType,
+                                  final Persister persister,
+                                  final Object record,
+                                  final boolean sync) throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("AppendUpdateRecord id = " + id + " , recordType = " + 
recordType);
+      }
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
+      return localJournal.tryAppendUpdateRecord(id, recordType, persister, 
record, sync);
+   }
+
+   @Override
    public void appendUpdateRecord(final long id,
                                   final byte journalRecordType,
                                   final Persister persister,
@@ -380,6 +427,20 @@ public class ReplicatedJournal implements Journal {
       localJournal.appendUpdateRecord(id, journalRecordType, persister, 
record, sync, completionCallback);
    }
 
+   @Override
+   public boolean tryAppendUpdateRecord(final long id,
+                                  final byte journalRecordType,
+                                  final Persister persister,
+                                  final Object record,
+                                  final boolean sync,
+                                  final IOCompletion completionCallback) 
throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("AppendUpdateRecord id = " + id + " , recordType = " + 
journalRecordType);
+      }
+      replicationManager.appendUpdateRecord(journalID, 
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
+      return localJournal.tryAppendUpdateRecord(id, journalRecordType, 
persister, record, sync, completionCallback);
+   }
+
    /**
     * @param txID
     * @param id
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7d798b7..d7c93ad 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3093,7 +3093,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    public Pair<Boolean, Boolean> checkRedelivery(final MessageReference 
reference,
                                   final long timeBase,
                                   final boolean ignoreRedeliveryDelay) throws 
Exception {
-      Message message = reference.getMessage();
 
       if (internalQueue) {
          if (logger.isTraceEnabled()) {
@@ -3104,7 +3103,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
 
       if (!internalQueue && reference.isDurable() && isDurable() && 
!reference.isPaged()) {
-         storageManager.updateDeliveryCount(reference);
+         if (!storageManager.updateDeliveryCount(reference)) {
+            return new Pair<>(false, false);
+         }
       }
 
       AddressSettings addressSettings = 
addressSettingsRepository.getMatch(address.toString());
@@ -3739,7 +3740,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             // as we can't delete each messaging with sync=true while adding 
messages transactionally.
             // There is a startup check to remove non referenced messages case 
these deletes fail
             try {
-               storageManager.deleteMessage(message.getMessageID());
+               if (!storageManager.deleteMessage(message.getMessageID())) {
+                  ActiveMQServerLogger.LOGGER.errorRemovingMessage(new 
Exception(), message.getMessageID());
+               }
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorRemovingMessage(e, 
message.getMessageID());
             }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 471885c..f740ba8 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -354,8 +354,8 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void deleteMessage(long messageID) throws Exception {
-
+      public boolean deleteMessage(long messageID) throws Exception {
+         return true;
       }
 
       @Override
@@ -369,13 +369,13 @@ public class TransactionImplTest extends ActiveMQTestBase 
{
       }
 
       @Override
-      public void updateDeliveryCount(MessageReference ref) throws Exception {
-
+      public boolean updateDeliveryCount(MessageReference ref) throws 
Exception {
+         return true;
       }
 
       @Override
-      public void updateScheduledDeliveryTime(MessageReference ref) throws 
Exception {
-
+      public boolean updateScheduledDeliveryTime(MessageReference ref) throws 
Exception {
+         return true;
       }
 
       @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
new file mode 100644
index 0000000..663e3f1
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Parameterized integration test, run once per protocol returned by {@link #data()}
+ * ("openwire", "core" and "amqp").
+ * <p>
+ * The test sends 1000 messages to a queue, starts a consumer, deletes the stored
+ * messages directly through the storage manager while they are in delivery, destroys
+ * the queue and fails every remoting connection. It then asserts that the captured
+ * log does not contain the text "Cannot find add info".
+ * <p>
+ * NOTE(review): the class name does not end in "Test"; confirm that the build's test
+ * include patterns actually pick this class up.
+ */
+@RunWith(Parameterized.class)
+public class ForceDeleteQueue extends ActiveMQTestBase {
+
+   ActiveMQServer server;
+   // Always overwritten by the constructor with the parameterized protocol name.
+   String protocol = "openwire";
+   // Connection URI handed to CFUtil; replaced in setUp() when the protocol is openwire.
+   String uri = "tcp://localhost:61616";
+
+   public ForceDeleteQueue(String protocol) {
+      this.protocol = protocol;
+   }
+
+   /** Supplies the protocol names the test is run with. */
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{"openwire"}, {"core"}, {"amqp"}};
+      return Arrays.asList(params);
+   }
+
+   /**
+    * Creates and starts the server, registering address settings with
+    * maxDeliveryAttempts = 2 under the "#" match before the server is started.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      // OpenWire uses a URI that additionally sets jms.prefetchPolicy.all=5000.
+      if (protocol.equals("openwire")) {
+         uri = "tcp://localhost:61616?jms.prefetchPolicy.all=5000";
+      }
+
+      server = createServer(true, true);
+      server.getAddressSettingsRepository().addMatch("#",
+                                                     new 
AddressSettings().setMaxDeliveryAttempts(2));
+
+      server.start();
+   }
+
+   /**
+    * Deletes the stored messages behind the back of an active consumer, then destroys
+    * the queue and fails all connections, and checks that "Cannot find add info" was
+    * not logged while log capture was active.
+    */
+   @Test
+   public void testForceDelete() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("testForceDelete");
+      server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
true, false);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
uri);
+      Connection conn = factory.createConnection();
+
+      AssertionLoggerHandler.startCapture();
+      try {
+         // Step 1: send 1000 text messages in one transacted session and commit.
+         Session session = conn.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName.toString());
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 1000; i++) {
+            TextMessage message = session.createTextMessage("Text " + i);
+            producer.send(message);
+         }
+         session.commit();
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(queueName);
+
+         // Wait until the server-side queue reports all 1000 messages.
+         Wait.assertEquals(1000, serverQueue::getMessageCount);
+
+         conn.close();
+
+         // Step 2: reconnect with a non-transacted, auto-acknowledge session.
+         conn = factory.createConnection();
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn.start();
+
+         // Step 3: record the message IDs currently in the queue through a browser iterator.
+         LinkedListIterator<MessageReference> queueiterator =  
serverQueue.browserIterator();
+         ArrayList<Long> listQueue = new ArrayList<>(1000);
+
+         while (queueiterator.hasNext()) {
+            MessageReference ref = queueiterator.next();
+
+            listQueue.add(ref.getMessageID());
+         }
+
+         queueiterator.close();
+
+         // Step 4: create a consumer that is never read from; the test only waits until
+         // the server reports more than 100 messages in delivery.
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         Wait.assertTrue(() -> serverQueue.getDeliveringCount() > 100);
+
+         // Step 5: delete every recorded message directly through the storage manager.
+         for (Long l : listQueue) {
+            // This forces an artificial situation where the message was removed during a failure condition.
+            server.getStorageManager().deleteMessage(l);
+         }
+
+         // Step 6: destroy the queue and fail every remoting connection known to the server.
+         server.destroyQueue(queueName, null, false);
+
+         for (RemotingConnection connection : 
server.getRemotingService().getConnections()) {
+            connection.fail(new ActiveMQException("failure"));
+         }
+
+
+         // Step 7: the captured log must not contain the "Cannot find add info" text.
+         Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add 
info"));
+
+
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+         try {
+            conn.close();
+         } catch (Throwable ignored) {
+            // Best-effort close: the connection may already be unusable after the failures forced above.
+         }
+      }
+
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index f2f63f5..3980b3b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -433,8 +433,8 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
-      public void deleteMessage(long messageID) throws Exception {
-         manager.deleteMessage(messageID);
+      public boolean deleteMessage(long messageID) throws Exception {
+         return manager.deleteMessage(messageID);
       }
 
       @Override
@@ -448,13 +448,13 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
-      public void updateDeliveryCount(MessageReference ref) throws Exception {
-         manager.updateDeliveryCount(ref);
+      public boolean updateDeliveryCount(MessageReference ref) throws 
Exception {
+         return manager.updateDeliveryCount(ref);
       }
 
       @Override
-      public void updateScheduledDeliveryTime(MessageReference ref) throws 
Exception {
-         manager.updateScheduledDeliveryTime(ref);
+      public boolean updateScheduledDeliveryTime(MessageReference ref) throws 
Exception {
+         return manager.updateScheduledDeliveryTime(ref);
       }
 
       @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index e69d4d9..3fc2a05 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends 
StorageManagerTestBase {
    protected JournalStorageManager createJournalStorageManager(Configuration 
configuration) {
       return new JournalStorageManager(configuration, 
EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
          @Override
-         public void deleteMessage(final long messageID) throws Exception {
+         public boolean deleteMessage(final long messageID) throws Exception {
             deletedMessage.add(messageID);
-            super.deleteMessage(messageID);
+            return super.deleteMessage(messageID);
          }
       };
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 82165f0..ce56b9b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -650,6 +650,15 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
 
       @Override
+      public boolean tryAppendUpdateRecord(long id,
+                                           byte recordType,
+                                           Persister persister,
+                                           Object record,
+                                           boolean sync) throws Exception {
+         // Test stub: ignores its arguments and always reports success.
+         return true;
+      }
+
+      @Override
       public void appendUpdateRecord(long id,
                                      byte recordType,
                                      Persister persister,
@@ -660,6 +669,16 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
 
       @Override
+      public boolean tryAppendUpdateRecord(long id,
+                                           byte recordType,
+                                           Persister persister,
+                                           Object record,
+                                           boolean sync,
+                                           IOCompletion callback) throws 
Exception {
+         // Test stub: ignores its arguments (the callback is never invoked) and always reports success.
+         return true;
+      }
+
+      @Override
       public void appendAddRecordTransactional(long txID,
                                                long id,
                                                byte recordType,
@@ -730,6 +749,11 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
 
       @Override
+      public boolean tryAppendDeleteRecord(long id, boolean sync) throws 
Exception {
+         // Test stub: ignores its arguments and always reports success.
+         return true;
+      }
+
+      @Override
       public void appendDeleteRecordTransactional(final long txID,
                                                   final long id,
                                                   final byte[] record) throws 
Exception {
@@ -776,6 +800,11 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
 
       @Override
+      public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] 
record, boolean sync) throws Exception {
+         // Test stub: ignores its arguments and always reports success.
+         return true;
+      }
+
+      @Override
       public void appendUpdateRecord(final long id,
                                      final byte recordType,
                                      final EncodingSupport record,
@@ -876,6 +905,11 @@ public final class ReplicationTest extends 
ActiveMQTestBase {
       }
 
       @Override
+      public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion 
completionCallback) throws Exception {
+         // Test stub: ignores its arguments (the callback is never invoked) and always reports success.
+         return true;
+      }
+
+      @Override
       public void appendPrepareRecord(final long txID,
                                       final EncodingSupport transactionData,
                                       final boolean sync,
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
index 67cfe18..c38bc45 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
@@ -39,8 +39,9 @@ public class FakeStorageManager extends NullStorageManager {
    }
 
    @Override
-   public void deleteMessage(final long messageID) throws Exception {
+   public boolean deleteMessage(final long messageID) throws Exception {
       messageIds.remove(messageID);
+      return true;
    }
 
    @Override
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml 
b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
index 4a9eb47..eb226e6 100644
--- a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
@@ -117,6 +117,11 @@ under the License.
             <multicast>
             </multicast>
          </address>
+         <!-- Address "DLQ" with one anycast queue of the same name.
+              NOTE(review): presumably the dead-letter destination for this smoke-test server; confirm against its address-settings. -->
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
          <address name="exampleQueue">
             <anycast>
                <queue name="exampleQueue"/>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml 
b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
index 58a4b9f..4d62a6a 100644
--- a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
@@ -119,6 +119,11 @@ under the License.
             <multicast>
             </multicast>
          </address>
+         <!-- Address "DLQ" with one anycast queue of the same name.
+              NOTE(review): presumably the dead-letter destination for this smoke-test server; confirm against its address-settings. -->
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
          <address name="exampleQueue">
             <anycast>
                <queue name="exampleQueue"/>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
index 954e85b..7da070c 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
@@ -41,6 +41,10 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class SoakPagingTest extends SmokeTestBase {
 
+   // Milliseconds passed to Thread.sleep before connection.start() once the consumer has been created;
+   // the sleep is skipped when this is not greater than 0.
+   public static final int LAG_CONSUMER_TIME = 1000;
+   // Passed as a string argument to every spawned client VM.
+   // NOTE(review): presumably how long each client runs, in milliseconds; confirm against main().
+   public static final int TIME_RUNNING = 4000;
+   // Number of client VMs that testPagingReplication spawns and waits for, one after another.
+   public static final int CLIENT_KILLS = 2;
+
    String protocol;
    String consumerType;
    boolean transaction;
@@ -86,12 +90,13 @@ public class SoakPagingTest extends SmokeTestBase {
 
    private static ConnectionFactory createConnectionFactory(String protocol, 
String uri) {
       if (protocol.toUpperCase().equals("OPENWIRE")) {
-         return new org.apache.activemq.ActiveMQConnectionFactory(uri);
+         return new org.apache.activemq.ActiveMQConnectionFactory("failover:(" 
+ uri + ")");
       } else if (protocol.toUpperCase().equals("AMQP")) {
 
          if (uri.startsWith("tcp://")) {
             // replacing tcp:// by amqp://
             uri = "amqp" + uri.substring(3);
+
          }
          return new JmsConnectionFactory(uri);
       } else if (protocol.toUpperCase().equals("CORE") || 
protocol.toUpperCase().equals("ARTEMIS")) {
@@ -158,25 +163,14 @@ public class SoakPagingTest extends SmokeTestBase {
    @Test
    public void testPagingReplication() throws Throwable {
 
-      Process queueProcess = null;
-      if (consumerType.equals("queue")) {
-         queueProcess = 
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, 
consumerType, "45000", "" + transaction);
-      }
+      server1 = startServer(SERVER_NAME_1, 0, 30000);
 
-      for (int i = 0; i < 3; i++) {
-         Process process = 
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, 
consumerType, "15000", "" + transaction);
-
-         if (i == 0) {
-            server1 = startServer(SERVER_NAME_1, 0, 30000);
-         }
+      for (int i = 0; i < CLIENT_KILLS; i++) {
+         Process process = 
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, 
consumerType, "" + TIME_RUNNING, "" + transaction);
 
          int result = process.waitFor();
          Assert.assertTrue(result > 0);
       }
-
-      if (queueProcess != null) {
-         Assert.assertTrue(queueProcess.waitFor() > 0);
-      }
    }
 
    public void produce(ConnectionFactory factory) {
@@ -261,7 +255,8 @@ public class SoakPagingTest extends SmokeTestBase {
             messageConsumer = session.createConsumer(address);
          }
 
-         Thread.sleep(5000);
+         if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
+
          connection.start();
 
          int i = 0;
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index be72ee6..d8dc8fc 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -385,6 +385,20 @@ public abstract class JournalImplTestBase extends 
ActiveMQTestBase {
       journal.debugWait();
    }
 
+   /**
+    * Attempts to update the record {@code argument} through
+    * {@code journal.tryAppendUpdateRecord}, using a record produced by
+    * {@code generateRecord(recordLength)} and record type 0.
+    * <p>
+    * The locally tracked {@code records} list is only extended when the journal
+    * reports success, so a rejected attempt leaves it untouched.
+    *
+    * @param argument id of the record to update
+    * @return the value returned by {@code journal.tryAppendUpdateRecord}
+    * @throws Exception if the journal operation fails
+    */
+   protected boolean tryUpdate(final long argument) throws Exception {
+      byte[] updateRecord = generateRecord(recordLength);
+
+      beforeJournalOperation();
+
+      boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync);
+
+      if (result) {
+         records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
+      }
+
+      // Call journal.debugWait() before returning, as update(...) and tryDelete(...) do.
+      journal.debugWait();
+
+      return result;
+   }
+
    protected void update(final long... arguments) throws Exception {
       for (long element : arguments) {
          byte[] updateRecord = generateRecord(recordLength);
@@ -411,6 +425,20 @@ public abstract class JournalImplTestBase extends 
ActiveMQTestBase {
       journal.debugWait();
    }
 
+   /**
+    * Attempts to delete the record {@code argument} through
+    * {@code journal.tryAppendDeleteRecord}.
+    * <p>
+    * When the journal reports success, the id is also dropped from the locally
+    * tracked records via {@code removeRecordsForID}. {@code journal.debugWait()}
+    * is called before returning in either case.
+    *
+    * @param argument id of the record to delete
+    * @return the value returned by {@code journal.tryAppendDeleteRecord}
+    * @throws Exception if the journal operation fails
+    */
+   protected boolean tryDelete(final long argument) throws Exception {
+      beforeJournalOperation();
+
+      if (!journal.tryAppendDeleteRecord(argument, sync)) {
+         // Rejected by the journal: leave the tracked records as they are.
+         journal.debugWait();
+         return false;
+      }
+
+      removeRecordsForID(argument);
+      journal.debugWait();
+      return true;
+   }
+
    protected void addTx(final long txID, final long... arguments) throws 
Exception {
       TransactionHolder tx = getTransaction(txID);
 
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index 25b2413..838f334 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -2689,6 +2689,22 @@ public abstract class JournalImplTestUnit extends 
JournalImplTestBase {
    }
 
    @Test
+   public void testTryIsolation2() throws Exception {
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      // Ids 1, 2 and 3 are added under transaction 1; this test never commits it.
+      addTx(1, 1, 2, 3);
+
+      // The non-transactional tryUpdate on id 1 is expected to return false.
+      Assert.assertFalse(tryUpdate(1));
+
+      // Stop, re-create and restart the journal, then run loadAndCheck().
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
+   @Test
    public void testIsolation3() throws Exception {
       setup(10, 10 * 1024, true);
       createJournal();
@@ -2708,6 +2724,22 @@ public abstract class JournalImplTestUnit extends 
JournalImplTestBase {
       loadAndCheck();
    }
 
+   /**
+    * Adds ids 1, 2 and 3 under transaction 1 without committing it, expects the
+    * non-transactional {@code tryDelete(1)} to return false, and then restarts the
+    * journal and runs {@code loadAndCheck()}.
+    */
+   @Test
+   public void testTryDelete() throws Exception {
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      addTx(1, 1, 2, 3);
+
+      // Id 1 has only been added inside the uncommitted transaction above.
+      Assert.assertFalse(tryDelete(1));
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+   }
+
    // XA tests
    // ========
 

Reply via email to