This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 766e81d26e ARTEMIS-4505 Cleanup page transactions on startup of the
broker
766e81d26e is described below
commit 766e81d26e222466e810c3d9d0608c2a939fe298
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Nov 14 23:35:36 2023 -0500
ARTEMIS-4505 Cleanup page transactions on startup of the broker
---
.../artemis/core/paging/PageTransactionInfo.java | 5 +
.../core/paging/cursor/PageCursorProvider.java | 2 +
.../cursor/impl/PageCounterRebuildManager.java | 7 +-
.../paging/cursor/impl/PageCursorProviderImpl.java | 13 +-
.../core/paging/impl/PageTransactionInfoImpl.java | 15 +++
.../core/paging/impl/PagingManagerImpl.java | 87 ++++++++++++--
.../artemis/core/server/ActiveMQServerLogger.java | 9 ++
.../core/server/impl/ActiveMQServerImpl.java | 2 +-
.../core/server/files/FileMoveManagerTest.java | 2 +-
.../paging/PageTransactionCleanupTest.java | 133 +++++++++++++++++++++
10 files changed, 258 insertions(+), 17 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
index e977c00ed0..9692e2b7a6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
@@ -73,4 +73,9 @@ public interface PageTransactionInfo extends EncodingSupport {
*/
boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription
cursor, PagedReference pagedMessage);
+ /** Used on PageRebuildManager to cleanup orphaned Page Transactions */
+ boolean isOrphaned();
+
+ PageTransactionInfo setOrphaned(boolean orphaned);
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index b42049fe55..526289aa00 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -67,4 +67,6 @@ public interface PageCursorProvider {
void counterRebuildDone();
+ boolean isRebuildDone();
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
index 62d40de398..e4cac16264 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -39,10 +39,12 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
/** this class will copy current data from the Subscriptions, count messages
while the server is already active
* performing other activity */
+// TODO: Rename this as RebuildManager in a future major version
public class PageCounterRebuildManager implements Runnable {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -58,7 +60,7 @@ public class PageCounterRebuildManager implements Runnable {
private final Set<Long> storedLargeMessages;
- public PageCounterRebuildManager(PagingManager pagingManager, PagingStore
store, Map<Long, PageTransactionInfo> transactions, Set<Long>
storedLargeMessages) {
+ public PageCounterRebuildManager(PagingManager pagingManager, PagingStore
store, Map<Long, PageTransactionInfo> transactions, Set<Long>
storedLargeMessages, AtomicLong minPageTXIDFound) {
// we make a copy of the data because we are allowing data to influx. We
will consolidate the values at the end
initialize(store);
this.pagingManager = pagingManager;
@@ -253,6 +255,9 @@ public class PageCounterRebuildManager implements Runnable {
if (msg.getTransactionID() > 0) {
txInfo = transactions.get(msg.getTransactionID());
+ if (txInfo != null) {
+ txInfo.setOrphaned(false);
+ }
}
Transaction preparedTX = txInfo == null ? null :
txInfo.getPreparedTransaction();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index dea4cd5fbc..1cf6c1cd47 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -61,7 +61,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
// We can't call cleanup before counters were rebuilt
// as they will determine if a subscription is empty or not
- protected volatile boolean countersRebuilt = true;
+ protected volatile boolean rebuildDone = true;
protected final PagingStore pagingStore;
@@ -268,7 +268,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
protected void cleanup() {
- if (!countersRebuilt) {
+ if (!rebuildDone) {
logger.debug("Counters were not rebuilt yet, cleanup has to be
ignored on address {}", pagingStore != null ? pagingStore.getAddress() :
"NULL");
return;
}
@@ -611,11 +611,16 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
@Override
public void counterRebuildStarted() {
- this.countersRebuilt = false;
+ this.rebuildDone = false;
}
@Override
public void counterRebuildDone() {
- this.countersRebuilt = true;
+ this.rebuildDone = true;
+ }
+
+ @Override
+ public boolean isRebuildDone() {
+ return this.rebuildDone;
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 3bcdcf4d4f..2ec4d242a7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -67,11 +67,26 @@ public final class PageTransactionInfoImpl implements
PageTransactionInfo {
private List<LateDelivery> lateDeliveries;
+ /** To be used during by the RebuildManager.
+ * When reading transactions not found transactions are marked as done. */
+ private boolean orphaned;
+
public PageTransactionInfoImpl(final long transactionID) {
this();
this.transactionID = transactionID;
}
+ @Override
+ public boolean isOrphaned() {
+ return orphaned;
+ }
+
+ @Override
+ public PageTransactionInfoImpl setOrphaned(boolean orphaned) {
+ this.orphaned = orphaned;
+ return this;
+ }
+
public PageTransactionInfoImpl() {
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 41773760be..b97c9448b3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -27,6 +28,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import io.netty.util.collection.LongObjectHashMap;
@@ -37,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import
org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -48,6 +52,7 @@ import
org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;
@@ -56,6 +61,8 @@ import static
org.apache.activemq.artemis.core.server.files.FileStoreMonitor.Fil
public final class PagingManagerImpl implements PagingManager {
+ private static final int PAGE_TX_CLEANUP_PRINT_LIMIT = 1000;
+
private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL =
Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval",
"60"));
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -76,6 +83,8 @@ public final class PagingManagerImpl implements PagingManager
{
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
+ private final ActiveMQServer server;
+
private PagingStoreFactory pagingStoreFactory;
private volatile boolean globalFull;
@@ -125,7 +134,8 @@ public final class PagingManagerImpl implements
PagingManager {
final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
final long maxSize,
final long maxMessages,
- final SimpleString managementAddress) {
+ final SimpleString managementAddress,
+ final ActiveMQServer server) {
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
@@ -138,14 +148,16 @@ public final class PagingManagerImpl implements
PagingManager {
globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
this.managerExecutor = pagingSPI.newExecutor();
this.managementAddress = managementAddress;
+ this.server = server;
}
SizeAwareMetric getSizeAwareMetric() {
return globalSizeMetric;
}
-
- /** To be used in tests only called through PagingManagerTestAccessor */
+ /**
+ * To be used in tests only called through PagingManagerTestAccessor
+ */
void resetMaxSize(long maxSize, long maxMessages) {
this.maxSize = maxSize;
this.maxMessages = maxMessages;
@@ -164,13 +176,13 @@ public final class PagingManagerImpl implements
PagingManager {
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository) {
- this(pagingSPI, addressSettingsRepository, -1, -1, null);
+ this(pagingSPI, addressSettingsRepository, -1, -1, null, null);
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
final SimpleString managementAddress) {
- this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress);
+ this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress,
null);
}
@Override
@@ -325,7 +337,6 @@ public final class PagingManagerImpl implements
PagingManager {
}
}
-
@Override
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalFull;
@@ -513,7 +524,6 @@ public final class PagingManagerImpl implements
PagingManager {
}
}
-
@Override
public synchronized void stop() throws Exception {
if (!started) {
@@ -583,22 +593,79 @@ public final class PagingManagerImpl implements
PagingManager {
public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
// making a copy
- transactions.forEach(transactionsSet::put);
+ transactions.forEach((a, b) -> {
+ transactionsSet.put(a, b);
+ b.setOrphaned(true);
+ });
+ AtomicLong minLargeMessageID = new AtomicLong(Long.MAX_VALUE);
+
+ // make a copy of the stores
+ Map<SimpleString, PagingStore> currentStoreMap = new HashMap<>();
+ stores.forEach(currentStoreMap::put);
if (logger.isDebugEnabled()) {
logger.debug("Page Transactions during rebuildCounters:");
transactionsSet.forEach((a, b) -> logger.debug("{} = {}", a, b));
}
- stores.forEach((address, pgStore) -> {
- PageCounterRebuildManager rebuildManager = new
PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages);
+ currentStoreMap.forEach((address, pgStore) -> {
+ PageCounterRebuildManager rebuildManager = new
PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages,
minLargeMessageID);
logger.debug("Setting destination {} to rebuild counters", address);
managerExecutor.execute(rebuildManager);
});
+ managerExecutor.execute(() -> cleanupPageTransactions(transactionsSet,
currentStoreMap));
+
FutureTask<Object> task = new FutureTask<>(() -> null);
managerExecutor.execute(task);
return task;
}
+
+ private void cleanupPageTransactions(Map<Long, PageTransactionInfo>
transactionSet, Map<SimpleString, PagingStore> currentStoreMap) {
+ if (server == null) {
+ logger.warn("Server attribute was not set, cannot proceed with page
transaction cleanup");
+ }
+ AtomicBoolean proceed = new AtomicBoolean(true);
+
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // I'm now checking if all pagingStore have finished rebuilding the page
counter.
+ // this would be only false if some exception happened on previous
executions on the rebuild manager.
+ // so if any exception happened, the cleanup is not going to be done
+ currentStoreMap.forEach((a, b) -> {
+ if (!b.getCursorProvider().isRebuildDone()) {
+ logger.warn("cannot proceed on cleaning up page transactions as
page cursor for {} is not done rebuilding it", b.getAddress());
+ proceed.set(false);
+ }
+ });
+
+ if (!proceed.get()) {
+ return;
+ }
+
+ AtomicLong txRemoved = new AtomicLong(0);
+
+ transactionSet.forEach((a, b) -> {
+ if (b.isOrphaned()) {
+ b.onUpdate(b.getNumberOfMessages(), server.getStorageManager(),
this);
+ txRemoved.incrementAndGet();
+
+ // I'm pringing up to 1000 records, id by ID..
+ if (txRemoved.get() < PAGE_TX_CLEANUP_PRINT_LIMIT) {
+ ActiveMQServerLogger.LOGGER.removeOrphanedPageTransaction(a);
+ } else {
+ // after a while, I start just printing counters to speed up
things a bit
+ if (txRemoved.get() % PAGE_TX_CLEANUP_PRINT_LIMIT == 0) {
+
ActiveMQServerLogger.LOGGER.cleaningOrphanedTXCleanup(txRemoved.get());
+ }
+
+ }
+ }
+ });
+
+ if (txRemoved.get() > 0) {
+
ActiveMQServerLogger.LOGGER.completeOrphanedTXCleanup(txRemoved.get());
+ } else {
+ logger.debug("Complete cleanupPageTransactions with no orphaned
records found");
+ }
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 51b1580dac..966661365b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1593,4 +1593,13 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224130, value = "The {} value {} will override the {}
value {} since both are set.", level = LogMessage.Level.INFO)
void configParamOverride(String overridingParam, Object
overridingParamValue, String overriddenParam, Object overriddenParamValue);
+
+ @LogMessage(id = 224131, value = "Removing orphaned page transaction {}",
level = LogMessage.Level.INFO)
+ void removeOrphanedPageTransaction(long transactionID);
+
+ @LogMessage(id = 224132, value = "{} orphaned page transactions were
removed", level = LogMessage.Level.INFO)
+ void completeOrphanedTXCleanup(long numberOfPageTx);
+
+ @LogMessage(id = 224133, value = "{} orphaned page transactions have been
removed", level = LogMessage.Level.INFO)
+ void cleaningOrphanedTXCleanup(long numberOfPageTx);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 82c88f4bf6..7f072eb1d4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3048,7 +3048,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
@Override
public PagingManager createPagingManager() throws Exception {
- return new PagingManagerImpl(getPagingStoreFactory(),
addressSettingsRepository, configuration.getGlobalMaxSize(),
configuration.getGlobalMaxMessages(), configuration.getManagementAddress());
+ return new PagingManagerImpl(getPagingStoreFactory(),
addressSettingsRepository, configuration.getGlobalMaxSize(),
configuration.getGlobalMaxMessages(), configuration.getManagementAddress(),
this);
}
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index 0991420ac2..4cd01b4bf8 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -307,7 +307,7 @@ public class FileMoveManagerTest {
PagingStoreFactoryNIO storeFactory = new
PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new
OrderedExecutorFactory(threadPool), new OrderedExecutorFactory(threadPool),
true, null);
- PagingManagerImpl managerImpl = new
PagingManagerImpl(storeFactory, addressSettings, -1, -1,
ActiveMQDefaultConfiguration.getDefaultManagementAddress());
+ PagingManagerImpl managerImpl = new
PagingManagerImpl(storeFactory, addressSettings, -1, -1,
ActiveMQDefaultConfiguration.getDefaultManagementAddress(), null);
managerImpl.start();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java
new file mode 100644
index 0000000000..5cf6c8b591
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PagingOrderTest.
+ * <br>
+ * PagingTest has a lot of tests already. I decided to create a newer one more
specialized on Ordering and counters
+ */
+public class PageTransactionCleanupTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int PAGE_MAX = 100 * 1024;
+
+ private static final int PAGE_SIZE = 10 * 1024;
+
+ static final SimpleString ADDRESS = new SimpleString("TestQueue");
+
+ @Test
+ public void testPageTXCleanup() throws Throwable {
+
+ Configuration config =
createDefaultConfig(true).setJournalSyncNonTransactional(false);
+
+ ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX,
new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ Queue queue1 = server.createQueue(new
QueueConfiguration("test1").setRoutingType(RoutingType.ANYCAST));
+ Queue queue2 = server.createQueue(new
QueueConfiguration("test2").setRoutingType(RoutingType.ANYCAST));
+
+ queue1.getPagingStore().startPaging();
+ queue2.getPagingStore().startPaging();
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+
+ final int NUMBER_OF_MESSAGES = 30;
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ for (int producerID = 1; producerID <= 2; producerID++) {
+ MessageProducer producer =
session.createProducer(session.createQueue("test" + producerID));
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ session.commit();
+ }
+ }
+ }
+
+ PagingStoreImpl store = (PagingStoreImpl) queue1.getPagingStore();
+ File folder = store.getFolder();
+
+ server.stop();
+
+ for (String fileName : folder.list((dir, f) -> f.endsWith(".page"))) {
+ File fileToRemove = new File(folder, fileName);
+ fileToRemove.delete();
+ logger.debug("removing file {}", fileToRemove);
+ }
+
+ try (AssertionLoggerHandler handler = new AssertionLoggerHandler()) {
+ server.start();
+ Wait.assertTrue(() -> handler.findText("AMQ224132"));
+ }
+
+
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
+
+ HashMap<Integer, AtomicInteger> countedJournal =
countJournal(server.getConfiguration());
+ Assert.assertEquals(NUMBER_OF_MESSAGES, countedJournal.get(35).get());
+
+ server.stop();
+ server.start();
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ for (int producerID = 1; producerID <= 2; producerID++) {
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("test" + producerID));
+ for (int i = 0; i < (producerID == 1 ? 0 : NUMBER_OF_MESSAGES);
i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull("message not received on producer + " +
producerID + ", message " + i, message);
+ Assert.assertEquals("could not find message " + i + " on
producerID=" + producerID, "hello " + i, message.getText());
+ session.commit();
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ consumer.close();
+ }
+ }
+ }
+
+}