This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new b92cde1 ARTEMIS-2430 Avoid data loss when live page cache evicted
new 8963cd9 This closes #2766
b92cde1 is described below
commit b92cde165b0ce822972eee723a950653c3ee5ff4
Author: Wei Yang <[email protected]>
AuthorDate: Wed Jul 24 19:42:49 2019 +0800
ARTEMIS-2430 Avoid data loss when live page cache evicted
---
.../paging/cursor/impl/PageCursorProviderImpl.java | 6 +++
.../activemq/artemis/core/paging/impl/Page.java | 4 ++
.../tests/integration/paging/PagingTest.java | 63 ++++++++++++++++++++++
3 files changed, 73 insertions(+)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 4e03b3b..3abd886 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -161,6 +161,12 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
+ Page currentPage = pagingStore.getCurrentPage();
+ // Live page cache might be cleared by gc, we need to retrieve it
otherwise partially written page cache is being returned
+ if (currentPage != null && currentPage.getPageId() == pageId &&
(cache = currentPage.getLiveCache()) != null) {
+ softCache.put(cache.getPageId(), cache);
+ return cache;
+ }
inProgressReadPage = inProgressReadPages.get(pageId);
if (inProgressReadPage == null) {
final CompletableFuture<PageCache> readPage = new
CompletableFuture<>();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 5f98b4c..fb236f8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -100,6 +100,10 @@ public final class Page implements Comparable<Page> {
this.pageCache = pageCache;
}
+ public LivePageCache getLiveCache() {
+ return pageCache;
+ }
+
public synchronized List<PagedMessage> read(StorageManager storage) throws
Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page " + this.pageId + " on address = " +
storeName);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 5dd5cc2..19eee08 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -6586,6 +6586,69 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
}
+ // We send messages to page, evict live page cache, send last message when
mid consumed, and expect to receive all messages
+ @Test
+ public void testLivePageCacheEvicted() throws Throwable {
+ clearDataRecreateServerDirs();
+
+ Configuration config =
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+ server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX);
+ server.start();
+
+ try {
+ ServerLocator locator =
createInVMNonHALocator().setBlockOnDurableSend(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(true, true, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.startPaging();
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+ int num = 10;
+ for (int i = 0; i < num; i++) {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("index", i);
+ prod.send(msg);
+ }
+
+ session.start();
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+ ClientMessage msgReceivedCons = null;
+ // simulate the live page cache evicted
+ store.getCursorProvider().clearCache();
+ for (int i = 0; i < num; i++) {
+ msgReceivedCons = cons.receive(1000);
+ assertNotNull(msgReceivedCons);
+ assertTrue(msgReceivedCons.getIntProperty("index") == i);
+ msgReceivedCons.acknowledge();
+
+ session.commit();
+
+ if (i == num / 2) {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("index", num);
+ prod.send(msg);
+ }
+ }
+
+ msgReceivedCons = cons.receive(1000);
+ assertNotNull(msgReceivedCons);
+ assertTrue(msgReceivedCons.getIntProperty("index") == num);
+ msgReceivedCons.acknowledge();
+
+ assertNull(cons.receiveImmediate());
+
+ session.commit();
+ session.close();
+
+ waitForNotPaging(store);
+ } finally {
+ server.stop();
+ }
+ }
+
@Override
protected Configuration createDefaultConfig(final int serverID, final
boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);