This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new b4d66b6 ARTEMIS-3054 Fix inconsistencies between replica catchup and
page cleanup
new c90d7df This closes #3400
b4d66b6 is described below
commit b4d66b684a44669615b86d971268f925ae0ee60c
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Jan 6 21:25:46 2021 -0500
ARTEMIS-3054 Fix inconsistencies between replica catchup and page cleanup
---
.../paging/cursor/impl/PageCursorProviderImpl.java | 20 ++-
.../paging/cursor/impl/PageSubscriptionImpl.java | 4 +
.../artemis/core/paging/impl/PagingStoreImpl.java | 10 ++
.../impl/journal/JournalStorageManager.java | 6 +
.../client/JMSPagingFileDeleteTest.java | 6 +-
.../PageCleanupWhileReplicaCatchupTest.java | 174 +++++++++++++++++++++
.../tests/integration/paging/PagingTest.java | 11 +-
7 files changed, 210 insertions(+), 21 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 0a8168d..7ace24b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -428,6 +428,16 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
ArrayList<Page> depagedPages = new ArrayList<>();
+ // This read lock is required
+ // because, in a replicated configuration,
+ // the replication manager will first get a writeLock on the StorageManager
+ // for a short period while it is getting a list of IDs to send to the replica.
+ // Not getting this lock now could eventually result in a deadlock caused by a different lock ordering.
+ //
+ // I tried to simplify the locks, but each PageStore has its own lock, so this was the best option
+ // I found in order to fix https://issues.apache.org/jira/browse/ARTEMIS-3054
+ storageManager.readLock();
+
while (true) {
if (pagingStore.lock(100)) {
break;
@@ -471,7 +481,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
}
}
- for (long i = pagingStore.getFirstPage(); i < minPage; i++) {
+ for (long i = pagingStore.getFirstPage(); i <= minPage; i++) {
if (!checkPageCompletion(cursorList, i)) {
break;
}
@@ -495,9 +505,11 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
}
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(ex,
pagingStore.getAddress());
+ logger.warn(ex.getMessage(), ex);
return;
} finally {
pagingStore.unlock();
+ storageManager.readUnLock();
}
}
finishCleanup(depagedPages);
@@ -625,12 +637,6 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
for (PageSubscription cursor : cursorList) {
cursor.confirmPosition(new
PagePositionImpl(currentPage.getPageId(), -1));
}
-
- // we just need to make sure the storage is done..
- // if the thread pool is full, we will just log it once instead of
looping
- if (!storageManager.waitOnOperations(5000)) {
-
ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext());
- }
} finally {
for (PageSubscription cursor : cursorList) {
cursor.enableAutoCleanup();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 7e1bd2c..67a5cf4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -1182,6 +1182,10 @@ public final class PageSubscriptionImpl implements
PageSubscription {
PageCache localCache = this.cache.get();
if (localCache == null) {
localCache = cursorProvider.getPageCache(pageId);
+ // this could happen if the file does not exist any more, after
cleanup
+ if (localCache == null) {
+ return 0;
+ }
this.cache = new WeakReference<>(localCache);
}
int numberOfMessage = localCache.getNumberOfMessages();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 31c969d..3c248af 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -400,6 +400,16 @@ public class PagingStoreImpl implements PagingStore {
}
}
+ public int getNumberOfFiles() throws Exception {
+ final SequentialFileFactory fileFactory = this.fileFactory;
+ if (fileFactory != null) {
+ List<String> files = fileFactory.listFiles("page");
+ return files.size();
+ }
+
+ return 0;
+ }
+
@Override
public void start() throws Exception {
lock.writeLock().lock();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 3fa73b7..59dcc63 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -637,6 +637,11 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
try {
Map<SimpleString, Collection<Integer>> pageFilesToSync;
storageManagerLock.writeLock().lock();
+
+ // We need to get this lock here in order to
+ // avoid a clash with Page.cleanup();
+ // This was part of the fix for https://issues.apache.org/jira/browse/ARTEMIS-3054
+ pagingManager.lock();
try {
if (isReplicated())
throw new ActiveMQIllegalStateException("already replicating");
@@ -680,6 +685,7 @@ public class JournalStorageManager extends
AbstractJournalStorageManager {
replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
} finally {
storageManagerLock.writeLock().unlock();
+ pagingManager.unlock();
}
sendJournalFile(messageFiles, JournalContent.MESSAGES);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
index ff23351..5e883a6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSPagingFileDeleteTest.java
@@ -196,11 +196,7 @@ public class JMSPagingFileDeleteTest extends JMSTestBase {
timeout = System.currentTimeMillis() + 10000;
- while (timeout > System.currentTimeMillis() &&
pagingStore.getNumberOfPages() != 1) {
- Thread.sleep(100);
- }
-
- assertEquals(1, pagingStore.getNumberOfPages()); //I expected number
of the page is 1, but It was not.
+ Wait.assertEquals(0, pagingStore::getNumberOfPages); // the number of pages is expected to reach 0
} finally {
if (connection != null) {
connection.close();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java
new file mode 100644
index 0000000..1e977d6
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase {
+
+ private static final Logger logger =
Logger.getLogger(PageCleanupWhileReplicaCatchupTest.class);
+ volatile boolean running = true;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ startBackupServer = false;
+ super.setUp();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final
boolean live) {
+ return getNettyAcceptorTransportConfiguration(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final
boolean live) {
+ return getNettyConnectorTransportConfiguration(live);
+ }
+
+ @Override
+ protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
+ final Configuration
configuration,
+ final NodeManager
nodeManager,
+ int id) {
+ Map<String, AddressSettings> conf = new HashMap<>();
+ AddressSettings as = new
AddressSettings().setMaxSizeBytes(PAGE_MAX).setPageSizeBytes(PAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ conf.put(ADDRESS.toString(), as);
+ return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE,
PAGE_MAX, conf, nodeManager, id);
+ }
+
+ @Test(timeout = 120_000)
+ public void testPageCleanup() throws Throwable {
+ int numberOfWorkers = 20;
+
+ Worker[] workers = new Worker[numberOfWorkers];
+
+ for (int i = 0; i < 20; i++) {
+ liveServer.getServer().addAddressInfo(new AddressInfo("WORKER_" +
i).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+ liveServer.getServer().createQueue(new QueueConfiguration("WORKER_" +
i).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+ workers[i] = new Worker("WORKER_" + i);
+ workers[i].start();
+ }
+
+ for (int i = 0; i < 25; i++) {
+ logger.debug("Starting replica " + i);
+ backupServer.start();
+ Wait.assertTrue(backupServer.getServer()::isReplicaSync);
+ backupServer.stop();
+ }
+
+ running = false;
+
+ for (Worker worker : workers) {
+ worker.join();
+ }
+
+ Throwable toThrow = null;
+ for (Worker worker : workers) {
+ if (worker.throwable != null) {
+
worker.queue.getPagingStore().getCursorProvider().scheduleCleanup();
+ Thread.sleep(2000);
+ worker.queue.getPagingStore().getCursorProvider().cleanup();
+
+ // This is more of a debug statement in case there is an issue with the test
+ System.out.println("PagingStore(" + worker.queueName +
")::isPaging() = " + worker.queue.getPagingStore().isPaging() + " after test
failure " + worker.throwable.getMessage());
+ toThrow = worker.throwable;
+ }
+ }
+
+ if (toThrow != null) {
+ throw toThrow;
+ }
+
+ for (Worker worker : workers) {
+ PagingStoreImpl storeImpl =
(PagingStoreImpl)worker.queue.getPagingStore();
+ Assert.assertTrue("Store impl " + worker.queueName + " had more files
than expected on " + storeImpl.getFolder(), storeImpl.getNumberOfFiles() <= 1);
+ }
+ }
+
+ class Worker extends Thread {
+
+ final String queueName;
+ final Queue queue;
+ volatile Throwable throwable;
+
+ Worker(String queue) {
+ super("Worker on queue " + queue + " for test on
PageCleanupWhileReplicaCatchupTest");
+ this.queueName = queue;
+ this.queue = liveServer.getServer().locateQueue(queueName);
+ }
+
+ @Override
+ public void run() {
+ try {
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ javax.jms.Queue jmsQueue = session.createQueue(queueName);
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ MessageProducer producer = session.createProducer(jmsQueue);
+ while (running) {
+ queue.getPagingStore().startPaging();
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("hello " + i));
+ }
+ Wait.assertTrue(queue.getPagingStore()::isPaging);
+ for (int i = 0; i < 10; i++) {
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+ Wait.assertFalse("Waiting for !Paging on " + queueName + "
with folder " + queue.getPagingStore().getFolder(),
queue.getPagingStore()::isPaging);
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ this.throwable = e;
+ }
+
+ }
+ }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index ec57605..eaa6c61 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -490,13 +490,6 @@ public class PagingTest extends ActiveMQTestBase {
waitForNotPaging(queue);
server.stop();
-
- HashMap<Integer, AtomicInteger> counts =
countJournalLivingRecords(server.getConfiguration());
-
- AtomicInteger pgComplete = counts.get((int)
JournalRecordIds.PAGE_CURSOR_COMPLETE);
-
- assertTrue(pgComplete == null || pgComplete.get() == 0);
-
}
@Test
@@ -4630,7 +4623,7 @@ public class PagingTest extends ActiveMQTestBase {
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
- Wait.assertEquals(1,
()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
+ Wait.assertEquals(0,
()->server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
}
@Test
@@ -6004,7 +5997,7 @@ public class PagingTest extends ActiveMQTestBase {
locator.close();
- Wait.assertEquals(2, store::getNumberOfPages);
+ Wait.assertEquals(0, store::getNumberOfPages);
} finally {
try {