This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.19.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b1db573d99c0c305f6061216983b1dc3a05213a0
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 4 20:56:06 2021 -0400

    ARTEMIS-3554 Invalid Prepared Transaction could interrupt server reload
    
    (cherry picked from commit 98a6e42a57038e2a97d13c2cf1d3a70dd30f5ae1)
---
 .../journal/AbstractJournalStorageManager.java     | 284 ++++++++++++---------
 .../artemis/core/server/ActiveMQServerLogger.java  |   8 +
 .../tests/integration/paging/PagingTest.java       | 106 ++++++++
 3 files changed, 273 insertions(+), 125 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 314982e..a3e5044 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
 
 import javax.transaction.xa.Xid;
 
@@ -1205,7 +1206,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
 
          journalLoader.handleAddMessage(queueMap);
 
-         loadPreparedTransactions(postOffice, pagingManager, resourceManager, 
queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions, 
pendingLargeMessages, journalLoader);
+         loadPreparedTransactions(postOffice, pagingManager, resourceManager, 
queueInfos, preparedTransactions, this::failedToPrepareException, 
pageSubscriptions, pendingLargeMessages, journalLoader);
 
          for (PageSubscription sub : pageSubscriptions.values()) {
             sub.getCounter().processReload();
@@ -1236,6 +1237,22 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
       }
    }
 
+   private void failedToPrepareException(PreparedTransactionInfo txInfo, 
Throwable e) {
+      XidEncoding encodingXid = null;
+      try {
+         encodingXid = new XidEncoding(txInfo.getExtraData());
+      } catch (Throwable ignored) {
+      }
+
+      ActiveMQServerLogger.LOGGER.failedToLoadPreparedTX(e, 
String.valueOf(encodingXid != null ? encodingXid.xid : null));
+
+      try {
+         rollback(txInfo.getId());
+      } catch (Throwable e2) {
+         logger.warn(e.getMessage(), e2);
+      }
+   }
+
    private Message decodeMessage(CoreMessageObjectPools pools, ActiveMQBuffer 
buff) {
       Message message = MessagePersister.getInstance().decode(buff, null, 
pools, this);
       return message;
@@ -1706,195 +1723,212 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                                          final ResourceManager resourceManager,
                                          final Map<Long, QueueBindingInfo> 
queueInfos,
                                          final List<PreparedTransactionInfo> 
preparedTransactions,
-                                         final Map<SimpleString, 
List<Pair<byte[], Long>>> duplicateIDMap,
+                                         final 
BiConsumer<PreparedTransactionInfo, Throwable> failedTransactionCallback,
                                          final Map<Long, PageSubscription> 
pageSubscriptions,
                                          final Set<Pair<Long, Long>> 
pendingLargeMessages,
                                          JournalLoader journalLoader) throws 
Exception {
       // recover prepared transactions
-      CoreMessageObjectPools pools = null;
+      final CoreMessageObjectPools pools = new CoreMessageObjectPools();
 
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions) 
{
-         XidEncoding encodingXid = new 
XidEncoding(preparedTransaction.getExtraData());
-
-         Xid xid = encodingXid.xid;
+         try {
+            loadSinglePreparedTransaction(postOffice, pagingManager, 
resourceManager, queueInfos, pageSubscriptions, pendingLargeMessages, 
journalLoader, pools, preparedTransaction);
+         } catch (Throwable e) {
+            if (failedTransactionCallback != null) {
+               failedTransactionCallback.accept(preparedTransaction, e);
+            } else {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      }
+   }
 
-         Transaction tx = new TransactionImpl(preparedTransaction.getId(), 
xid, this);
+   private void loadSinglePreparedTransaction(PostOffice postOffice,
+                          PagingManager pagingManager,
+                          ResourceManager resourceManager,
+                          Map<Long, QueueBindingInfo> queueInfos,
+                          Map<Long, PageSubscription> pageSubscriptions,
+                          Set<Pair<Long, Long>> pendingLargeMessages,
+                          JournalLoader journalLoader,
+                          CoreMessageObjectPools pools,
+                          PreparedTransactionInfo preparedTransaction) throws 
Exception {
+      XidEncoding encodingXid = new 
XidEncoding(preparedTransaction.getExtraData());
 
-         List<MessageReference> referencesToAck = new ArrayList<>();
+      Xid xid = encodingXid.xid;
 
-         Map<Long, Message> messages = new HashMap<>();
+      Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, 
this);
 
-         // Use same method as load message journal to prune out acks, so they 
don't get added.
-         // Then have reacknowledge(tx) methods on queue, which needs to add 
the page size
+      List<MessageReference> referencesToAck = new ArrayList<>();
 
-         // first get any sent messages for this tx and recreate
-         for (RecordInfo record : preparedTransaction.getRecords()) {
-            byte[] data = record.data;
+      Map<Long, Message> messages = new HashMap<>();
 
-            ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+      // Use same method as load message journal to prune out acks, so they 
don't get added.
+      // Then have reacknowledge(tx) methods on queue, which needs to add the 
page size
 
-            byte recordType = record.getUserRecordType();
+      // first get any sent messages for this tx and recreate
+      for (RecordInfo record : preparedTransaction.getRecords()) {
+         byte[] data = record.data;
 
-            switch (recordType) {
-               case JournalRecordIds.ADD_LARGE_MESSAGE: {
-                  messages.put(record.id, parseLargeMessage(buff).toMessage());
+         ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
 
-                  break;
-               }
-               case JournalRecordIds.ADD_MESSAGE: {
+         byte recordType = record.getUserRecordType();
 
-                  break;
-               }
-               case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
-                  if (pools == null) {
-                     pools = new CoreMessageObjectPools();
-                  }
-                  Message message = decodeMessage(pools, buff);
+         switch (recordType) {
+            case JournalRecordIds.ADD_LARGE_MESSAGE: {
+               messages.put(record.id, parseLargeMessage(buff).toMessage());
 
-                  messages.put(record.id, message);
+               break;
+            }
+            case JournalRecordIds.ADD_MESSAGE: {
 
-                  break;
-               }
-               case JournalRecordIds.ADD_REF: {
-                  long messageID = record.id;
+               break;
+            }
+            case JournalRecordIds.ADD_MESSAGE_PROTOCOL: {
+               Message message = decodeMessage(pools, buff);
 
-                  RefEncoding encoding = new RefEncoding();
+               messages.put(record.id, message);
 
-                  encoding.decode(buff);
+               break;
+            }
+            case JournalRecordIds.ADD_REF: {
+               long messageID = record.id;
 
-                  Message message = messages.get(messageID);
+               RefEncoding encoding = new RefEncoding();
 
-                  if (message == null) {
-                     throw new IllegalStateException("Cannot find message with 
id " + messageID);
-                  }
+               encoding.decode(buff);
 
-                  journalLoader.handlePreparedSendMessage(message, tx, 
encoding.queueID);
+               Message message = messages.get(messageID);
 
-                  break;
+               if (message == null) {
+                  throw new IllegalStateException("Cannot find message with id 
" + messageID);
                }
-               case JournalRecordIds.ACKNOWLEDGE_REF: {
-                  long messageID = record.id;
-
-                  RefEncoding encoding = new RefEncoding();
 
-                  encoding.decode(buff);
+               journalLoader.handlePreparedSendMessage(message, tx, 
encoding.queueID);
 
-                  journalLoader.handlePreparedAcknowledge(messageID, 
referencesToAck, encoding.queueID);
+               break;
+            }
+            case JournalRecordIds.ACKNOWLEDGE_REF: {
+               long messageID = record.id;
 
-                  break;
-               }
-               case JournalRecordIds.PAGE_TRANSACTION: {
+               RefEncoding encoding = new RefEncoding();
 
-                  PageTransactionInfo pageTransactionInfo = new 
PageTransactionInfoImpl();
+               encoding.decode(buff);
 
-                  pageTransactionInfo.decode(buff);
+               journalLoader.handlePreparedAcknowledge(messageID, 
referencesToAck, encoding.queueID);
 
-                  if (record.isUpdate) {
-                     PageTransactionInfo pgTX = 
pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
-                     if (pgTX != null) {
-                        pgTX.reloadUpdate(this, pagingManager, tx, 
pageTransactionInfo.getNumberOfMessages());
-                     }
-                  } else {
-                     pageTransactionInfo.setCommitted(false);
+               break;
+            }
+            case JournalRecordIds.PAGE_TRANSACTION: {
 
-                     
tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, 
pageTransactionInfo);
+               PageTransactionInfo pageTransactionInfo = new 
PageTransactionInfoImpl();
 
-                     pagingManager.addTransaction(pageTransactionInfo);
+               pageTransactionInfo.decode(buff);
 
-                     tx.addOperation(new FinishPageMessageOperation());
+               if (record.isUpdate) {
+                  PageTransactionInfo pgTX = 
pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
+                  if (pgTX != null) {
+                     pgTX.reloadUpdate(this, pagingManager, tx, 
pageTransactionInfo.getNumberOfMessages());
                   }
+               } else {
+                  pageTransactionInfo.setCommitted(false);
 
-                  break;
-               }
-               case SET_SCHEDULED_DELIVERY_TIME: {
-                  // Do nothing - for prepared txs, the set scheduled delivery 
time will only occur in a send in which
-                  // case the message will already have the header for the 
scheduled delivery time, so no need to do
-                  // anything.
+                  tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, 
pageTransactionInfo);
 
-                  break;
+                  pagingManager.addTransaction(pageTransactionInfo);
+
+                  tx.addOperation(new FinishPageMessageOperation());
                }
-               case DUPLICATE_ID: {
-                  // We need load the duplicate ids at prepare time too
-                  DuplicateIDEncoding encoding = new DuplicateIDEncoding();
 
-                  encoding.decode(buff);
+               break;
+            }
+            case SET_SCHEDULED_DELIVERY_TIME: {
+               // Do nothing - for prepared txs, the set scheduled delivery 
time will only occur in a send in which
+               // case the message will already have the header for the 
scheduled delivery time, so no need to do
+               // anything.
+
+               break;
+            }
+            case DUPLICATE_ID: {
+               // We need load the duplicate ids at prepare time too
+               DuplicateIDEncoding encoding = new DuplicateIDEncoding();
 
-                  DuplicateIDCache cache = 
postOffice.getDuplicateIDCache(encoding.address);
+               encoding.decode(buff);
 
-                  cache.load(tx, encoding.duplID);
+               DuplicateIDCache cache = 
postOffice.getDuplicateIDCache(encoding.address);
 
-                  break;
-               }
-               case ACKNOWLEDGE_CURSOR: {
-                  CursorAckRecordEncoding encoding = new 
CursorAckRecordEncoding();
-                  encoding.decode(buff);
+               cache.load(tx, encoding.duplID);
 
-                  encoding.position.setRecordID(record.id);
+               break;
+            }
+            case ACKNOWLEDGE_CURSOR: {
+               CursorAckRecordEncoding encoding = new 
CursorAckRecordEncoding();
+               encoding.decode(buff);
 
-                  PageSubscription sub = locateSubscription(encoding.queueID, 
pageSubscriptions, queueInfos, pagingManager);
+               encoding.position.setRecordID(record.id);
 
-                  if (sub != null) {
-                     sub.reloadPreparedACK(tx, encoding.position);
-                     referencesToAck.add(new 
PagedReferenceImpl(encoding.position, null, sub));
-                  } else {
-                     
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
-                  }
-                  break;
-               }
-               case PAGE_CURSOR_COUNTER_VALUE: {
-                  ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
+               PageSubscription sub = locateSubscription(encoding.queueID, 
pageSubscriptions, queueInfos, pagingManager);
 
-                  break;
+               if (sub != null) {
+                  sub.reloadPreparedACK(tx, encoding.position);
+                  referencesToAck.add(new 
PagedReferenceImpl(encoding.position, null, sub));
+               } else {
+                  
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
                }
+               break;
+            }
+            case PAGE_CURSOR_COUNTER_VALUE: {
+               ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared();
 
-               case PAGE_CURSOR_COUNTER_INC: {
-                  PageCountRecordInc encoding = new PageCountRecordInc();
+               break;
+            }
 
-                  encoding.decode(buff);
+            case PAGE_CURSOR_COUNTER_INC: {
+               PageCountRecordInc encoding = new PageCountRecordInc();
 
-                  PageSubscription sub = 
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, 
pagingManager);
+               encoding.decode(buff);
 
-                  if (sub != null) {
-                     sub.getCounter().applyIncrementOnTX(tx, record.id, 
encoding.getValue(), encoding.getPersistentSize());
-                     sub.notEmpty();
-                  } else {
-                     
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
-                  }
+               PageSubscription sub = 
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, 
pagingManager);
 
-                  break;
+               if (sub != null) {
+                  sub.getCounter().applyIncrementOnTX(tx, record.id, 
encoding.getValue(), encoding.getPersistentSize());
+                  sub.notEmpty();
+               } else {
+                  
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
                }
 
-               default: {
-                  
ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
-               }
+               break;
+            }
+
+            default: {
+               
ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType);
             }
          }
+      }
 
-         for (RecordInfo recordDeleted : 
preparedTransaction.getRecordsToDelete()) {
-            byte[] data = recordDeleted.data;
+      for (RecordInfo recordDeleted : 
preparedTransaction.getRecordsToDelete()) {
+         byte[] data = recordDeleted.data;
 
-            if (data.length > 0) {
-               ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-               byte b = buff.readByte();
+         if (data.length > 0) {
+            ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+            byte b = buff.readByte();
 
-               switch (b) {
-                  case ADD_LARGE_MESSAGE_PENDING: {
-                     long messageID = buff.readLong();
-                     if (!pendingLargeMessages.remove(new 
Pair<>(recordDeleted.id, messageID))) {
-                        
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
-                     }
-                     installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
-                     break;
+            switch (b) {
+               case ADD_LARGE_MESSAGE_PENDING: {
+                  long messageID = buff.readLong();
+                  if (!pendingLargeMessages.remove(new 
Pair<>(recordDeleted.id, messageID))) {
+                     
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
                   }
-                  default:
-                     
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
+                  installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
+                  break;
                }
+               default:
+                  
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
             }
-
          }
 
-         journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, 
resourceManager);
       }
+
+      journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, 
resourceManager);
    }
 
    OperationContext getContext(final boolean sync) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index ac75bec..27d3d3d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1768,6 +1768,14 @@ public interface ActiveMQServerLogger extends 
BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void federationDispatchError(@Cause Throwable e, String message);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222306, value = "Failed to load prepared TX and it will be 
rolled back: {0}",
+      format = Message.Format.MESSAGE_FORMAT)
+   void failedToLoadPreparedTX(@Cause Throwable e, String message);
+
+
+
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = 
Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 29a6694..86de254 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -1314,6 +1314,112 @@ public class PagingTest extends ActiveMQTestBase {
       session.close();
    }
 
+
+   @Test
+   public void testPreparedACKRemoveAndRestart() throws Exception {
+      Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
+
+      clearDataRecreateServerDirs();
+
+      Configuration config = 
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, 
PagingTest.PAGE_MAX);
+
+      server.start();
+
+      final int numberOfMessages = 10;
+
+      locator = 
createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setAckBatchSize(0);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
+
+      Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      queue.getPageSubscription().getPagingStore().startPaging();
+
+      forcePage(queue);
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         message.putIntProperty("count", i);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+
+         if (i == 4) {
+            session.commit();
+            queue.getPageSubscription().getPagingStore().forceAnotherPage();
+         }
+      }
+
+      session.commit();
+
+      session.close();
+
+      session = sf.createSession(true, false, false);
+
+
+      ClientConsumer cons = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i <= 4; i++) {
+         Xid xidConsumeNoCommit = newXID();
+         session.start(xidConsumeNoCommit, XAResource.TMNOFLAGS);
+         // First message is consumed, prepared, will be rolled back later
+         ClientMessage firstMessageConsumed = cons.receive(5000);
+         assertNotNull(firstMessageConsumed);
+         firstMessageConsumed.acknowledge();
+         session.end(xidConsumeNoCommit, XAResource.TMSUCCESS);
+         session.prepare(xidConsumeNoCommit);
+      }
+
+      File pagingFolder = 
queue.getPageSubscription().getPagingStore().getFolder();
+
+      server.stop();
+
+      // remove the very first page. a restart should not fail
+      File fileToRemove = new File(pagingFolder, "000000001.page");
+      Assert.assertTrue(fileToRemove.delete());
+
+      server.start();
+
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, true, true);
+
+      cons = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 5; i < numberOfMessages; i++) {
+         ClientMessage message = cons.receive(1000);
+         assertNotNull(message);
+         assertEquals(i, message.getIntProperty("count").intValue());
+         message.acknowledge();
+      }
+      assertNull(cons.receiveImmediate());
+      session.commit();
+   }
+
    /**
     * @param queue
     * @throws InterruptedException

Reply via email to