[ 
https://issues.apache.org/jira/browse/ARTEMIS-4065?focusedWorklogId=832818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-832818
 ]

ASF GitHub Bot logged work on ARTEMIS-4065:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/22 17:33
            Start Date: 12/Dec/22 17:33
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1044298291


##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean 
elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }
+
+   public void reset(long size, long elements) {
+      sizeUpdater.set(this, size);
+      elementsUpdater.set(this, elements);
+   }

Review Comment:
   This looks unused?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java:
##########
@@ -141,6 +143,7 @@ default long getGlobalMessages() {
     */
    void checkMemory(Runnable runWhenAvailable);
 
+   void counterSnashot();

Review Comment:
   typo in method name



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java:
##########
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
     */
    void close(PageSubscription pageCursorImpl);
 
+   void startCounterRebuild();
+
+   void finishCounterRebuild();

Review Comment:
   Perhaps counterRebuildStarted and counterRebuildFinished? More of a 
notification than an instruction to start/stop than the existing names, and 
groups the 3 counter-related methods together.



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
       return addSize(delta, false);
    }
 
+   public final void add(final int elements, final long size) {
+
+      long currentSize = sizeUpdater.addAndGet(this, size);
+      long currentElements = elementsUpdater.addAndGet(this, elements);
+
+      if (elements >= 0) {
+         assert size >= 0 : "If elements is positve, size must be positive";

Review Comment:
   
   
   Should it be checking that if one is 0 the other is also 0, and if one is 
positive the other is? Currently it would allow them to differ, one 0 and the 
other not. 0 also isnt positive.
   
   Or is the intent to allow doing the 'elements or size only' behaviour the 
other methods facilitate?



##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean 
elementsEnabled) {
       return this;
    }
 
+   public void reset() {
+      sizeUpdater.set(this, 0);
+      elementsUpdater.set(this, 0);
+   }

Review Comment:
   Is this method ever used while other things may be updating the counts? If 
so they could get out of sync.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+
+   @Override
+   public boolean isRebuilding() {
+      return rebuilding;
+   }
+
+
+
+

Review Comment:
   Superfluous lines.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();

Review Comment:
   Doesnt seem like these need to be package private variables.
   Constructors more typically go below the variables, looks nicer.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
 
    protected void cleanup() {
 
+      if (!countersRebuilt) {
+         logger.debug("Counters were not rebuilt yet, cleanup has to be 
ignored on address {}", pagingStore != null ? pagingStore.getAddress() : 
"NULL");
+         return;
+      }

Review Comment:
   When is cleanup called? Should it be deferred and run once the rebuild is 
done? Or will another cleanup call come along promptly to replace the one that 
was ignored?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -860,6 +868,13 @@ public void processReload() throws Exception {
    public void stop() {
    }
 
+   @Override
+   public void counterSnapshot() {
+      if (counter != null) {
+         counter.snapshot();
+      }

Review Comment:
   Seems odd for this method to null check when others such as 
isCounterPending() dont.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new 
LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the 
PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {

Review Comment:
   Having the 'Local' prefix on this class but not the one above seems odd, 
they should follow a similar convention.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+   private volatile  boolean rebuilding = false;
+
+
+   @Override
+   public void markRebuilding() {
+      rebuilding = true;
+   }
+
+   @Override
+   public void finishRebuild() {
+      rebuilding = false;
+   }
+
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   Superfluous line.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;

Review Comment:
   CopiedSubscription.this. is superfluous, its a static class.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new 
LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the 
PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the 
subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild 
counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page 
{} and limit will be {} with lastMessage on last page={}", 
store.getStoreName(), store.getCurrentWritingPage(), limitPageId, 
limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", 
store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", 
subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new 
CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), 
copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new 
LocalCopiedConsumedPage();
+               
copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), 
copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", 
consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on 
pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = 
copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, 
copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", 
pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on 
rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, 
pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last 
message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);

Review Comment:
   "got to the"?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new 
LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the 
PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the 
subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild 
counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page 
{} and limit will be {} with lastMessage on last page={}", 
store.getStoreName(), store.getCurrentWritingPage(), limitPageId, 
limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", 
store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", 
subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new 
CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), 
copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new 
LocalCopiedConsumedPage();
+               
copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), 
copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", 
consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on 
pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = 
copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, 
copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {
+      if (pgStore == null) {
+         logger.debug("Page store is null during rebuildCounters");
+         return;
+      }
+
+      if (!paging) {
+         logger.debug("Ignoring call to rebuild pgStore {}", 
pgStore.getAddress());
+      }
+
+      logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+      for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Rebuilding counter on messages from page {} on 
rebuildCounters for address {}", pgid, pgStore.getAddress());
+         }
+         Page page = pgStore.newPageObject(pgid);
+
+         if (!page.getFile().exists()) {
+            if (logger.isDebugEnabled()) {
+               logger.debug("Skipping page {} on store {}", pgid, 
pgStore.getAddress());
+            }
+            continue;
+         }
+         page.open(false);
+         LinkedList<PagedMessage> msgs = page.read(sm);
+         page.close(false, false);
+
+         try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+            while (iter.hasNext()) {
+               PagedMessage msg = iter.next();
+               if (limitPageId == pgid) {
+                  if (msg.getMessageNumber() >= limitMessageNr) {
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Rebuild counting on {} go to the last 
message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+                     }
+                     // this is the limit where we should count..
+                     // anything beyond this will be new data
+                     break;
+                  }
+               }
+               msg.initMessage(sm);
+               long[] routedQueues = msg.getQueueIDs();
+
+               if (logger.isTraceEnabled()) {
+                  logger.trace("reading message for rebuild cursor on {}, 
pg={}, messageNR={}, routedQueues={}, message={}, queueList={}", 
pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), 
routedQueues, msg);

Review Comment:
   log message has more placeholders in it than variables are actually provided



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new 
LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the 
PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;

Review Comment:
   Throwing would more uniformly fail, and be more succinct from not needing a 
3 line comment to explain it...twice (same below).



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java:
##########
@@ -465,4 +419,17 @@ public void addAndGet(int count, long persistentSize) {
          SIZE_UPDATER.addAndGet(this, persistentSize);
       }
    }
+
+   @Override
+   public PageSubscriptionCounter setSubscription(PageSubscription 
subscription) {
+      this.subscription = subscription;
+
+      if (subscription == null) {
+         this.pageExecutor = null;
+      } else {
+         this.pageExecutor = subscription.getPagingStore().getExecutor();
+         assert pageExecutor != null;

Review Comment:
   Should the executor be cleared on null? 
   
   Actually, is the executor ever actually used?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages 
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private final PagingStore pgStore;
+   private final StorageManager sm;
+   private final LongHashSet transactions;
+
+   public PageCounterRebuildManager(PagingStore store, LongHashSet 
transactions) {
+      // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
+      initialize(store);
+      this.pgStore = store;
+      this.sm = store.getStorageManager();
+      this.transactions = transactions;
+   }
+
+   boolean paging;
+   long limitPageId;
+   int limitMessageNr;
+
+   LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new 
LongObjectHashMap<>();
+
+   private static class CopiedSubscription {
+      CopiedSubscription(PageSubscription subscription) {
+         CopiedSubscription.this.subscriptionCounter = 
subscription.getCounter();
+         CopiedSubscription.this.subscription = subscription;
+      }
+
+      private boolean empty = true;
+
+      LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new 
LongObjectHashMap<>();
+
+      // this is not a copy! This will be the actual object listed in the 
PageSubscription
+      // any changes to this object will reflect in the system and management;
+      PageSubscriptionCounter subscriptionCounter;
+
+      PageSubscription subscription;
+
+      LocalCopiedConsumedPage getPage(long pageNr) {
+         return consumedPageMap.get(pageNr);
+      }
+
+      int addUp;
+      long sizeUp;
+
+   }
+
+   private static class LocalCopiedConsumedPage implements ConsumedPage {
+      boolean done;
+      IntObjectHashMap<Boolean> acks;
+
+      @Override
+      public long getPageId() {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+         return 0;
+      }
+
+      @Override
+      public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+         // i'm using an assertion instead of always throwing it just because 
it wouldn't be a big deal to call this method
+         // since this is an internal clss;
+         // however I would like to catch it during dev
+         assert false : "Not Implemented";
+      }
+
+      @Override
+      public boolean isDone() {
+         return done;
+      }
+
+      @Override
+      public boolean isAck(int messageNumber) {
+         if (done) {
+            return true;
+         }
+         if (acks != null) {
+            return acks.get(messageNumber) != null;
+         }
+         return false;
+      }
+   }
+
+   /** this method will perform the copy from Acked recorded from the 
subscription into a separate data structure.
+    * So we can count data while we consolidate at the end */
+   private void initialize(PagingStore store) {
+      store.lock(-1);
+      try {
+         try {
+            paging = store.isPaging();
+            if (!paging) {
+               logger.debug("Destination {} was not paging, no need to rebuild 
counters");
+               store.getCursorProvider().forEachSubscription(subscription -> {
+                  subscription.getCounter().markRebuilding();
+                  subscription.getCounter().finishRebuild();
+               });
+
+               store.getCursorProvider().finishCounterRebuild();
+               return;
+            }
+            store.getCursorProvider().startCounterRebuild();
+            Page currentPage = store.getCurrentPage();
+            limitPageId = store.getCurrentWritingPage();
+            limitMessageNr = currentPage.getNumberOfMessages();
+            if (logger.isDebugEnabled()) {
+               logger.debug("PageCounterRebuild for {}, Current writing page 
{} and limit will be {} with lastMessage on last page={}", 
store.getStoreName(), store.getCurrentWritingPage(), limitPageId, 
limitMessageNr);
+            }
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            limitPageId = store.getCurrentWritingPage();
+         }
+         logger.trace("Copying page store ack information from address {}", 
store.getAddress());
+         store.getCursorProvider().forEachSubscription(subscription -> {
+            if (logger.isTraceEnabled()) {
+               logger.trace("Copying subscription ID {}", 
subscription.getId());
+            }
+
+            CopiedSubscription copiedSubscription = new 
CopiedSubscription(subscription);
+            copiedSubscription.subscriptionCounter.markRebuilding();
+            copiedSubscriptionMap.put(subscription.getId(), 
copiedSubscription);
+
+            subscription.forEachConsumedPage(consumedPage -> {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Copying page {}", consumedPage.getPageId());
+               }
+
+               LocalCopiedConsumedPage copiedConsumedPage = new 
LocalCopiedConsumedPage();
+               
copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), 
copiedConsumedPage);
+               if (consumedPage.isDone()) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Marking page {} as done on the copy", 
consumedPage.getPageId());
+                  }
+                  copiedConsumedPage.done = true;
+               } else {
+                  // We only copy the acks if the page is not done
+                  // as if the page is done, we just move over
+                  consumedPage.forEachAck((messageNR, pagePosition) -> {
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Marking messageNR {} as acked on 
pageID={} copy", messageNR, consumedPage.getPageId());
+                     }
+                     if (copiedConsumedPage.acks == null) {
+                        copiedConsumedPage.acks = new IntObjectHashMap<>();
+                     }
+                     copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+                  });
+               }
+            });
+         });
+      } finally {
+         store.unlock();
+      }
+   }
+
+   private synchronized PageSubscriptionCounter getCounter(long queueID) {
+      CopiedSubscription copiedSubscription = 
copiedSubscriptionMap.get(queueID);
+      if (copiedSubscription != null) {
+         return copiedSubscription.subscriptionCounter;
+      } else {
+         return null;
+      }
+   }
+
+   private CopiedSubscription getSubscription(long queueID) {
+      return copiedSubscriptionMap.get(queueID);
+   }
+
+   private boolean isACK(long queueID, long pageNR, int messageNR) {
+      CopiedSubscription subscription = getSubscription(queueID);
+      if (subscription == null) {
+         return true;
+      }
+
+      LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+      if (consumedPage == null) {
+         return false;
+      } else {
+         return consumedPage.isAck(messageNR);
+      }
+   }
+
+   private void done() {
+      copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+            try {
+               copiedSubscription.subscriptionCounter.increment(null, 
copiedSubscription.addUp, copiedSubscription.sizeUp);
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+         if (!copiedSubscription.empty) {
+            copiedSubscription.subscription.notEmpty();
+         }
+         if (copiedSubscription.subscriptionCounter != null) {
+            copiedSubscription.subscriptionCounter.finishRebuild();
+         }
+      });
+      pgStore.getCursorProvider().finishCounterRebuild();
+      pgStore.getCursorProvider().scheduleCleanup();
+   }
+
+   @Override
+   public void run() {
+      try {
+         rebuild();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   public void rebuild() throws Exception {

Review Comment:
   There look to be different ways of calling this, suggesting it could happen 
concurrently, which doesnt seem like it would be safe.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 832818)
    Time Spent: 1h  (was: 50m)

> Option to use non persistent counters in paging. Rebuild them upon start if 
> persistence is disabled on them
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-4065
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4065
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Clebert Suconic
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Instead of storing records on journal for counting how many records there are 
> in paging, the system should instead just swipe the paging system in parallel 
> with processing data.
> The changes I'm making will take a snapshot of the current records of paging, 
> and then it will read all the pages to rebuild the counters.
> On tests I am making from a real data server, a system that had a lot of 
> pages (700) needed less than 1 minute to rebuild the counters, and the 
> messages were available to be delivered while the swipe was being done.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to