This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4486a70aaa ARTEMIS-5574 fix handling of pending large message records
4486a70aaa is described below
commit 4486a70aaaaef7182ba922419e7277e9fb226ef4
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Jul 10 16:19:04 2025 -0500
ARTEMIS-5574 fix handling of pending large message records
Thanks to Clebert Suconic for the test.
---
.../artemis/core/persistence/StorageManager.java | 10 +--
.../journal/AbstractJournalStorageManager.java | 32 +--------
.../persistence/impl/journal/JournalRecordIds.java | 4 +-
.../TXLargeMessageConfirmationOperation.java | 45 -------------
.../impl/nullpm/NullStorageManager.java | 8 +--
.../core/server/impl/ActiveMQServerImpl.java | 9 +--
.../core/transaction/impl/TransactionImplTest.java | 9 +--
.../tests/integration/client/SendAckFailTest.java | 11 +--
.../largemessage/PendingLargeMessageTest.java | 78 ++++++++++++++++++++++
9 files changed, 90 insertions(+), 116 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index fd8404f3b7..a4a11359e0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -185,15 +185,7 @@ public interface StorageManager extends MapStorageManager,
IDGenerator, ActiveMQ
void clearContext();
- /**
- * Confirms that a large message was finished
- */
- void confirmPendingLargeMessageTX(Transaction transaction, long messageID,
long recordID) throws Exception;
-
- /**
- * Confirms that a large message was finished
- */
- void confirmPendingLargeMessage(long recordID) throws Exception;
+ void deletePendingLargeMessage(long recordID) throws Exception;
void storeMessage(Message message) throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 87b5096c2f..c3fc7653ae 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -82,7 +82,6 @@ import
org.apache.activemq.artemis.core.persistence.config.PersistedUser;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.AddressStatusEncoding;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
-import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation;
@@ -375,18 +374,7 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
// Non transactional operations
@Override
- public void confirmPendingLargeMessageTX(final Transaction tx, long
messageID, long recordID) throws Exception {
- try (ArtemisCloseable lock = closeableReadLock()) {
- installLargeMessageConfirmationOnTX(tx, recordID);
- messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID,
new DeleteEncoding(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, messageID));
- }
- }
-
- /**
- * We don't need messageID now but we are likely to need it we ever decide
to support a database
- */
- @Override
- public void confirmPendingLargeMessage(long recordID) throws Exception {
+ public void deletePendingLargeMessage(long recordID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
messageJournal.tryAppendDeleteRecord(recordID, true,
this::messageUpdateCallback, getContext());
}
@@ -2089,12 +2077,8 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
switch (b) {
case ADD_LARGE_MESSAGE_PENDING: {
- long messageID = buff.readLong();
- if (!pendingLargeMessages.remove(new
Pair<>(recordDeleted.id, messageID))) {
-
ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id);
- }
- installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
- break;
+ // reading just to position the buffer, not used any more
+ buff.readLong();
}
default:
ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b);
@@ -2294,14 +2278,4 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
return credits >= 0;
}
-
- private void installLargeMessageConfirmationOnTX(Transaction tx, long
recordID) {
- TXLargeMessageConfirmationOperation txoper =
(TXLargeMessageConfirmationOperation)
tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
- if (txoper == null) {
- txoper = new TXLargeMessageConfirmationOperation(this);
-
tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
- }
- txoper.confirmedMessages.add(recordID);
- }
-
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 16b2f4d310..b29ca6efe7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -53,9 +53,7 @@ public final class JournalRecordIds {
// Message journal record types
/**
- * This is used when a large message is created but not yet stored on the
system.
- * <p>
- * We use this to avoid temporary files missing
+ * THIS RECORD IS NO LONGER USED, WE NOW WILL SCAN ALL PAGE FILES FOR
PENDING LARGE MESSAGES
*/
public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
deleted file mode 100644
index 5b34a327fa..0000000000
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.persistence.impl.journal;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
-
-public final class TXLargeMessageConfirmationOperation extends
TransactionOperationAbstract {
-
- private AbstractJournalStorageManager journalStorageManager;
- public List<Long> confirmedMessages = new LinkedList<>();
-
- public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager
journalStorageManager) {
- this.journalStorageManager = journalStorageManager;
- }
-
- @Override
- public void afterRollback(Transaction tx) {
- for (Long msg : confirmedMessages) {
- try {
- journalStorageManager.confirmPendingLargeMessage(msg);
- } catch (Throwable e) {
-
ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(msg, e);
- }
- }
- }
-}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index a2bd9563a9..89f084685b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -652,13 +652,7 @@ public class NullStorageManager implements StorageManager {
}
@Override
- public void confirmPendingLargeMessageTX(final Transaction transaction,
- final long messageID,
- final long recordID) throws
Exception {
- }
-
- @Override
- public void confirmPendingLargeMessage(final long recordID) throws
Exception {
+ public void deletePendingLargeMessage(final long recordID) throws Exception
{
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 04eb012a3c..fbfe0fe976 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -138,7 +138,6 @@ import
org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MemoryManager;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
@@ -3921,12 +3920,10 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
journalLoader.handleDuplicateIds(duplicateIDMap);
+ // this deals with legacy pending large message records that might be
left in a journal after an upgrade
for (Pair<Long, Long> msgToDelete : pendingLargeMessages) {
- ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete);
- LargeServerMessage msg = storageManager.createCoreLargeMessage();
- msg.setMessageID(msgToDelete.getB());
- msg.setDurable(true);
- msg.deleteFile();
+ logger.debug("Removing pending large message record: {}",
msgToDelete.getA());
+ storageManager.deletePendingLargeMessage(msgToDelete.getA());
}
if (pendingNonTXPageCounter.size() != 0) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 6a6b005526..0430348967 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -467,20 +467,13 @@ public class TransactionImplTest extends ServerTestBase {
}
- @Override
- public void confirmPendingLargeMessageTX(Transaction transaction,
- long messageID,
- long recordID) throws Exception
{
-
- }
-
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
@Override
- public void confirmPendingLargeMessage(long recordID) throws Exception {
+ public void deletePendingLargeMessage(long recordID) throws Exception {
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 637f62916a..71d17944e3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -426,15 +426,8 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
- public void confirmPendingLargeMessageTX(Transaction transaction,
- long messageID,
- long recordID) throws Exception
{
- manager.confirmPendingLargeMessageTX(transaction, messageID,
recordID);
- }
-
- @Override
- public void confirmPendingLargeMessage(long recordID) throws Exception {
- manager.confirmPendingLargeMessage(recordID);
+ public void deletePendingLargeMessage(long recordID) throws Exception {
+ manager.deletePendingLargeMessage(recordID);
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
new file mode 100644
index 0000000000..1244d71e1f
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/largemessage/PendingLargeMessageTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.largemessage;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.journal.Journal;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Validate that large message pending records are cleared after a restart.
+ * {@link
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds#ADD_LARGE_MESSAGE_PENDING}
is not used any longer
+ * however if this record is in the journal the server should clear any
previous data
+ */
+public class PendingLargeMessageTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // The ClientConsumer should be able to also send ServerLargeMessages as
that's done by the CoreBridge
+ @Test
+ public void testPendingRecords() throws Exception {
+ ActiveMQServer server = createServer(true);
+
+ server.start();
+
+ Journal journal = server.getStorageManager().getMessageJournal();
+
+ for (int i = 0; i < 100; i++) {
+ long recordID = server.getStorageManager().generateID();
+ long fakeMessageID = server.getStorageManager().generateID();
+ // this is storing the record that we used to use
+ journal.appendAddRecord(recordID,
JournalRecordIds.ADD_LARGE_MESSAGE_PENDING, new
PendingLargeMessageEncoding(fakeMessageID), true,
server.getStorageManager().getContext());
+ }
+
+ server.getStorageManager().getContext().waitCompletion();
+
+ server.stop();
+ try (AssertionLoggerHandler assertionLoggerHandler = new
AssertionLoggerHandler()) {
+ server.start();
+ assertFalse(assertionLoggerHandler.findText("large message"));
+ assertFalse(assertionLoggerHandler.findText("AMQ221005"));
+ }
+
+ // compact to get rid of old records from the journal
+
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
+
+ HashMap<Integer, AtomicInteger> records =
countJournal(server.getConfiguration());
+
+ AtomicInteger recordsForPending =
records.get(Integer.valueOf(JournalRecordIds.ADD_LARGE_MESSAGE_PENDING));
+ assertTrue(recordsForPending == null || recordsForPending.get() == 0);
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact