This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new af796d5 ARTEMIS-2701 Improving DLQ/check over previously removed
records
new b2342b6 This closes #3067
af796d5 is described below
commit af796d5ce4129b9c0359d9868d434dadd3f30beb
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Apr 2 17:38:12 2020 -0400
ARTEMIS-2701 Improving DLQ/check over previously removed records
---
.../jdbc/store/journal/JDBCJournalImpl.java | 37 +++++
.../activemq/artemis/core/journal/Journal.java | 35 ++++-
.../core/journal/impl/FileWrapperJournal.java | 19 +++
.../artemis/core/journal/impl/JournalBase.java | 43 +++++-
.../artemis/core/journal/impl/JournalImpl.java | 85 ++++++++++-
.../core/protocol/openwire/OpenWireConnection.java | 6 +
.../artemis/core/persistence/StorageManager.java | 6 +-
.../journal/AbstractJournalStorageManager.java | 14 +-
.../impl/nullpm/NullStorageManager.java | 9 +-
.../core/replication/ReplicatedJournal.java | 61 ++++++++
.../artemis/core/server/impl/QueueImpl.java | 9 +-
.../core/transaction/impl/TransactionImplTest.java | 12 +-
.../tests/integration/client/ForceDeleteQueue.java | 155 +++++++++++++++++++++
.../tests/integration/client/SendAckFailTest.java | 12 +-
.../persistence/DeleteMessagesOnStartupTest.java | 4 +-
.../integration/replication/ReplicationTest.java | 34 +++++
.../integration/server/FakeStorageManager.java | 3 +-
.../servers/replicated-static0/broker.xml | 5 +
.../servers/replicated-static1/broker.xml | 5 +
.../smoke/replicationflow/SoakPagingTest.java | 27 ++--
.../core/journal/impl/JournalImplTestBase.java | 28 ++++
.../core/journal/impl/JournalImplTestUnit.java | 32 +++++
22 files changed, 581 insertions(+), 60 deletions(-)
diff --git
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 181a61c..f87d7a7 100644
---
a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -502,6 +502,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
}
@Override
+ public boolean tryAppendUpdateRecord(long id, byte recordType, byte[]
record, boolean sync) throws Exception {
+ appendUpdateRecord(id, recordType, record, sync);
+ return true;
+ }
+
+ @Override
public void appendUpdateRecord(long id, byte recordType, Persister
persister, Object record, boolean sync) throws Exception {
JDBCJournalRecord r = new JDBCJournalRecord(id,
JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
r.setUserRecordType(recordType);
@@ -517,6 +523,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
}
@Override
+ public boolean tryAppendUpdateRecord(long id, byte recordType, Persister
persister, Object record, boolean sync) throws Exception {
+ appendUpdateRecord(id, recordType, persister, record, sync);
+ return true;
+ }
+
+ @Override
public void appendUpdateRecord(long id,
byte recordType,
Persister persister,
@@ -539,6 +551,19 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
appendRecord(r);
}
+
+ @Override
+ public boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync,
+ IOCompletion completionCallback) throws
Exception {
+ appendUpdateRecord(id, recordType, persister, record, sync,
completionCallback);
+ return true;
+ }
+
+
@Override
public void appendDeleteRecord(long id, boolean sync) throws Exception {
checkStatus();
@@ -554,6 +579,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
}
@Override
+ public boolean tryAppendDeleteRecord(long id, boolean sync) throws
Exception {
+ appendDeleteRecord(id, sync);
+ return true;
+ }
+
+ @Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception {
checkStatus(completionCallback);
@@ -570,6 +601,12 @@ public class JDBCJournalImpl extends AbstractJDBCDriver
implements Journal {
}
@Override
+ public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception {
+ appendDeleteRecord(id, sync, completionCallback);
+ return true;
+ }
+
+ @Override
public void appendAddRecordTransactional(long txID, long id, byte
recordType, byte[] record) throws Exception {
checkStatus();
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index 15ba4d3..473ea1c 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -85,20 +85,36 @@ public interface Journal extends ActiveMQComponent {
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean
sync) throws Exception;
+ boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record,
boolean sync) throws Exception;
+
default void appendUpdateRecord(long id, byte recordType, EncodingSupport
record, boolean sync) throws Exception {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(),
record, sync);
}
+ default boolean tryAppendUpdateRecord(long id, byte recordType,
EncodingSupport record, boolean sync) throws Exception {
+ return tryAppendUpdateRecord(id, recordType,
EncoderPersister.getInstance(), record, sync);
+ }
+
void appendUpdateRecord(long id, byte recordType, Persister persister,
Object record, boolean sync) throws Exception;
+ boolean tryAppendUpdateRecord(long id, byte recordType, Persister
persister, Object record, boolean sync) throws Exception;
+
default void appendUpdateRecord(long id,
- byte recordType,
- EncodingSupport record,
- boolean sync,
- IOCompletion completionCallback) throws Exception {
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws
Exception {
appendUpdateRecord(id, recordType, EncoderPersister.getInstance(),
record, sync, completionCallback);
}
+ default boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws
Exception {
+ return tryAppendUpdateRecord(id, recordType,
EncoderPersister.getInstance(), record, sync, completionCallback);
+ }
+
void appendUpdateRecord(long id,
byte recordType,
Persister persister,
@@ -106,10 +122,21 @@ public interface Journal extends ActiveMQComponent {
boolean sync,
IOCompletion callback) throws Exception;
+ boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync,
+ IOCompletion callback) throws Exception;
+
void appendDeleteRecord(long id, boolean sync) throws Exception;
+ boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception;
+
void appendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception;
+ boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception;
+
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception;
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index 85f55dc..c76ed68 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -172,6 +172,13 @@ public final class FileWrapperJournal extends JournalBase {
writeRecord(deleteRecord, false, -1, false, callback);
}
+
+ @Override
+ public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion
callback) throws Exception {
+ appendDeleteRecord(id, sync, callback);
+ return true;
+ }
+
@Override
public void appendDeleteRecordTransactional(long txID, long id,
EncodingSupport record) throws Exception {
JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID,
id, record);
@@ -200,6 +207,18 @@ public final class FileWrapperJournal extends JournalBase {
}
@Override
+ public boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync,
+ IOCompletion callback) throws Exception {
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id,
recordType, persister, record);
+ writeRecord(updateRecord, false, -1, false, callback);
+ return true;
+ }
+
+ @Override
public void appendUpdateRecordTransactional(long txID,
long id,
byte recordType,
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index 2c03f92..8c7a89b 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -71,13 +71,21 @@ abstract class JournalBase implements Journal {
@Override
public void appendUpdateRecord(final long id,
- final byte recordType,
- final byte[] record,
- final boolean sync) throws Exception {
+ final byte recordType,
+ final byte[] record,
+ final boolean sync) throws Exception {
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
@Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync) throws Exception {
+ return tryAppendUpdateRecord(id, recordType, new
ByteArrayEncoding(record), sync);
+ }
+
+ @Override
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -137,6 +145,23 @@ abstract class JournalBase implements Journal {
}
@Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync) throws Exception {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ boolean append = tryAppendUpdateRecord(id, recordType, persister,
record, sync, callback);
+
+ if (callback != null) {
+ callback.waitCompletion();
+ }
+
+ return append;
+ }
+
+ @Override
public void appendRollbackRecord(final long txID, final boolean sync)
throws Exception {
SyncIOCompletion syncCompletion = getSyncCallback(sync);
@@ -159,6 +184,18 @@ abstract class JournalBase implements Journal {
}
}
+ @Override
+ public boolean tryAppendDeleteRecord(final long id, final boolean sync)
throws Exception {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ boolean result = tryAppendDeleteRecord(id, sync, callback);
+
+ if (callback != null) {
+ callback.waitCompletion();
+ }
+
+ return result;
+ }
abstract void scheduleReclaim();
protected SyncIOCompletion getSyncCallback(final boolean sync) {
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 1161756..d5d4a81 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -884,7 +884,7 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
final IOCompletion callback) throws
Exception {
checkJournalIsLoaded();
lineUpContext(callback);
- checkKnownRecordID(id);
+ checkKnownRecordID(id, true);
if (logger.isTraceEnabled()) {
logger.trace("scheduling appendUpdateRecord::id=" + id +
@@ -892,8 +892,47 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
recordType);
}
- final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
+ internalAppendUpdateRecord(id, recordType, persister, record, sync,
callback);
+ }
+
+
+ @Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync,
+ final IOCompletion callback) throws
Exception {
+ checkJournalIsLoaded();
+ lineUpContext(callback);
+
+ if (!checkKnownRecordID(id, false)) {
+ if (callback != null) {
+ callback.done();
+ }
+ return false;
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("scheduling appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType);
+ }
+
+
+ internalAppendUpdateRecord(id, recordType, persister, record, sync,
callback);
+
+ return true;
+ }
+
+ private void internalAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync,
+ IOCompletion callback) throws
InterruptedException, java.util.concurrent.ExecutionException {
+ final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(new Runnable() {
@Override
public void run() {
@@ -946,8 +985,37 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
checkJournalIsLoaded();
lineUpContext(callback);
- checkKnownRecordID(id);
+ checkKnownRecordID(id, true);
+
+ internalAppendDeleteRecord(id, sync, callback);
+ return;
+ }
+
+
+ @Override
+ public boolean tryAppendDeleteRecord(final long id, final boolean sync,
final IOCompletion callback) throws Exception {
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("scheduling appendDeleteRecord::id=" + id);
+ }
+
+
+ checkJournalIsLoaded();
+ lineUpContext(callback);
+ if (!checkKnownRecordID(id, false)) {
+ if (callback != null) {
+ callback.done();
+ }
+ return false;
+ }
+
+ internalAppendDeleteRecord(id, sync, callback);
+ return true;
+ }
+ private void internalAppendDeleteRecord(long id,
+ boolean sync,
+ IOCompletion callback) throws
InterruptedException, java.util.concurrent.ExecutionException {
final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync,
callback);
appendExecutor.execute(new Runnable() {
@Override
@@ -1055,9 +1123,9 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
});
}
- private void checkKnownRecordID(final long id) throws Exception {
+ private boolean checkKnownRecordID(final long id, boolean strict) throws
Exception {
if (records.containsKey(id) || pendingRecords.contains(id) || (compactor
!= null && compactor.containsRecord(id))) {
- return;
+ return true;
}
final SimpleFuture<Boolean> known = new SimpleFutureImpl<>();
@@ -1079,7 +1147,12 @@ public class JournalImpl extends JournalBase implements
TestableJournal, Journal
});
if (!known.get()) {
- throw new IllegalStateException("Cannot find add info " + id + " on
compactor or current records");
+ if (strict) {
+ throw new IllegalStateException("Cannot find add info " + id + "
on compactor or current records");
+ }
+ return false;
+ } else {
+ return true;
}
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 4509210..3a106cc 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1048,6 +1048,12 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
public void removeDestination(ActiveMQDestination dest) throws Exception {
if (dest.isQueue()) {
+
+ if (!dest.isTemporary()) {
+ // this should not really happen,
+ // so I'm not creating a dedicated log message for this
+ logger.warn("OpenWire client sending a queue remove towards " +
dest.getPhysicalName());
+ }
try {
server.destroyQueue(new SimpleString(dest.getPhysicalName()),
getRemotingConnection());
} catch (ActiveMQNonExistentQueueException neq) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 39183cd..5bb91c0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -194,15 +194,15 @@ public interface StorageManager extends IDGenerator,
ActiveMQComponent {
void storeReference(long queueID, long messageID, boolean last) throws
Exception;
- void deleteMessage(long messageID) throws Exception;
+ boolean deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
void storeCursorAcknowledge(long queueID, PagePosition position) throws
Exception;
- void updateDeliveryCount(MessageReference ref) throws Exception;
+ boolean updateDeliveryCount(MessageReference ref) throws Exception;
- void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
+ boolean updateScheduledDeliveryTime(MessageReference ref) throws Exception;
void storeDuplicateID(SimpleString address, byte[] duplID, long recordID)
throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index fdb1e8c..33fc719 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -425,25 +425,25 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
}
@Override
- public void deleteMessage(final long messageID) throws Exception {
+ public boolean deleteMessage(final long messageID) throws Exception {
readLock();
try {
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on
the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
- messageJournal.appendDeleteRecord(messageID, false,
getContext(false));
+ return messageJournal.tryAppendDeleteRecord(messageID, false,
getContext(false));
} finally {
readUnLock();
}
}
@Override
- public void updateScheduledDeliveryTime(final MessageReference ref) throws
Exception {
+ public boolean updateScheduledDeliveryTime(final MessageReference ref)
throws Exception {
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
ref.getQueue().getID());
readLock();
try {
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional,
getContext(syncNonTransactional));
+ return
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(),
JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional,
getContext(syncNonTransactional));
} finally {
readUnLock();
}
@@ -725,11 +725,11 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
// Other operations
@Override
- public void updateDeliveryCount(final MessageReference ref) throws
Exception {
+ public boolean updateDeliveryCount(final MessageReference ref) throws
Exception {
// no need to store if it's the same value
// otherwise the journal will get OME in case of lots of redeliveries
if (ref.getDeliveryCount() == ref.getPersistedCount()) {
- return;
+ return true;
}
ref.setPersistedCount(ref.getDeliveryCount());
@@ -737,7 +737,7 @@ public abstract class AbstractJournalStorageManager extends
CriticalComponentImp
readLock();
try {
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional,
getContext(syncNonTransactional));
+ return
messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(),
JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional,
getContext(syncNonTransactional));
} finally {
readUnLock();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 72953fa..a4a3f8a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -226,7 +226,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
- public void deleteMessage(final long messageID) throws Exception {
+ public boolean deleteMessage(final long messageID) throws Exception {
+ return true;
}
@Override
@@ -238,7 +239,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
- public void updateScheduledDeliveryTime(final MessageReference ref) throws
Exception {
+ public boolean updateScheduledDeliveryTime(final MessageReference ref)
throws Exception {
+ return true;
}
@Override
@@ -250,7 +252,8 @@ public class NullStorageManager implements StorageManager {
}
@Override
- public void updateDeliveryCount(final MessageReference ref) throws
Exception {
+ public boolean updateDeliveryCount(final MessageReference ref) throws
Exception {
+ return true;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index f81a31b..e66e9b3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -207,6 +207,21 @@ public class ReplicatedJournal implements Journal {
localJournal.appendDeleteRecord(id, sync);
}
+ /**
+ * @param id
+ * @param sync
+ * @throws Exception
+ * @see
org.apache.activemq.artemis.core.journal.Journal#tryAppendDeleteRecord(long,
boolean)
+ */
+ @Override
+ public boolean tryAppendDeleteRecord(final long id, final boolean sync)
throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ return localJournal.tryAppendDeleteRecord(id, sync);
+ }
+
@Override
public void appendDeleteRecord(final long id,
final boolean sync,
@@ -218,6 +233,16 @@ public class ReplicatedJournal implements Journal {
localJournal.appendDeleteRecord(id, sync, completionCallback);
}
+ @Override
+ public boolean tryAppendDeleteRecord(final long id,
+ final boolean sync,
+ final IOCompletion completionCallback)
throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ return localJournal.tryAppendDeleteRecord(id, sync, completionCallback);
+ }
/**
* @param txID
* @param id
@@ -345,6 +370,15 @@ public class ReplicatedJournal implements Journal {
this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record),
sync);
}
+ @Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync) throws Exception {
+
+ return this.tryAppendUpdateRecord(id, recordType, new
ByteArrayEncoding(record), sync);
+ }
+
/**
* @param id
* @param recordType
@@ -367,6 +401,19 @@ public class ReplicatedJournal implements Journal {
}
@Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte recordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync) throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("AppendUpdateRecord id = " + id + " , recordType = " +
recordType);
+ }
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record);
+ return localJournal.tryAppendUpdateRecord(id, recordType, persister,
record, sync);
+ }
+
+ @Override
public void appendUpdateRecord(final long id,
final byte journalRecordType,
final Persister persister,
@@ -380,6 +427,20 @@ public class ReplicatedJournal implements Journal {
localJournal.appendUpdateRecord(id, journalRecordType, persister,
record, sync, completionCallback);
}
+ @Override
+ public boolean tryAppendUpdateRecord(final long id,
+ final byte journalRecordType,
+ final Persister persister,
+ final Object record,
+ final boolean sync,
+ final IOCompletion completionCallback)
throws Exception {
+ if (log.isTraceEnabled()) {
+ log.trace("AppendUpdateRecord id = " + id + " , recordType = " +
journalRecordType);
+ }
+ replicationManager.appendUpdateRecord(journalID,
ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record);
+ return localJournal.tryAppendUpdateRecord(id, journalRecordType,
persister, record, sync, completionCallback);
+ }
+
/**
* @param txID
* @param id
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7d798b7..d7c93ad 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3093,7 +3093,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
public Pair<Boolean, Boolean> checkRedelivery(final MessageReference
reference,
final long timeBase,
final boolean ignoreRedeliveryDelay) throws
Exception {
- Message message = reference.getMessage();
if (internalQueue) {
if (logger.isTraceEnabled()) {
@@ -3104,7 +3103,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
if (!internalQueue && reference.isDurable() && isDurable() &&
!reference.isPaged()) {
- storageManager.updateDeliveryCount(reference);
+ if (!storageManager.updateDeliveryCount(reference)) {
+ return new Pair<>(false, false);
+ }
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -3739,7 +3740,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
// as we can't delete each messaging with sync=true while adding
messages transactionally.
// There is a startup check to remove non referenced messages case
these deletes fail
try {
- storageManager.deleteMessage(message.getMessageID());
+ if (!storageManager.deleteMessage(message.getMessageID())) {
+ ActiveMQServerLogger.LOGGER.errorRemovingMessage(new
Exception(), message.getMessageID());
+ }
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorRemovingMessage(e,
message.getMessageID());
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 471885c..f740ba8 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -354,8 +354,8 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
- public void deleteMessage(long messageID) throws Exception {
-
+ public boolean deleteMessage(long messageID) throws Exception {
+ return true;
}
@Override
@@ -369,13 +369,13 @@ public class TransactionImplTest extends ActiveMQTestBase
{
}
@Override
- public void updateDeliveryCount(MessageReference ref) throws Exception {
-
+ public boolean updateDeliveryCount(MessageReference ref) throws
Exception {
+ return true;
}
@Override
- public void updateScheduledDeliveryTime(MessageReference ref) throws
Exception {
-
+ public boolean updateScheduledDeliveryTime(MessageReference ref) throws
Exception {
+ return true;
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
new file mode 100644
index 0000000..663e3f1
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ForceDeleteQueue.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Parameterized integration test, run once per protocol returned by {@link #data()}
+ * (openwire, core, amqp).
+ *
+ * The single test deletes messages straight from the storage manager while the queue still
+ * holds references to them, destroys the queue, fails every remoting connection, and then
+ * asserts that the captured broker log does not contain "Cannot find add info".
+ *
+ * NOTE(review): the class name has no "Test" suffix - confirm that the build's test include
+ * pattern actually picks this class up.
+ */
+@RunWith(Parameterized.class)
+public class ForceDeleteQueue extends ActiveMQTestBase {
+
+   ActiveMQServer server;
+   // Overwritten by the constructor with the parameter of the current run.
+   String protocol = "openwire";
+   // Default broker URI; setUp() replaces it for openwire.
+   String uri = "tcp://localhost:61616";
+
+   /**
+    * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory}
+    */
+   public ForceDeleteQueue(String protocol) {
+      this.protocol = protocol;
+   }
+
+   /**
+    * @return one parameter row per protocol under test
+    */
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{"openwire"}, {"core"}, {"amqp"}};
+      return Arrays.asList(params);
+   }
+
+   /**
+    * Starts a server whose address settings cap delivery attempts at 2 for every address ("#").
+    *
+    * @throws Exception if the base set-up or the server start fails
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      if (protocol.equals("openwire")) {
+         // openwire runs use an explicit prefetch of 5000 on the client URI
+         uri = "tcp://localhost:61616?jms.prefetchPolicy.all=5000";
+      }
+
+      // NOTE(review): the meaning of the two boolean flags is defined by ActiveMQTestBase.createServer - confirm there
+      server = createServer(true, true);
+      server.getAddressSettingsRepository().addMatch("#",
+                                                     new AddressSettings().setMaxDeliveryAttempts(2));
+
+      server.start();
+   }
+
+   /**
+    * Sends 1000 messages, removes their records behind the queue's back while delivery is in
+    * progress, destroys the queue and fails all connections; the broker must not log
+    * "Cannot find add info" while doing so.
+    *
+    * @throws Exception on any broker or JMS failure
+    */
+   @Test
+   public void testForceDelete() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString("testForceDelete");
+      server.addAddressInfo(new AddressInfo(queueName, RoutingType.ANYCAST));
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, uri);
+      Connection conn = factory.createConnection();
+
+      // Everything logged from here until stopCapture() can be searched with findText().
+      AssertionLoggerHandler.startCapture();
+      try {
+         // Phase 1: produce 1000 messages in one transacted session.
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(queueName.toString());
+         MessageProducer producer = session.createProducer(queue);
+         for (int i = 0; i < 1000; i++) {
+            TextMessage message = session.createTextMessage("Text " + i);
+            producer.send(message);
+         }
+         session.commit();
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
+
+         Wait.assertEquals(1000, serverQueue::getMessageCount);
+
+         conn.close();
+
+         // Phase 2: reconnect with a non-transacted, auto-acknowledge session for consuming.
+         conn = factory.createConnection();
+         session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn.start();
+
+         // Snapshot the ids of every message currently in the queue.
+         LinkedListIterator<MessageReference> queueiterator = serverQueue.browserIterator();
+         ArrayList<Long> listQueue = new ArrayList<>(1000);
+
+         while (queueiterator.hasNext()) {
+            MessageReference ref = queueiterator.next();
+
+            listQueue.add(ref.getMessageID());
+         }
+
+         queueiterator.close();
+
+         // The consumer is never read from; the wait below needs messages to be in delivery.
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         Wait.assertTrue(() -> serverQueue.getDeliveringCount() > 100);
+
+         for (Long l : listQueue) {
+            // this is forcing an artificial situation where the message was removed during a failure condition
+            server.getStorageManager().deleteMessage(l);
+         }
+
+         // Phase 3: tear the queue down and fail every connection while the records are already gone.
+         server.destroyQueue(queueName, null, false);
+
+         for (RemotingConnection connection : server.getRemotingService().getConnections()) {
+            connection.fail(new ActiveMQException("failure"));
+         }
+
+
+         Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add info"));
+
+
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+         try {
+            conn.close();
+         } catch (Throwable ignored) {
+            // best effort: the connection has normally been failed above already
+         }
+      }
+
+   }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index f2f63f5..3980b3b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -433,8 +433,8 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
- public void deleteMessage(long messageID) throws Exception {
- manager.deleteMessage(messageID);
+ public boolean deleteMessage(long messageID) throws Exception {
+ return manager.deleteMessage(messageID);
}
@Override
@@ -448,13 +448,13 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
- public void updateDeliveryCount(MessageReference ref) throws Exception {
- manager.updateDeliveryCount(ref);
+ public boolean updateDeliveryCount(MessageReference ref) throws
Exception {
+ return manager.updateDeliveryCount(ref);
}
@Override
- public void updateScheduledDeliveryTime(MessageReference ref) throws
Exception {
- manager.updateScheduledDeliveryTime(ref);
+ public boolean updateScheduledDeliveryTime(MessageReference ref) throws
Exception {
+ return manager.updateScheduledDeliveryTime(ref);
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
index e69d4d9..3fc2a05 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DeleteMessagesOnStartupTest.java
@@ -95,9 +95,9 @@ public class DeleteMessagesOnStartupTest extends
StorageManagerTestBase {
protected JournalStorageManager createJournalStorageManager(Configuration
configuration) {
return new JournalStorageManager(configuration,
EmptyCriticalAnalyzer.getInstance(), execFactory, execFactory) {
@Override
- public void deleteMessage(final long messageID) throws Exception {
+ public boolean deleteMessage(final long messageID) throws Exception {
deletedMessage.add(messageID);
- super.deleteMessage(messageID);
+ return super.deleteMessage(messageID);
}
};
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index 82165f0..ce56b9b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -650,6 +650,15 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
@Override
+ public boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync) throws Exception {
+ return true; // stub: ignores every argument and always answers true
+ }
+
+ @Override
public void appendUpdateRecord(long id,
byte recordType,
Persister persister,
@@ -660,6 +669,16 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
@Override
+ public boolean tryAppendUpdateRecord(long id,
+ byte recordType,
+ Persister persister,
+ Object record,
+ boolean sync,
+ IOCompletion callback) throws Exception {
+ return true; // stub: ignores every argument (the callback is never invoked) and always answers true
+ }
+
+ @Override
public void appendAddRecordTransactional(long txID,
long id,
byte recordType,
@@ -730,6 +749,11 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
@Override
+ public boolean tryAppendDeleteRecord(long id, boolean sync) throws Exception {
+ return true; // stub: ignores both arguments and always answers true
+ }
+
+ @Override
public void appendDeleteRecordTransactional(final long txID,
final long id,
final byte[] record) throws
Exception {
@@ -776,6 +800,11 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
@Override
+ public boolean tryAppendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception {
+ return true; // stub: ignores every argument and always answers true
+ }
+
+ @Override
public void appendUpdateRecord(final long id,
final byte recordType,
final EncodingSupport record,
@@ -876,6 +905,11 @@ public final class ReplicationTest extends
ActiveMQTestBase {
}
@Override
+ public boolean tryAppendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception {
+ return true; // stub: ignores every argument (the callback is never invoked) and always answers true
+ }
+
+ @Override
public void appendPrepareRecord(final long txID,
final EncodingSupport transactionData,
final boolean sync,
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
index 67cfe18..c38bc45 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/FakeStorageManager.java
@@ -39,8 +39,9 @@ public class FakeStorageManager extends NullStorageManager {
}
@Override
- public void deleteMessage(final long messageID) throws Exception {
+ public boolean deleteMessage(final long messageID) throws Exception {
messageIds.remove(messageID);
+ return true;
}
@Override
diff --git
a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
index 4a9eb47..eb226e6 100644
--- a/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/replicated-static0/broker.xml
@@ -117,6 +117,11 @@ under the License.
<multicast>
</multicast>
</address>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
diff --git
a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
index 58a4b9f..4d62a6a 100644
--- a/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
+++ b/tests/smoke-tests/src/main/resources/servers/replicated-static1/broker.xml
@@ -119,6 +119,11 @@ under the License.
<multicast>
</multicast>
</address>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
<address name="exampleQueue">
<anycast>
<queue name="exampleQueue"/>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
index 954e85b..7da070c 100644
---
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/replicationflow/SoakPagingTest.java
@@ -41,6 +41,10 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase {
+ public static final int LAG_CONSUMER_TIME = 1000;
+ public static final int TIME_RUNNING = 4000;
+ public static final int CLIENT_KILLS = 2;
+
String protocol;
String consumerType;
boolean transaction;
@@ -86,12 +90,13 @@ public class SoakPagingTest extends SmokeTestBase {
private static ConnectionFactory createConnectionFactory(String protocol,
String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
- return new org.apache.activemq.ActiveMQConnectionFactory(uri);
+ return new org.apache.activemq.ActiveMQConnectionFactory("failover:("
+ uri + ")");
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
+
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") ||
protocol.toUpperCase().equals("ARTEMIS")) {
@@ -158,25 +163,14 @@ public class SoakPagingTest extends SmokeTestBase {
@Test
public void testPagingReplication() throws Throwable {
- Process queueProcess = null;
- if (consumerType.equals("queue")) {
- queueProcess =
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol,
consumerType, "45000", "" + transaction);
- }
+ server1 = startServer(SERVER_NAME_1, 0, 30000);
- for (int i = 0; i < 3; i++) {
- Process process =
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol,
consumerType, "15000", "" + transaction);
-
- if (i == 0) {
- server1 = startServer(SERVER_NAME_1, 0, 30000);
- }
+ for (int i = 0; i < CLIENT_KILLS; i++) {
+ Process process =
SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol,
consumerType, "" + TIME_RUNNING, "" + transaction);
int result = process.waitFor();
Assert.assertTrue(result > 0);
}
-
- if (queueProcess != null) {
- Assert.assertTrue(queueProcess.waitFor() > 0);
- }
}
public void produce(ConnectionFactory factory) {
@@ -261,7 +255,8 @@ public class SoakPagingTest extends SmokeTestBase {
messageConsumer = session.createConsumer(address);
}
- Thread.sleep(5000);
+ if (LAG_CONSUMER_TIME > 0) Thread.sleep(LAG_CONSUMER_TIME);
+
connection.start();
int i = 0;
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
index be72ee6..d8dc8fc 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java
@@ -385,6 +385,20 @@ public abstract class JournalImplTestBase extends
ActiveMQTestBase {
journal.debugWait();
}
+   /**
+    * Calls {@code journal.tryAppendUpdateRecord} for one record id with a freshly generated
+    * payload of {@code recordLength} bytes and record type 0.
+    *
+    * @param argument id of the record to update
+    * @return whatever the journal returned; only when it is {@code true} is a matching
+    *         {@code RecordInfo} appended to {@code records}
+    * @throws Exception if generating the record or the journal call fails
+    */
+   protected boolean tryUpdate(final long argument) throws Exception {
+      byte[] updateRecord = generateRecord(recordLength);
+
+      beforeJournalOperation();
+
+      boolean result = journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, sync);
+
+      if (result) {
+         // Track the update only when the journal reported success.
+         records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0));
+      }
+
+      return result;
+   }
+
protected void update(final long... arguments) throws Exception {
for (long element : arguments) {
byte[] updateRecord = generateRecord(recordLength);
@@ -411,6 +425,20 @@ public abstract class JournalImplTestBase extends
ActiveMQTestBase {
journal.debugWait();
}
+   /**
+    * Calls {@code journal.tryAppendDeleteRecord} for one record id.
+    *
+    * When the journal answers {@code true}, the id is also passed to {@code removeRecordsForID}.
+    * In both cases {@code journal.debugWait()} runs before returning.
+    *
+    * @param argument id of the record to delete
+    * @return the value reported by the journal
+    * @throws Exception if the journal call fails
+    */
+   protected boolean tryDelete(final long argument) throws Exception {
+      beforeJournalOperation();
+
+      if (!journal.tryAppendDeleteRecord(argument, sync)) {
+         // Refused by the journal: leave the tracked records untouched.
+         journal.debugWait();
+         return false;
+      }
+
+      removeRecordsForID(argument);
+      journal.debugWait();
+      return true;
+   }
+
protected void addTx(final long txID, final long... arguments) throws
Exception {
TransactionHolder tx = getTransaction(txID);
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
index 25b2413..838f334 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java
@@ -2689,6 +2689,22 @@ public abstract class JournalImplTestUnit extends
JournalImplTestBase {
}
@Test
+ public void testTryIsolation2() throws Exception { // tryUpdate on an id that was only added under transaction 1 must return false
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 1, 2, 3); // ids 1, 2 and 3 are added under txID 1; this test never commits it
+
+ Assert.assertFalse(tryUpdate(1)); // the try-update of id 1 is expected to be refused
+
+ stopJournal(); // restart the journal, then reload through loadAndCheck()
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ @Test
public void testIsolation3() throws Exception {
setup(10, 10 * 1024, true);
createJournal();
@@ -2708,6 +2724,22 @@ public abstract class JournalImplTestUnit extends
JournalImplTestBase {
loadAndCheck();
}
+ @Test
+ public void testTryDelete() throws Exception { // tryDelete on an id that was only added under transaction 1 must return false
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, 1, 2, 3); // ids 1, 2 and 3 are added under txID 1; this test never commits it
+
+ Assert.assertFalse(tryDelete(1)); // the try-delete of id 1 is expected to be refused
+
+ stopJournal(); // restart the journal, then reload through loadAndCheck()
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
// XA tests
// ========