This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 98a6e42 ARTEMIS-3554 Invalid Prepared Transaction could interrupt
server reload
98a6e42 is described below
commit 98a6e42a57038e2a97d13c2cf1d3a70dd30f5ae1
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 4 20:56:06 2021 -0400
ARTEMIS-3554 Invalid Prepared Transaction could interrupt server reload
---
.../journal/AbstractJournalStorageManager.java | 284 ++++++++++++---------
.../artemis/core/server/ActiveMQServerLogger.java | 8 +
.../tests/integration/paging/PagingTest.java | 106 ++++++++
3 files changed, 273 insertions(+), 125 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 10c3399..b7d3101 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
import javax.transaction.xa.Xid;
@@ -1205,7 +1206,7 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
journalLoader.handleAddMessage(queueMap);
- loadPreparedTransactions(postOffice, pagingManager, resourceManager,
queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions,
pendingLargeMessages, journalLoader);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager,
queueInfos, preparedTransactions, this::failedToPrepareException,
pageSubscriptions, pendingLargeMessages, journalLoader);
for (PageSubscription sub : pageSubscriptions.values()) {
sub.getCounter().processReload();
@@ -1236,6 +1237,22 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
}
}
+ // Callback for a prepared TX that failed to load: log it, then roll it back.
+ private void failedToPrepareException(PreparedTransactionInfo txInfo,
Throwable e) {
+ XidEncoding encodingXid = null;
+ try {
+ encodingXid = new XidEncoding(txInfo.getExtraData());
+ } catch (Throwable ignored) { // XID is decoded only for the log line below
+ }
+ ActiveMQServerLogger.LOGGER.failedToLoadPreparedTX(e,
String.valueOf(encodingXid != null ? encodingXid.xid : null));
+ // A failing rollback is logged with its own message (e2) and not rethrown.
+ try {
+ rollback(txInfo.getId());
+ } catch (Throwable e2) {
+ logger.warn(e2.getMessage(), e2);
+ }
+ }
+
private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer
buff) {
Message message = MessagePersister.getInstance().decode(buff, null,
pools, this);
return message;
@@ -1702,195 +1719,212 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
final ResourceManager resourceManager,
final Map<Long, QueueBindingInfo>
queueInfos,
final List<PreparedTransactionInfo>
preparedTransactions,
- final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
+ final
BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback,
final Map<Long, PageSubscription>
pageSubscriptions,
final Set<Pair<Long, Long>>
pendingLargeMessages,
JournalLoader journalLoader) throws
Exception {
// recover prepared transactions
- CoreMessageObjectPools pools = null;
+ final CoreMessageObjectPools pools = new CoreMessageObjectPools();
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
{
- XidEncoding encodingXid = new
XidEncoding(preparedTransaction.getExtraData());
-
- Xid xid = encodingXid.xid;
+ try {
+ loadSinglePreparedTransaction(postOffice, pagingManager,
resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages,
journalLoader, pools, preparedTransaction);
+ } catch (Throwable e) {
+ if (failedTransactionCallback != null) {
+ failedTransactionCallback.accept(preparedTransaction, e);
+ } else {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
- Transaction tx = new TransactionImpl(preparedTransaction.getId(),
xid, this);
+ private void loadSinglePreparedTransaction(PostOffice postOffice,
+ PagingManager pagingManager,
+ ResourceManager resourceManager,
+ Map<Long, QueueBindingInfo> queueInfos,
+ Map<Long, PageSubscription> pageSubscriptions,
+ Set<Pair<Long, Long>> pendingLargeMessages,
+ JournalLoader journalLoader,
+ CoreMessageObjectPools pools,
+ PreparedTransactionInfo preparedTransaction) throws
Exception {
+ XidEncoding encodingXid = new
XidEncoding(preparedTransaction.getExtraData());
- List<MessageReference> referencesToAck = new ArrayList<>();
+ Xid xid = encodingXid.xid;
- Map<Long, Message> messages = new HashMap<>();
+ Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid,
this);
- // Use same method as load message journal to prune out acks, so they
don't get added.
- // Then have reacknowledge(tx) methods on queue, which needs to add
the page size
+ List<MessageReference> referencesToAck = new ArrayList<>();
- // first get any sent messages for this tx and recreate
- for (RecordInfo record : preparedTransaction.getRecords()) {
- byte[] data = record.data;
+ Map<Long, Message> messages = new HashMap<>();
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+ // Use same method as load message journal to prune out acks, so they
don't get added.
+ // Then have reacknowledge(tx) methods on queue, which needs to add the
page size
- byte recordType = record.getUserRecordType();
+ // first get any sent messages for this tx and recreate
+ for (RecordInfo record : preparedTransaction.getRecords()) {
+ byte[] data = record.data;
- switch (recordType) {
- case JournalRecordIds.ADD_LARGE_MESSAGE: {
- messages.put(record.id, parseLargeMessage(buff).toMessage());
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
- break;
- }
- case JournalRecordIds.ADD_MESSAGE: {
+ byte recordType = record.getUserRecordType();
- break;
- }
- case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
- if (pools == null) {
- pools = new CoreMessageObjectPools();
- }
- Message message = decodeMessage(pools, buff);
+ switch (recordType) {
+ case JournalRecordIds.ADD_LARGE_MESSAGE: {
+ messages.put(record.id, parseLargeMessage(buff).toMessage());
- messages.put(record.id, message);
+ break;
+ }
+ case JournalRecordIds.ADD_MESSAGE: {
- break;
- }
- case JournalRecordIds.ADD_REF: {
- long messageID = record.id;
+ break;
+ }
+ case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
+ Message message = decodeMessage(pools, buff);
- RefEncoding encoding = new RefEncoding();
+ messages.put(record.id, message);
- encoding.decode(buff);
+ break;
+ }
+ case JournalRecordIds.ADD_REF: {
+ long messageID = record.id;
- Message message = messages.get(messageID);
+ RefEncoding encoding = new RefEncoding();
- if (message == null) {
- throw new IllegalStateException("Cannot find message with
id " + messageID);
- }
+ encoding.decode(buff);
- journalLoader.handlePreparedSendMessage(message, tx,
encoding.queueID);
+ Message message = messages.get(messageID);
- break;
+ if (message == null) {
+ throw new IllegalStateException("Cannot find message with id
" + messageID);
}
- case JournalRecordIds.ACKNOWLEDGE_REF: {
- long messageID = record.id;
-
- RefEncoding encoding = new RefEncoding();
- encoding.decode(buff);
+ journalLoader.handlePreparedSendMessage(message, tx,
encoding.queueID);
- journalLoader.handlePreparedAcknowledge(messageID,
referencesToAck, encoding.queueID);
+ break;
+ }
+ case JournalRecordIds.ACKNOWLEDGE_REF: {
+ long messageID = record.id;
- break;
- }
- case JournalRecordIds.PAGE_TRANSACTION: {
+ RefEncoding encoding = new RefEncoding();
- PageTransactionInfo pageTransactionInfo = new
PageTransactionInfoImpl();
+ encoding.decode(buff);
- pageTransactionInfo.decode(buff);
+ journalLoader.handlePreparedAcknowledge(messageID,
referencesToAck, encoding.queueID);
- if (record.isUpdate) {
- PageTransactionInfo pgTX =
pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
- if (pgTX != null) {
- pgTX.reloadUpdate(this, pagingManager, tx,
pageTransactionInfo.getNumberOfMessages());
- }
- } else {
- pageTransactionInfo.setCommitted(false);
+ break;
+ }
+ case JournalRecordIds.PAGE_TRANSACTION: {
-
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION,
pageTransactionInfo);
+ PageTransactionInfo pageTransactionInfo = new
PageTransactionInfoImpl();
- pagingManager.addTransaction(pageTransactionInfo);
+ pageTransactionInfo.decode(buff);
- tx.addOperation(new FinishPageMessageOperation());
+ if (record.isUpdate) {
+ PageTransactionInfo pgTX =
pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
+ if (pgTX != null) {
+ pgTX.reloadUpdate(this, pagingManager, tx,
pageTransactionInfo.getNumberOfMessages());
}
+ } else {
+ pageTransactionInfo.setCommitted(false);
- break;
- }
- case SET_SCHEDULED_DELIVERY_TIME: {
- // Do nothing - for prepared txs, the set scheduled delivery
time will only occur in a send in which
- // case the message will already have the header for the
scheduled delivery time, so no need to do
- // anything.
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION,
pageTransactionInfo);
- break;
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
}
- case DUPLICATE_ID: {
- // We need load the duplicate ids at prepare time too
- DuplicateIDEncoding encoding = new DuplicateIDEncoding();
- encoding.decode(buff);
+ break;
+ }
+ case SET_SCHEDULED_DELIVERY_TIME: {
+ // Do nothing - for prepared txs, the set scheduled delivery
time will only occur in a send in which
+ // case the message will already have the header for the
scheduled delivery time, so no need to do
+ // anything.
+
+ break;
+ }
+ case DUPLICATE_ID: {
+ // We need load the duplicate ids at prepare time too
+ DuplicateIDEncoding encoding = new DuplicateIDEncoding();
- DuplicateIDCache cache =
postOffice.getDuplicateIDCache(encoding.address);
+ encoding.decode(buff);
- cache.load(tx, encoding.duplID);
+ DuplicateIDCache cache =
postOffice.getDuplicateIDCache(encoding.address);
- break;
- }
- case ACKNOWLEDGE_CURSOR: {
- CursorAckRecordEncoding encoding = new
CursorAckRecordEncoding();
- encoding.decode(buff);
+ cache.load(tx, encoding.duplID);
- encoding.position.setRecordID(record.id);
+ break;
+ }
+ case ACKNOWLEDGE_CURSOR: {
+ CursorAckRecordEncoding encoding = new
CursorAckRecordEncoding();
+ encoding.decode(buff);
- PageSubscription sub = locateSubscription(encoding.queueID,
pageSubscriptions, queueInfos, pagingManager);
+ encoding.position.setRecordID(record.id);
- if (sub != null) {
- sub.reloadPreparedACK(tx, encoding.position);
- referencesToAck.add(new
PagedReferenceImpl(encoding.position, null, sub));
- } else {
-
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
- }
- break;
- }
- case PAGE_CURSOR_COUNTER_VALUE: {
- ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
+ PageSubscription sub = locateSubscription(encoding.queueID,
pageSubscriptions, queueInfos, pagingManager);
- break;
+ if (sub != null) {
+ sub.reloadPreparedACK(tx, encoding.position);
+ referencesToAck.add(new
PagedReferenceImpl(encoding.position, null, sub));
+ } else {
+
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
}
+ break;
+ }
+ case PAGE_CURSOR_COUNTER_VALUE: {
+ ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
- case PAGE_CURSOR_COUNTER_INC: {
- PageCountRecordInc encoding = new PageCountRecordInc();
+ break;
+ }
- encoding.decode(buff);
+ case PAGE_CURSOR_COUNTER_INC: {
+ PageCountRecordInc encoding = new PageCountRecordInc();
- PageSubscription sub =
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos,
pagingManager);
+ encoding.decode(buff);
- if (sub != null) {
- sub.getCounter().applyIncrementOnTX(tx, record.id,
encoding.getValue(), encoding.getPersistentSize());
- sub.notEmpty();
- } else {
-
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
- }
+ PageSubscription sub =
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos,
pagingManager);
- break;
+ if (sub != null) {
+ sub.getCounter().applyIncrementOnTX(tx, record.id,
encoding.getValue(), encoding.getPersistentSize());
+ sub.notEmpty();
+ } else {
+
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
}
- default: {
-
ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
- }
+ break;
+ }
+
+ default: {
+
ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
}
}
+ }
- for (RecordInfo recordDeleted :
preparedTransaction.getRecordsToDelete()) {
- byte[] data = recordDeleted.data;
+ for (RecordInfo recordDeleted :
preparedTransaction.getRecordsToDelete()) {
+ byte[] data = recordDeleted.data;
- if (data.length > 0) {
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
- byte b = buff.readByte();
+ if (data.length > 0) {
+ ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+ byte b = buff.readByte();
- switch (b) {
- case ADD_LARGE_MESSAGE_PENDING: {
- long messageID = buff.readLong();
- if (!pendingLargeMessages.remove(new
Pair<>(recordDeleted.id, messageID))) {
-
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
- }
- installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
- break;
+ switch (b) {
+ case ADD_LARGE_MESSAGE_PENDING: {
+ long messageID = buff.readLong();
+ if (!pendingLargeMessages.remove(new
Pair<>(recordDeleted.id, messageID))) {
+
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
}
- default:
-
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
+ installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
+ break;
}
+ default:
+
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
}
-
}
- journalLoader.handlePreparedTransaction(tx, referencesToAck, xid,
resourceManager);
}
+
+ journalLoader.handlePreparedTransaction(tx, referencesToAck, xid,
resourceManager);
}
OperationContext getContext(final boolean sync) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ac75bec..27d3d3d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1768,6 +1768,14 @@ public interface ActiveMQServerLogger extends
BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void federationDispatchError(@Cause Throwable e, String message);
+ /**
+ * WARN 222306: a prepared TX failed to load; "message" is the XID as text.
+ */
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222306, value = "Failed to load prepared TX and it will be
rolled back: {0}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void failedToLoadPreparedTX(@Cause Throwable e, String message);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format =
Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 29a6694..86de254 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -1314,6 +1314,112 @@ public class PagingTest extends ActiveMQTestBase {
session.close();
}
+
+ @Test
+ public void testPreparedACKRemoveAndRestart() throws Exception {
+ Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
+ // Scenario: prepare XA acks on paged messages, delete a page file, restart.
+ clearDataRecreateServerDirs();
+
+ Configuration config =
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int numberOfMessages = 10;
+
+ locator =
createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ forcePage(queue);
+ // Send 10; at i == 4 commit and force another page (presumably splitting 0..4 / 5..9).
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage message = session.createMessage(true);
+
+ message.putIntProperty("count", i);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+
+ if (i == 4) {
+ session.commit();
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(true, false, false);
+ // This second session is driven through Xids (start/end/prepare) below.
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ session.start();
+ // Five XA transactions (i = 0..4), each acking one message and left prepared.
+ for (int i = 0; i <= 4; i++) {
+ Xid xidConsumeNoCommit = newXID();
+ session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS);
+ // One message is consumed and acked under this Xid; prepared, never committed by this test
+ ClientMessage firstMessageConsumed = cons.receive(5000);
+ assertNotNull(firstMessageConsumed);
+ firstMessageConsumed.acknowledge();
+ session.end(xidConsumeNoCommit, XAResource.TMSUCCESS);
+ session.prepare(xidConsumeNoCommit);
+ }
+
+ File pagingFolder =
queue.getPageSubscription().getPagingStore().getFolder();
+
+ server.stop();
+
+ // remove the very first page. a restart should not fail
+ File fileToRemove = new File(pagingFolder, "000000001.page");
+ Assert.assertTrue(fileToRemove.delete());
+
+ server.start();
+
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(false, true, true);
+
+ cons = session.createConsumer(ADDRESS);
+
+ session.start();
+ // After the restart only "count" 5..9 are expected, in order, then nothing.
+ for (int i = 5; i < numberOfMessages; i++) {
+ ClientMessage message = cons.receive(1000);
+ assertNotNull(message);
+ assertEquals(i, message.getIntProperty("count").intValue());
+ message.acknowledge();
+ }
+ assertNull(cons.receiveImmediate());
+ session.commit();
+ }
+
/**
* @param queue
* @throws InterruptedException