This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.19.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b1db573d99c0c305f6061216983b1dc3a05213a0 Author: Clebert Suconic <[email protected]> AuthorDate: Thu Nov 4 20:56:06 2021 -0400 ARTEMIS-3554 Invalid Prepared Transaction could interrupt server reload (cherry picked from commit 98a6e42a57038e2a97d13c2cf1d3a70dd30f5ae1) --- .../journal/AbstractJournalStorageManager.java | 284 ++++++++++++--------- .../artemis/core/server/ActiveMQServerLogger.java | 8 + .../tests/integration/paging/PagingTest.java | 106 ++++++++ 3 files changed, 273 insertions(+), 125 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 314982e..a3e5044 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; import javax.transaction.xa.Xid; @@ -1205,7 +1206,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp journalLoader.handleAddMessage(queueMap); - loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions, pendingLargeMessages, journalLoader); + loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, this::failedToPrepareException, pageSubscriptions, pendingLargeMessages, journalLoader); for (PageSubscription sub : pageSubscriptions.values()) { sub.getCounter().processReload(); @@ -1236,6 +1237,22 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } } + private void failedToPrepareException(PreparedTransactionInfo txInfo, Throwable e) { + XidEncoding encodingXid = null; + try { + encodingXid = new XidEncoding(txInfo.getExtraData()); + } catch (Throwable ignored) { + } + + ActiveMQServerLogger.LOGGER.failedToLoadPreparedTX(e, String.valueOf(encodingXid != null ? encodingXid.xid : null)); + + try { + rollback(txInfo.getId()); + } catch (Throwable e2) { + logger.warn(e.getMessage(), e2); + } + } + private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer buff) { Message message = MessagePersister.getInstance().decode(buff, null, pools, this); return message; @@ -1706,195 +1723,212 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final ResourceManager resourceManager, final Map<Long, QueueBindingInfo> queueInfos, final List<PreparedTransactionInfo> preparedTransactions, - final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap, + final BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback, final Map<Long, PageSubscription> pageSubscriptions, final Set<Pair<Long, Long>> pendingLargeMessages, JournalLoader journalLoader) throws Exception { // recover prepared transactions - CoreMessageObjectPools pools = null; + final CoreMessageObjectPools pools = new CoreMessageObjectPools(); for (PreparedTransactionInfo preparedTransaction : preparedTransactions) { - XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData()); - - Xid xid = encodingXid.xid; + try { + loadSinglePreparedTransaction(postOffice, pagingManager, resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, journalLoader, pools, preparedTransaction); + } catch (Throwable e) { + if (failedTransactionCallback != null) { + failedTransactionCallback.accept(preparedTransaction, e); + } else { + logger.warn(e.getMessage(), e); + } + } + } + } - Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this); + private void loadSinglePreparedTransaction(PostOffice postOffice, + PagingManager pagingManager, + ResourceManager resourceManager, + Map<Long, QueueBindingInfo> queueInfos, + Map<Long, PageSubscription> pageSubscriptions, + Set<Pair<Long, Long>> pendingLargeMessages, + JournalLoader journalLoader, + CoreMessageObjectPools pools, + PreparedTransactionInfo preparedTransaction) throws Exception { + XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData()); - List<MessageReference> referencesToAck = new ArrayList<>(); + Xid xid = encodingXid.xid; - Map<Long, Message> messages = new HashMap<>(); + Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this); - // Use same method as load message journal to prune out acks, so they don't get added. - // Then have reacknowledge(tx) methods on queue, which needs to add the page size + List<MessageReference> referencesToAck = new ArrayList<>(); - // first get any sent messages for this tx and recreate - for (RecordInfo record : preparedTransaction.getRecords()) { - byte[] data = record.data; + Map<Long, Message> messages = new HashMap<>(); - ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + // Use same method as load message journal to prune out acks, so they don't get added. + // Then have reacknowledge(tx) methods on queue, which needs to add the page size - byte recordType = record.getUserRecordType(); + // first get any sent messages for this tx and recreate + for (RecordInfo record : preparedTransaction.getRecords()) { + byte[] data = record.data; - switch (recordType) { - case JournalRecordIds.ADD_LARGE_MESSAGE: { - messages.put(record.id, parseLargeMessage(buff).toMessage()); + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); - break; - } - case JournalRecordIds.ADD_MESSAGE: { + byte recordType = record.getUserRecordType(); - break; - } - case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { - if (pools == null) { - pools = new CoreMessageObjectPools(); - } - Message message = decodeMessage(pools, buff); + switch (recordType) { + case JournalRecordIds.ADD_LARGE_MESSAGE: { + messages.put(record.id, parseLargeMessage(buff).toMessage()); - messages.put(record.id, message); + break; + } + case JournalRecordIds.ADD_MESSAGE: { - break; - } - case JournalRecordIds.ADD_REF: { - long messageID = record.id; + break; + } + case JournalRecordIds.ADD_MESSAGE_PROTOCOL: { + Message message = decodeMessage(pools, buff); - RefEncoding encoding = new RefEncoding(); + messages.put(record.id, message); - encoding.decode(buff); + break; + } + case JournalRecordIds.ADD_REF: { + long messageID = record.id; - Message message = messages.get(messageID); + RefEncoding encoding = new RefEncoding(); - if (message == null) { - throw new IllegalStateException("Cannot find message with id " + messageID); - } + encoding.decode(buff); - journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID); + Message message = messages.get(messageID); - break; + if (message == null) { + throw new IllegalStateException("Cannot find message with id " + messageID); } - case JournalRecordIds.ACKNOWLEDGE_REF: { - long messageID = record.id; - - RefEncoding encoding = new RefEncoding(); - encoding.decode(buff); + journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID); - journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID); + break; + } + case JournalRecordIds.ACKNOWLEDGE_REF: { + long messageID = record.id; - break; - } - case JournalRecordIds.PAGE_TRANSACTION: { + RefEncoding encoding = new RefEncoding(); - PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl(); + encoding.decode(buff); - pageTransactionInfo.decode(buff); + journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID); - if (record.isUpdate) { - PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID()); - if (pgTX != null) { - pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); - } - } else { - pageTransactionInfo.setCommitted(false); + break; + } + case JournalRecordIds.PAGE_TRANSACTION: { - tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); + PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl(); - pagingManager.addTransaction(pageTransactionInfo); + pageTransactionInfo.decode(buff); - tx.addOperation(new FinishPageMessageOperation()); + if (record.isUpdate) { + PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID()); + if (pgTX != null) { + pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); } + } else { + pageTransactionInfo.setCommitted(false); - break; - } - case SET_SCHEDULED_DELIVERY_TIME: { - // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which - // case the message will already have the header for the scheduled delivery time, so no need to do - // anything. + tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); - break; + pagingManager.addTransaction(pageTransactionInfo); + + tx.addOperation(new FinishPageMessageOperation()); } - case DUPLICATE_ID: { - // We need load the duplicate ids at prepare time too - DuplicateIDEncoding encoding = new DuplicateIDEncoding(); - encoding.decode(buff); + break; + } + case SET_SCHEDULED_DELIVERY_TIME: { + // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which + // case the message will already have the header for the scheduled delivery time, so no need to do + // anything. + + break; + } + case DUPLICATE_ID: { + // We need load the duplicate ids at prepare time too + DuplicateIDEncoding encoding = new DuplicateIDEncoding(); - DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address); + encoding.decode(buff); - cache.load(tx, encoding.duplID); + DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address); - break; - } - case ACKNOWLEDGE_CURSOR: { - CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); - encoding.decode(buff); + cache.load(tx, encoding.duplID); - encoding.position.setRecordID(record.id); + break; + } + case ACKNOWLEDGE_CURSOR: { + CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); + encoding.decode(buff); - PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); + encoding.position.setRecordID(record.id); - if (sub != null) { - sub.reloadPreparedACK(tx, encoding.position); - referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub)); - } else { - ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID); - } - break; - } - case PAGE_CURSOR_COUNTER_VALUE: { - ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared(); + PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); - break; + if (sub != null) { + sub.reloadPreparedACK(tx, encoding.position); + referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub)); + } else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID); } + break; + } + case PAGE_CURSOR_COUNTER_VALUE: { + ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared(); - case PAGE_CURSOR_COUNTER_INC: { - PageCountRecordInc encoding = new PageCountRecordInc(); + break; + } - encoding.decode(buff); + case PAGE_CURSOR_COUNTER_INC: { + PageCountRecordInc encoding = new PageCountRecordInc(); - PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + encoding.decode(buff); - if (sub != null) { - sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize()); - sub.notEmpty(); - } else { - ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); - } + PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); - break; + if (sub != null) { + sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize()); + sub.notEmpty(); + } else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); } - default: { - ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType); - } + break; + } + + default: { + ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType); } } + } - for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) { - byte[] data = recordDeleted.data; + for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) { + byte[] data = recordDeleted.data; - if (data.length > 0) { - ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); - byte b = buff.readByte(); + if (data.length > 0) { + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + byte b = buff.readByte(); - switch (b) { - case ADD_LARGE_MESSAGE_PENDING: { - long messageID = buff.readLong(); - if (!pendingLargeMessages.remove(new Pair<>(recordDeleted.id, messageID))) { - ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id); - } - installLargeMessageConfirmationOnTX(tx, recordDeleted.id); - break; + switch (b) { + case ADD_LARGE_MESSAGE_PENDING: { + long messageID = buff.readLong(); + if (!pendingLargeMessages.remove(new Pair<>(recordDeleted.id, messageID))) { + ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id); } - default: - ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b); + installLargeMessageConfirmationOnTX(tx, recordDeleted.id); + break; } + default: + ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b); } - } - journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager); } + + journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager); } OperationContext getContext(final boolean sync) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index ac75bec..27d3d3d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1768,6 +1768,14 @@ public interface ActiveMQServerLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void federationDispatchError(@Cause Throwable e, String message); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222306, value = "Failed to load prepared TX and it will be rolled back: {0}", + format = Message.Format.MESSAGE_FORMAT) + void failedToLoadPreparedTX(@Cause Throwable e, String message); + + + + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 29a6694..86de254 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -1314,6 +1314,112 @@ public class PagingTest extends ActiveMQTestBase { session.close(); } + + @Test + public void testPreparedACKRemoveAndRestart() throws Exception { + Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE); + + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 10; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + session.createQueue(new QueueConfiguration(PagingTest.ADDRESS)); + + Queue queue = server.locateQueue(PagingTest.ADDRESS); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + queue.getPageSubscription().getPagingStore().startPaging(); + + forcePage(queue); + + for (int i = 0; i < numberOfMessages; i++) { + ClientMessage message = session.createMessage(true); + + message.putIntProperty("count", i); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + + if (i == 4) { + session.commit(); + queue.getPageSubscription().getPagingStore().forceAnotherPage(); + } + } + + session.commit(); + + session.close(); + + session = sf.createSession(true, false, false); + + + ClientConsumer cons = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 0; i <= 4; i++) { + Xid xidConsumeNoCommit = newXID(); + session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS); + // First message is consumed, prepared, will be rolled back later + ClientMessage firstMessageConsumed = cons.receive(5000); + assertNotNull(firstMessageConsumed); + firstMessageConsumed.acknowledge(); + session.end(xidConsumeNoCommit, XAResource.TMSUCCESS); + session.prepare(xidConsumeNoCommit); + } + + File pagingFolder = queue.getPageSubscription().getPagingStore().getFolder(); + + server.stop(); + + // remove the very first page. a restart should not fail + File fileToRemove = new File(pagingFolder, "000000001.page"); + Assert.assertTrue(fileToRemove.delete()); + + server.start(); + + sf = createSessionFactory(locator); + + session = sf.createSession(false, true, true); + + cons = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 5; i < numberOfMessages; i++) { + ClientMessage message = cons.receive(1000); + assertNotNull(message); + assertEquals(i, message.getIntProperty("count").intValue()); + message.acknowledge(); + } + assertNull(cons.receiveImmediate()); + session.commit(); + } + /** * @param queue * @throws InterruptedException
