gemmellr commented on code in PR #5498:
URL: https://github.com/apache/activemq-artemis/pull/5498#discussion_r1967521935


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTimedWriter.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.impl;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.utils.ArtemisCloseable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PageTimedWriter extends ActiveMQScheduledComponent {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStoreImpl pagingStore;
+
+   private final StorageManager storageManager;
+
+   protected final List<PageEvent> pageEvents = new ArrayList<>();
+
+   protected volatile int pendingTasks = 0;
+
+   protected final boolean syncNonTX;
+
+   private final Semaphore writeCredits;
+
+   private final int maxCredits;
+
+   private static final AtomicIntegerFieldUpdater<PageTimedWriter> 
pendingTasksUpdater = 
AtomicIntegerFieldUpdater.newUpdater(PageTimedWriter.class, "pendingTasks");
+
+   public boolean hasPendingIO() {
+      return pendingTasksUpdater.get(this) > 0;
+   }
+
+   public static class PageEvent {
+
+      PageEvent(OperationContext context, PagedMessage message, Transaction 
tx, RouteContextList listCtx, int credits, boolean replicated) {
+         this.context = context;
+         this.message = message;
+         this.listCtx = listCtx;
+         this.replicated = replicated;
+         this.credits = credits;
+         this.tx = tx;
+      }
+
+      final boolean replicated;
+      final PagedMessage message;
+      final OperationContext context;
+      final RouteContextList listCtx;
+      final Transaction tx;
+      final int credits;
+   }
+
+   public PageTimedWriter(int writeCredits, StorageManager storageManager, 
PagingStoreImpl pagingStore, ScheduledExecutorService scheduledExecutor, 
Executor executor, boolean syncNonTX, long timeSync) {
+      super(scheduledExecutor, executor, timeSync, TimeUnit.NANOSECONDS, true);
+      this.pagingStore = pagingStore;
+      this.storageManager = storageManager;
+      this.syncNonTX = syncNonTX;
+      this.writeCredits = new Semaphore(writeCredits);
+      this.maxCredits = writeCredits;
+   }
+
+   public int getMaxCredits() {
+      return maxCredits;
+   }
+
+   @Override
+   public synchronized void stop() {
+      super.stop();
+      processMessages();
+   }
+
+   /** We increment task while holding the readLock.
+    * This is because we verify if the system is paging, and we get out of 
paging when no pending tasks and no pending messages.
+    * We allocate a task while holding the read Lock.
+    * We cannot call addTask within the lock as if the semaphore gets out of 
credits we would deadlock in certain cases. */
+   public void incrementTask() {
+      pendingTasksUpdater.incrementAndGet(this);
+   }
+
+   public void addTask(OperationContext context,
+                                    PagedMessage message,
+                                    Transaction tx,
+                                    RouteContextList listCtx) {
+
+      if (!isStarted()) {
+         throw new IllegalStateException("PageWriter Service is stopped");
+      }
+      int credits = Math.min(message.getEncodeSize() + 
PageReadWriter.SIZE_RECORD, maxCredits);
+      writeCredits.acquireUninterruptibly(credits);
+      synchronized (this) {
+         final boolean replicated = storageManager.isReplicated();
+         PageEvent event = new PageEvent(context, message, tx, listCtx, 
credits, replicated);
+         context.storeLineUp();
+         if (replicated) {
+            context.replicationLineUp();
+         }
+         this.pageEvents.add(event);
+         delay();
+      }
+
+   }
+
+   private synchronized  PageEvent[] extractPendingEvents() {
+      if (pageEvents.isEmpty()) {
+         return null;
+      }
+      PageEvent[] pendingsWrites = new PageEvent[pageEvents.size()];
+      pendingsWrites = pageEvents.toArray(pendingsWrites);
+      pageEvents.clear();
+      return pendingsWrites;
+   }
+
+   @Override
+   public void run() {
+      ArtemisCloseable closeable = storageManager.closeableReadLock(true);
+      if (closeable == null) {
+         logger.trace("Delaying PagedTimedWriter as it's currently locked");
+         delay();
+      } else {
+         try {
+            processMessages();
+         } finally {
+            closeable.close();
+         }
+      }
+   }
+
+   protected void processMessages() {
+      PageEvent[] pendingEvents = extractPendingEvents();
+      if (pendingEvents == null) {
+         return;
+      }
+      OperationContext beforeContext = OperationContextImpl.getContext();
+
+      try {
+         boolean requireSync = false;
+         for (PageEvent event : pendingEvents) {
+            OperationContextImpl.setContext(event.context);
+            pagingStore.directWritePage(event.message, false, 
event.replicated);
+
+            if (event.tx != null || syncNonTX) {
+               requireSync = true;
+            }
+         }
+         if (requireSync) {
+            performSync();
+         }
+
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         for (PageEvent event : pendingEvents) {
+            event.context.onError(ActiveMQExceptionType.IO_ERROR.getCode(), 
e.getClass() + " during ioSync for paging on " + pagingStore.getStoreName() + 
": " + e.getMessage());
+         }
+      } finally {
+         // In case of failure, The context should propagate an exception to 
the client
+         // We send an exception to the client even on the case of a failure
+         // to avoid possible locks and the client not getting the exception 
back

Review Comment:
   Is this comment meant to be on the loop in the catch block above, given that
it is about failure?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -205,17 +210,28 @@ public PagingStoreImpl(final SimpleString address,
 
       this.syncNonTransactional = syncNonTransactional;
 
-      if (scheduledExecutor != null && syncTimeout > 0) {
-         this.syncTimer = new PageSyncTimer(this, scheduledExecutor, 
ioExecutor, syncTimeout);
-      } else {
-         this.syncTimer = null;
-      }
+      this.timedWriter = createPageTimedWriter(scheduledExecutor, syncTimeout);
 
       this.cursorProvider = storeFactory.newCursorProvider(this, 
this.storageManager, addressSettings, executor);
 
       this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize();
    }
 
+   // This is an extension point for unit tests to replace the creation of the 
PagedTimeWriter
+   protected PageTimedWriter createPageTimedWriter(ScheduledExecutorService 
scheduledExecutor, long syncTimeout) {
+      if (scheduledExecutor == null) {
+         throw new NullPointerException("scheduledExecutor");
+      }
+      if (executor == null) {
+         throw new NullPointerException("executor");
+      }
+      // notice that any calls on the PageTimedWriter are going to use the 
page's executor.
+      // the scheduledExecutor will transfer the call to the page executor

Review Comment:
   ```suggestion
         // Notice that any calls on the PageTimedWriter are going to use the 
paging store's executor.
         // The scheduledExecutor will transfer the call to the paging store 
executor
   ```



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -204,19 +202,27 @@ default void addSize(int size) {
     */
    boolean checkReleasedMemory();
 
+   void writeLock();
+
    /**
     * Write lock the PagingStore.
     *
     * @param timeout milliseconds to wait for the lock. If value is {@literal 
-1} then wait
     *                indefinitely.
     * @return {@code true} if the lock was obtained, {@code false} otherwise
     */
-   boolean lock(long timeout);
+   boolean writeLock(long timeout);
 
    /**
-    * Releases locks acquired with {@link PagingStore#lock(long)}.
+    * Releases locks acquired with {@link PagingStore#writeLock(long)}.

Review Comment:
   Should this also mention locks acquired using the new no-argument _writeLock()_?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -151,10 +152,12 @@ public class PagingStoreImpl implements PagingStore {
 
    private volatile Page currentPage;
 
-   private volatile boolean paging = false;
+   private boolean paging = false;
 
    private final PageCursorProvider cursorProvider;
 
+   // this lock protects mostly paging attribute.
+   // it is also used to block producers in eventual cases such as dropping a 
queue, but mostly to protect if the storage is in page mode

Review Comment:
   ```suggestion
      // This lock mostly protects the paging field.
      // It is also used to block producers in eventual cases such as dropping 
a queue, but mostly to protect if the storage is in paging mode
   ```



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -151,10 +152,12 @@ public class PagingStoreImpl implements PagingStore {
 
    private volatile Page currentPage;
 
-   private volatile boolean paging = false;
+   private boolean paging = false;

Review Comment:
   This field seems to be accessed without locking from various places/threads,
which I expect is why it was volatile. I don't see that these changes make a
difference that would warrant removing it. Is there a reason?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -391,21 +407,93 @@ private void pageLimitReleased() {
       }
    }
 
+
+   @Override
+   public void readLock() {
+      readLock(-1L);
+   }
+
    @Override
-   public boolean lock(long timeout) {
-      if (timeout == -1) {
-         lock.writeLock().lock();
+   public boolean readLock(long timeout) {
+      try {
+         if (timeout == -1) {
+            while (true) {
+               if (tryReadLock(1, TimeUnit.SECONDS)) {
+                  return true;
+               }
+            }
+         } else {
+            if (tryReadLock(timeout, TimeUnit.MILLISECONDS)) {
+               return true;
+            } else {
+               return false;
+            }

Review Comment:
   ```suggestion
               return tryReadLock(timeout, TimeUnit.MILLISECONDS);
   ```



##########
artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java:
##########
@@ -102,15 +103,25 @@
 import org.apache.activemq.artemis.json.JsonObjectBuilder;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
 import org.apache.commons.lang3.ClassUtils;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ConfigurationImplTest extends AbstractConfigurationTestBase {
 
+   private ScheduledExecutorService scheduledExecutorService;
+
+   @BeforeEach
+   public void setupScheduledExecutor() {
+      scheduledExecutorService = Executors.newScheduledThreadPool(10);

Review Comment:
   Does it really need a core pool size of 10 threads, given that no executor
at all was being passed originally?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -391,21 +407,93 @@ private void pageLimitReleased() {
       }
    }
 
+
+   @Override
+   public void readLock() {
+      readLock(-1L);
+   }
+
    @Override
-   public boolean lock(long timeout) {
-      if (timeout == -1) {
-         lock.writeLock().lock();
+   public boolean readLock(long timeout) {
+      try {
+         if (timeout == -1) {
+            while (true) {
+               if (tryReadLock(1, TimeUnit.SECONDS)) {
+                  return true;
+               }
+            }
+         } else {
+            if (tryReadLock(timeout, TimeUnit.MILLISECONDS)) {
+               return true;
+            } else {
+               return false;
+            }
+         }
+      } catch (InterruptedException e) {
+         logger.warn(e.getMessage(), e);
+         Thread.currentThread().interrupt();
+         return false;
+      }
+   }
+
+   private boolean tryReadLock(long timeout, TimeUnit unit) throws 
InterruptedException {
+      if (lock.readLock().tryLock(timeout, unit)) {
          return true;
+      } else {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Not able to read lock", new Exception("lock"));

Review Comment:
   If we are going to have these ugly 'trace stacktraces', it would likely be
less confusing if the exception message weren't the same ("lock") for both the
read lock and the later write lock cases.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -731,17 +800,21 @@ private void resetCurrentPage(Page newCurrentPage) {
    @Override
    public void stopPaging() {
       logger.debug("stopPaging being called, while isPaging={} on {}", 
this.paging, this.storeName);
-      lock.writeLock().lock();
+      writeLock();
       try {
          final boolean isPaging = this.paging;
          if (isPaging) {
+            if (timedWriter.hasPendingIO()) {
+               logger.debug("There are pending timed writes. Cannot clear 
paging now.");

Review Comment:
   What are the implications here? E.g. how is a caller meant to know this
happened, and react appropriately to what they asked for not actually being
done?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -391,21 +407,93 @@ private void pageLimitReleased() {
       }
    }
 
+
+   @Override
+   public void readLock() {
+      readLock(-1L);
+   }
+
    @Override
-   public boolean lock(long timeout) {
-      if (timeout == -1) {
-         lock.writeLock().lock();
+   public boolean readLock(long timeout) {
+      try {
+         if (timeout == -1) {
+            while (true) {
+               if (tryReadLock(1, TimeUnit.SECONDS)) {
+                  return true;
+               }
+            }
+         } else {
+            if (tryReadLock(timeout, TimeUnit.MILLISECONDS)) {
+               return true;
+            } else {
+               return false;
+            }
+         }
+      } catch (InterruptedException e) {
+         logger.warn(e.getMessage(), e);
+         Thread.currentThread().interrupt();
+         return false;
+      }
+   }
+
+   private boolean tryReadLock(long timeout, TimeUnit unit) throws 
InterruptedException {
+      if (lock.readLock().tryLock(timeout, unit)) {
          return true;
+      } else {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Not able to read lock", new Exception("lock"));
+         }
+         return false;
       }
+   }
+
+   @Override
+   public void readUnlock() {
+      lock.readLock().unlock();
+   }
+
+   @Override
+   public void writeLock() {
+      writeLock(-1L);
+   }
+
+   @Override
+   public boolean writeLock(long timeout) {
       try {
-         return lock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+         if (timeout == -1) {
+            while (true) {
+               if (tryWriteLock(1, TimeUnit.SECONDS)) {
+                  return true;
+               }
+            }
+         } else {
+            if (tryWriteLock(timeout, TimeUnit.MILLISECONDS)) {
+               return true;
+            } else {
+               return false;
+            }

Review Comment:
   ```suggestion
               return tryWriteLock(timeout, TimeUnit.MILLISECONDS);
   ```



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java:
##########
@@ -894,7 +967,28 @@ private SequentialFileFactory checkFileFactory() throws 
Exception {
 
    @Override
    public void forceAnotherPage() throws Exception {
-      openNewPage();
+      forceAnotherPage(false);

Review Comment:
   Should this fallback default perhaps be true instead? If it's expected to
need to be true in most cases, except where already running on the executor,
shouldn't most calls pass true, with the ones that don't explicitly passing
false? The new method seems to be used in most places, so another question is:
do we need this old one at all?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to