This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4486a70aaa ARTEMIS-5574 fix handling of pending large message records
4486a70aaa is described below

commit 4486a70aaaaef7182ba922419e7277e9fb226ef4
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Jul 10 16:19:04 2025 -0500

    ARTEMIS-5574 fix handling of pending large message records
    
    Thanks to Clebert Suconic for the test.
---
 .../artemis/core/persistence/StorageManager.java   | 10 +--
 .../journal/AbstractJournalStorageManager.java     | 32 +--------
 .../persistence/impl/journal/JournalRecordIds.java |  4 +-
 .../TXLargeMessageConfirmationOperation.java       | 45 -------------
 .../impl/nullpm/NullStorageManager.java            |  8 +--
 .../core/server/impl/ActiveMQServerImpl.java       |  9 +--
 .../core/transaction/impl/TransactionImplTest.java |  9 +--
 .../tests/integration/client/SendAckFailTest.java  | 11 +--
 .../largemessage/PendingLargeMessageTest.java      | 78 ++++++++++++++++++++++
 9 files changed, 90 insertions(+), 116 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index fd8404f3b7..a4a11359e0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -185,15 +185,7 @@ public interface StorageManager extends MapStorageManager, 
IDGenerator, ActiveMQ
 
    void clearContext();
 
-   /**
-    * Confirms that a large message was finished
-    */
-   void confirmPendingLargeMessageTX(Transaction transaction, long messageID, 
long recordID) throws Exception;
-
-   /**
-    * Confirms that a large message was finished
-    */
-   void confirmPendingLargeMessage(long recordID) throws Exception;
+   void deletePendingLargeMessage(long recordID) throws Exception;
 
    void storeMessage(Message message) throws Exception;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 87b5096c2f..c3fc7653ae 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -82,7 +82,6 @@ import 
org.apache.activemq.artemis.core.persistence.config.PersistedUser;
 import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
-import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation;
@@ -375,18 +374,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    // Non transactional operations
 
    @Override
-   public void confirmPendingLargeMessageTX(final Transaction tx, long 
messageID, long recordID) throws Exception {
-      try (ArtemisCloseable lock = closeableReadLock()) {
-         installLargeMessageConfirmationOnTX(tx, recordID);
-         messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, 
new DeleteEncoding(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, messageID));
-      }
-   }
-
-   /**
-    * We don't need messageID now but we are likely to need it we ever decide 
to support a database
-    */
-   @Override
-   public void confirmPendingLargeMessage(long recordID) throws Exception {
+   public void deletePendingLargeMessage(long recordID) throws Exception {
       try (ArtemisCloseable lock = closeableReadLock()) {
          messageJournal.tryAppendDeleteRecord(recordID, true, 
this::messageUpdateCallback, getContext());
       }
@@ -2089,12 +2077,8 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
 
             switch (b) {
                case ADD_LARGE_MESSAGE_PENDING: {
-                  long messageID = buff.readLong();
-                  if (!pendingLargeMessages.remove(new 
Pair<>(recordDeleted.id, messageID))) {
-                     
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
-                  }
-                  installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
-                  break;
+                  // reading just to position the buffer, not used any more
+                  buff.readLong();
                }
                default:
                   
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
@@ -2294,14 +2278,4 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
 
       return credits >= 0;
    }
-
-   private void installLargeMessageConfirmationOnTX(Transaction tx, long 
recordID) {
-      TXLargeMessageConfirmationOperation txoper = 
(TXLargeMessageConfirmationOperation) 
tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
-      if (txoper == null) {
-         txoper = new TXLargeMessageConfirmationOperation(this);
-         
tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
-      }
-      txoper.confirmedMessages.add(recordID);
-   }
-
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 16b2f4d310..b29ca6efe7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -53,9 +53,7 @@ public final class JournalRecordIds {
    // Message journal record types
 
    /**
-    * This is used when a large message is created but not yet stored on the 
system.
-    * <p>
-    * We use this to avoid temporary files missing
+    * THIS RECORD IS NO LONGER USED, WE NOW WILL SCAN ALL PAGE FILES FOR 
PENDING LARGE MESSAGES
     */
    public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
deleted file mode 100644
index 5b34a327fa..0000000000
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.persistence.impl.journal;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
-
-public final class TXLargeMessageConfirmationOperation extends 
TransactionOperationAbstract {
-
-   private AbstractJournalStorageManager journalStorageManager;
-   public List<Long> confirmedMessages = new LinkedList<>();
-
-   public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager 
journalStorageManager) {
-      this.journalStorageManager = journalStorageManager;
-   }
-
-   @Override
-   public void afterRollback(Transaction tx) {
-      for (Long msg : confirmedMessages) {
-         try {
-            journalStorageManager.confirmPendingLargeMessage(msg);
-         } catch (Throwable e) {
-            
ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(msg, e);
-         }
-      }
-   }
-}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index a2bd9563a9..89f084685b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -652,13 +652,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public void confirmPendingLargeMessageTX(final Transaction transaction,
-                                            final long messageID,
-                                            final long recordID) throws 
Exception {
-   }
-
-   @Override
-   public void confirmPendingLargeMessage(final long recordID) throws 
Exception {
+   public void deletePendingLargeMessage(final long recordID) throws Exception 
{
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 04eb012a3c..fbfe0fe976 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -138,7 +138,6 @@ import 
org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.BrokerConnection;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.JournalType;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MemoryManager;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
@@ -3921,12 +3920,10 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
 
       journalLoader.handleDuplicateIds(duplicateIDMap);
 
+      // this deals with legacy pending large message records that might be 
left in a journal after an upgrade
       for (Pair<Long, Long> msgToDelete : pendingLargeMessages) {
-         ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete);
-         LargeServerMessage msg = storageManager.createCoreLargeMessage();
-         msg.setMessageID(msgToDelete.getB());
-         msg.setDurable(true);
-         msg.deleteFile();
+         logger.debug("Removing pending large message record: {}", 
msgToDelete.getA());
+         storageManager.deletePendingLargeMessage(msgToDelete.getA());
       }
 
       if (pendingNonTXPageCounter.size() != 0) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 6a6b005526..0430348967 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -467,20 +467,13 @@ public class TransactionImplTest extends ServerTestBase {
 
       }
 
-      @Override
-      public void confirmPendingLargeMessageTX(Transaction transaction,
-                                               long messageID,
-                                               long recordID) throws Exception 
{
-
-      }
-
       @Override
       public void injectMonitor(FileStoreMonitor monitor) throws Exception {
 
       }
 
       @Override
-      public void confirmPendingLargeMessage(long recordID) throws Exception {
+      public void deletePendingLargeMessage(long recordID) throws Exception {
 
       }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 637f62916a..71d17944e3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -426,15 +426,8 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
-      public void confirmPendingLargeMessageTX(Transaction transaction,
-                                               long messageID,
-                                               long recordID) throws Exception 
{
-         manager.confirmPendingLargeMessageTX(transaction, messageID, 
recordID);
-      }
-
-      @Override
-      public void confirmPendingLargeMessage(long recordID) throws Exception {
-         manager.confirmPendingLargeMessage(recordID);
+      public void deletePendingLargeMessage(long recordID) throws Exception {
+         manager.deletePendingLargeMessage(recordID);
       }
 
       @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
new file mode 100644
index 0000000000..1244d71e1f
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.largemessage;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.journal.Journal;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Validate that large message pending records are cleared after a restart.
+ * {@link 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds#ADD_LARGE_MESSAGE_PENDING}
 is not used any longer
+ * however if this record is in the journal the server should clear any 
previous data
+ */
+public class PendingLargeMessageTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   // The ClientConsumer should be able to also send ServerLargeMessages as 
that's done by the CoreBridge
+   @Test
+   public void testPendingRecords() throws Exception {
+      ActiveMQServer server = createServer(true);
+
+      server.start();
+
+      Journal journal = server.getStorageManager().getMessageJournal();
+
+      for (int i = 0; i < 100; i++) {
+         long recordID = server.getStorageManager().generateID();
+         long fakeMessageID = server.getStorageManager().generateID();
+         // this is storing the record that we used to use
+         journal.appendAddRecord(recordID, 
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new 
PendingLargeMessageEncoding(fakeMessageID), true, 
server.getStorageManager().getContext());
+      }
+
+      server.getStorageManager().getContext().waitCompletion();
+
+      server.stop();
+      try (AssertionLoggerHandler assertionLoggerHandler = new 
AssertionLoggerHandler()) {
+         server.start();
+         assertFalse(assertionLoggerHandler.findText("large message"));
+         assertFalse(assertionLoggerHandler.findText("AMQ221005"));
+      }
+
+      // compact to get rid of old records from the journal
+      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+
+      HashMap<Integer, AtomicInteger> records = 
countJournal(server.getConfiguration());
+
+      AtomicInteger recordsForPending = 
records.get(Integer.valueOf(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING));
+      assertTrue(recordsForPending == null || recordsForPending.get() == 0);
+   }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to