This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 3421695091 Enhance JournalFdRecoveryTest to verify message recovery 
after journal file deletion (#1644)
3421695091 is described below

commit 34216950915979ed92c83a76a35ad652fc60bee9
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 11 12:40:57 2026 +0100

    Enhance JournalFdRecoveryTest to verify message recovery after journal file 
deletion (#1644)
---
 .../store/kahadb/JournalFdRecoveryTest.java        | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index e4dbdd1d26..8a34b88944 100644
--- 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -217,36 +217,53 @@ public class JournalFdRecoveryTest {
         adapter.setPreallocationScope(Journal.PreallocationScope.NONE.name());
         adapter.setCheckpointInterval(50000);
         adapter.setCleanupInterval(50000);
+        // Force each message into its own write batch for deterministic file 
boundaries
+        adapter.setJournalMaxWriteBatchSize(100);
         broker.start();
 
-        int toSend = 50;
+        final int toSend = 50;
         produceMessagesToConsumeMultipleDataFiles(toSend);
 
-        int numFiles = getNumberOfJournalFiles();
+        final int numFiles = getNumberOfJournalFiles();
         LOG.info("Num files: " + numFiles);
         assertTrue("more than x files: " + numFiles, numFiles > 5);
-        assertEquals("Drain", 30, tryConsume(destination, 30));
+
+        final int toConsume = 30;
+        assertEquals("Drain", toConsume, tryConsume(destination, toConsume));
 
         // Force checkpoint to ensure all acknowledgments are persisted before 
stopping
         adapter.getStore().checkpoint(true);
 
-        LOG.info("Num files after stopped: " + getNumberOfJournalFiles());
+        LOG.info("Num files after checkpoint: " + getNumberOfJournalFiles());
 
-        File dataDir = broker.getPersistenceAdapter().getDirectory();
+        // Count pending messages before restart
+        final int pendingBeforeRestart = toSend - toConsume;
+
+        final File dataDir = broker.getPersistenceAdapter().getDirectory();
         broker.stop();
         broker.waitUntilStopped();
 
-        whackDataFile(dataDir, 4);
-
+        // Delete a journal file containing some unacked messages
+        final int fileToDelete = 4;
+        whackDataFile(dataDir, fileToDelete);
         whackIndex(dataDir);
 
         doStartBroker(false);
 
         LOG.info("Num files after restarted: " + getNumberOfJournalFiles());
 
-        assertEquals("Empty?", 18, tryConsume(destination, 20));
+        // After recovery with a deleted journal file, some messages from that 
file are lost.
+        // With journalMaxWriteBatchSize=100, each message gets its own batch, 
making distribution
+        // deterministic. We verify the invariants: some messages lost, most 
recovered, queue empties.
+        final int remaining = tryConsume(destination, pendingBeforeRestart);
+        LOG.info("Messages remaining after recovery: {} (was {} before 
restart)", remaining, pendingBeforeRestart);
+
+        assertTrue("Some messages should be lost from deleted file, but got: " 
+ remaining,
+                remaining < pendingBeforeRestart);
+        assertTrue("Some messages should survive recovery, but got: " + 
remaining,
+                remaining > 0);
 
-        assertEquals("no queue size ", 0l,  
((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+        assertEquals("no queue size", 0L, ((RegionBroker) 
broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
 
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to