This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new f2bac5a ARTEMIS-2321 Non-blocking Page::read on page cache
new 56e33bc This closes #2667
f2bac5a is described below
commit f2bac5ad08dc2b31628b622e68056d0e813e989e
Author: Francesco Nigro <[email protected]>
AuthorDate: Thu Apr 25 22:30:53 2019 +0200
ARTEMIS-2321 Non-blocking Page::read on page cache
---
artemis-server/pom.xml | 5 +
.../paging/cursor/impl/PageCursorProviderImpl.java | 106 ++++++++++++++++-----
.../artemis/core/persistence/StorageManager.java | 7 ++
.../journal/AbstractJournalStorageManager.java | 9 ++
.../impl/nullpm/NullStorageManager.java | 6 ++
.../cursor/impl/PageCursorProviderImplTest.java | 77 +++++++++++++++
.../core/transaction/impl/TransactionImplTest.java | 6 ++
.../org.mockito.plugins.MockMaker | 1 +
pom.xml | 2 +
.../tests/integration/client/SendAckFailTest.java | 5 +
10 files changed, 202 insertions(+), 22 deletions(-)
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 6ab2967..e20179f 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -189,6 +189,11 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index ddca7f2..4e03b3b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -19,9 +19,12 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -71,8 +74,19 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
private final SoftValueLongObjectHashMap<PageCache> softCache;
+ private final LongObjectHashMap<CompletableFuture<PageCache>>
inProgressReadPages;
+
private final ConcurrentLongHashMap<PageSubscription> activeCursors = new
ConcurrentLongHashMap<>();
+ private static final long PAGE_READ_TIMEOUT_NS =
TimeUnit.SECONDS.toNanos(30);
+
+ //Any concurrent read request for the same page waits in a loop for the original
Page::read to complete, while
+ //printing a warning message at intervals
+ private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS =
TimeUnit.SECONDS.toNanos(10);
+
+ //storageManager.beforePageRead is retried in a loop until it succeeds, printing a
warning message at intervals
+ private static final long PAGE_READ_PERMISSION_TIMEOUT_NS =
TimeUnit.SECONDS.toNanos(10);
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -85,6 +99,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
this.storageManager = storageManager;
this.executor = executor;
this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
+ this.inProgressReadPages = new LongObjectHashMap<>();
}
// Public --------------------------------------------------------
@@ -131,43 +146,82 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
@Override
public PageCache getPageCache(final long pageId) {
try {
+ if (pageId > pagingStore.getCurrentWritingPage()) {
+ return null;
+ }
+ boolean createPage = false;
+ CompletableFuture<PageCache> inProgressReadPage;
PageCache cache;
+ Page page = null;
synchronized (softCache) {
- if (pageId > pagingStore.getCurrentWritingPage()) {
+ cache = softCache.get(pageId);
+ if (cache != null) {
+ return cache;
+ }
+ if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
-
- cache = softCache.get(pageId);
- if (cache == null) {
- if (!pagingStore.checkPageFileExists((int) pageId)) {
- return null;
- }
-
+ inProgressReadPage = inProgressReadPages.get(pageId);
+ if (inProgressReadPage == null) {
+ final CompletableFuture<PageCache> readPage = new
CompletableFuture<>();
cache = createPageCache(pageId);
- // anyone reading from this cache will have to wait reading to
finish first
- // we also want only one thread reading this cache
- logger.tracef("adding pageCache pageNr=%d into cursor = %s",
pageId, this.pagingStore.getAddress());
- readPage((int) pageId, cache);
- softCache.put(pageId, cache);
+ page = pagingStore.createPage((int) pageId);
+ createPage = true;
+ inProgressReadPage = readPage;
+ inProgressReadPages.put(pageId, readPage);
+ }
+ }
+ if (createPage) {
+ return readPage(pageId, page, cache, inProgressReadPage);
+ } else {
+ final long startedWait = System.nanoTime();
+ while (true) {
+ try {
+ return
inProgressReadPage.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS);
+ } catch (TimeoutException e) {
+ final long elapsed = System.nanoTime() - startedWait;
+ final long elapsedMillis =
TimeUnit.NANOSECONDS.toMillis(elapsed);
+ logger.warnf("Waiting a concurrent Page::read for pageNr=%d
on cursor %s by %d ms",
+ pageId, pagingStore.getAddress(),
elapsedMillis);
+ }
}
}
-
- return cache;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
- private void readPage(int pageId, PageCache cache) throws Exception {
- Page page = null;
+ private PageCache readPage(long pageId,
+ Page page,
+ PageCache cache,
+ CompletableFuture<PageCache> inProgressReadPage)
throws Exception {
+ logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId,
this.pagingStore.getAddress());
+ boolean acquiredPageReadPermission = false;
try {
- page = pagingStore.createPage(pageId);
-
- storageManager.beforePageRead();
+ final long startedRequest = System.nanoTime();
+ while (!acquiredPageReadPermission) {
+ acquiredPageReadPermission =
storageManager.beforePageRead(PAGE_READ_PERMISSION_TIMEOUT_NS,
TimeUnit.NANOSECONDS);
+ if (!acquiredPageReadPermission) {
+ final long elapsedMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedRequest);
+ logger.warnf("Cannot acquire page read permission of pageNr=%d
on cursor %s after %d ms: consider increasing page-max-concurrent-io or use a
faster disk",
+ pageId, pagingStore.getAddress(), elapsedMillis);
+ }
+ }
page.open();
-
+ final long startedReadPage = System.nanoTime();
List<PagedMessage> pgdMessages = page.read(storageManager);
+ final long elapsedReadPage = System.nanoTime() - startedReadPage;
+ if (elapsedReadPage > PAGE_READ_TIMEOUT_NS) {
+ logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to
read %d bytes", pageId,
+ pagingStore.getAddress(),
TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
+ }
cache.setMessages(pgdMessages.toArray(new
PagedMessage[pgdMessages.size()]));
+ } catch (Throwable t) {
+ inProgressReadPage.completeExceptionally(t);
+ synchronized (softCache) {
+ inProgressReadPages.remove(pageId);
+ }
+ throw t;
} finally {
try {
if (page != null) {
@@ -175,8 +229,16 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
}
} catch (Throwable ignored) {
}
- storageManager.afterPageRead();
+ if (acquiredPageReadPermission) {
+ storageManager.afterPageRead();
+ }
+ }
+ inProgressReadPage.complete(cache);
+ synchronized (softCache) {
+ inProgressReadPages.remove(pageId);
+ softCache.put(pageId, cache);
}
+ return cache;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index d025d5e..73c43fe 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@@ -147,6 +148,12 @@ public interface StorageManager extends IDGenerator,
ActiveMQComponent {
void beforePageRead() throws Exception;
/**
+ * Like {@link #beforePageRead()}, but returns {@code true} if the read permission is acquired
within {@code timeout},
+ * {@code false} otherwise.
+ */
+ boolean beforePageRead(long timeout, TimeUnit unit) throws
InterruptedException;
+
+ /**
* We need a safeguard in place to avoid too much concurrent IO happening
on Paging, otherwise
* the system may become unresponsive if too many destinations are reading
all the same time.
* This is called after we read, so we can limit concurrent reads
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 1b92e86..fd14d55 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1633,6 +1633,15 @@ public abstract class AbstractJournalStorageManager
extends CriticalComponentImp
}
@Override
+ public boolean beforePageRead(long timeout, TimeUnit unit) throws
InterruptedException {
+ final Semaphore pageMaxConcurrentIO = this.pageMaxConcurrentIO;
+ if (pageMaxConcurrentIO == null) {
+ return true;
+ }
+ return pageMaxConcurrentIO.tryAcquire(timeout, unit);
+ }
+
+ @Override
public void afterPageRead() throws Exception {
if (pageMaxConcurrentIO != null) {
pageMaxConcurrentIO.release();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 995e57b..577ce8b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.Message;
@@ -578,6 +579,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
+ public boolean beforePageRead(long timeout, TimeUnit unit) throws
InterruptedException {
+ return true;
+ }
+
+ @Override
public void afterPageRead() throws Exception {
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
new file mode 100644
index 0000000..4ed38e8
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.Collections.emptyList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PageCursorProviderImplTest {
+
+ @Test(timeout = 30_000)
+ public void shouldAllowConcurrentPageReads() throws Exception {
+ final PagingStore pagingStore = mock(PagingStore.class);
+ final StorageManager storageManager = mock(StorageManager.class);
+ when(storageManager.beforePageRead(anyLong(),
any(TimeUnit.class))).thenReturn(true);
+ final int pages = 2;
+ final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
+ final PageCursorProviderImpl pageCursorProvider = new
PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 2);
+ when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
+ when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
+ final Page firstPage = mock(Page.class);
+ when(firstPage.getPageId()).thenReturn(1);
+ when(pagingStore.createPage(1)).thenReturn(firstPage);
+ final Page secondPage = mock(Page.class);
+ when(secondPage.getPageId()).thenReturn(2);
+ when(pagingStore.createPage(2)).thenReturn(secondPage);
+ final CountDownLatch finishFirstPageRead = new CountDownLatch(1);
+ final Thread concurrentRead = new Thread(() -> {
+ try {
+ final PageCache cache = pageCursorProvider.getPageCache(2);
+ Assert.assertNotNull(cache);
+ } finally {
+ finishFirstPageRead.countDown();
+ }
+ });
+ try {
+ when(firstPage.read(storageManager)).then(invocationOnMock -> {
+ concurrentRead.start();
+ finishFirstPageRead.await();
+ return emptyList();
+ });
+ Assert.assertNotNull(pageCursorProvider.getPageCache(1));
+ } finally {
+ pageCursorProvider.stop();
+ concurrentRead.interrupt();
+ concurrentRead.join();
+ }
+ }
+
+}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b51be9a..0a15022 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -290,6 +291,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
+ public boolean beforePageRead(long timeout, TimeUnit unit) throws
InterruptedException {
+ return true;
+ }
+
+ @Override
public void afterPageRead() throws Exception {
}
diff --git
a/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ca6ee9c
--- /dev/null
+++
b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b32a6b0..3c7b1a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1631,6 +1631,8 @@
<exclude>activemq-artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
+ <!-- Mockito -->
+
<exclude>**/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
</excludes>
</configuration>
<executions>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index b9b686c..9364aa0 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -366,6 +366,11 @@ public class SendAckFailTest extends SpawnedTestBase {
}
@Override
+ public boolean beforePageRead(long timeout, TimeUnit unit) throws
InterruptedException {
+ return manager.beforePageRead(timeout, unit);
+ }
+
+ @Override
public void afterPageRead() throws Exception {
manager.afterPageRead();
}