franz1981 commented on a change in pull request #2645: ARTEMIS-2321 Paging 
scalability and GC improvement
URL: https://github.com/apache/activemq-artemis/pull/2645#discussion_r280542595
 
 

 ##########
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 ##########
 @@ -132,52 +146,99 @@ public PagedReference newReference(final PagePosition 
pos,
    @Override
    public PageCache getPageCache(final long pageId) {
       try {
+         if (pageId > pagingStore.getCurrentWritingPage()) {
+            return null;
+         }
+         boolean createPage = false;
+         CompletableFuture<PageCache> inProgressReadPage;
          PageCache cache;
+         Page page = null;
          synchronized (softCache) {
-            if (pageId > pagingStore.getCurrentWritingPage()) {
+            cache = softCache.get(pageId);
+            if (cache != null) {
+               return cache;
+            }
+            if (!pagingStore.checkPageFileExists((int) pageId)) {
                return null;
             }
-
-            cache = softCache.get(pageId);
-            if (cache == null) {
-               if (!pagingStore.checkPageFileExists((int) pageId)) {
-                  return null;
-               }
-
+            inProgressReadPage = inProgressReadPages.get(pageId);
+            if (inProgressReadPage == null) {
+               final CompletableFuture<PageCache> readPage = new 
CompletableFuture<>();
                cache = createPageCache(pageId);
-               // anyone reading from this cache will have to wait reading to 
finish first
-               // we also want only one thread reading this cache
-               logger.tracef("adding pageCache pageNr=%d into cursor = %s", 
pageId, this.pagingStore.getAddress());
-               readPage((int) pageId, cache);
-               softCache.put(pageId, cache);
+               page = pagingStore.createPage((int) pageId);
+               createPage = true;
+               inProgressReadPage = readPage;
+               inProgressReadPages.put(pageId, readPage);
+            }
+         }
+         if (createPage) {
+            return readPage(pageId, page, cache, inProgressReadPage);
+         } else {
+            final long startedWait = System.nanoTime();
+            while (true) {
+               try {
+                  return 
inProgressReadPage.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS);
+               } catch (TimeoutException e) {
+                  final long elapsed = System.nanoTime() - startedWait;
+                  final long elapsedMillis = 
TimeUnit.NANOSECONDS.toMillis(elapsed);
+                  logger.warnf("Waiting a concurrent Page::read for pageNr=%d 
on cursor %s by %d ms",
+                               pageId, pagingStore.getAddress(), 
elapsedMillis);
+               }
             }
          }
-
-         return cache;
       } catch (Exception e) {
          throw new RuntimeException(e.getMessage(), e);
       }
    }
 
-   private void readPage(int pageId, PageCache cache) throws Exception {
-      Page page = null;
+   private PageCache readPage(long pageId,
+                              Page page,
+                              PageCache cache,
+                              CompletableFuture<PageCache> inProgressReadPage) 
throws Exception {
+      logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, 
this.pagingStore.getAddress());
+      boolean acquiredPageReadPermission = false;
       try {
-         page = pagingStore.createPage(pageId);
-
-         storageManager.beforePageRead();
+         final long startedRequest = System.nanoTime();
+         while (!acquiredPageReadPermission) {
+            acquiredPageReadPermission = 
storageManager.beforePageRead(PAGE_READ_PERMISSION_TIMEOUT_NS, 
TimeUnit.NANOSECONDS);
+            if (!acquiredPageReadPermission) {
+               final long elapsedMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedRequest);
+               logger.warnf("Cannot acquire page read permission of pageNr=%d 
on cursor %s after %d ms: consider increasing page-max-concurrent-io or use a 
faster disk",
+                            pageId, pagingStore.getAddress(), elapsedMillis);
+            }
+         }
          page.open();
-
+         final long startedReadPage = System.nanoTime();
          List<PagedMessage> pgdMessages = page.read(storageManager);
+         final long elapsedReadPage = System.nanoTime() - startedReadPage;
+         if (elapsedReadPage > PAGE_READ_TIMEOUT_NS) {
+            logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to 
read %d bytes", pageId,
 
 Review comment:
   @clebertsuconic @jbertram 
   To avoid modifying `Page::read` and introducing a whole new "failure" 
state (half-read pages with a critical IO error raised), 
   I've preferred to add some more detailed logs in order to detect weird 
behaviours, long waits or `page-max-concurrent-io` misconfigurations.
   I suppose that adding a proper "disk slowness/JVM suicide" mechanism deserves its 
own PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to