[
https://issues.apache.org/jira/browse/ARTEMIS-4065?focusedWorklogId=832818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-832818
]
ASF GitHub Bot logged work on ARTEMIS-4065:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Dec/22 17:33
Start Date: 12/Dec/22 17:33
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #4265:
URL: https://github.com/apache/activemq-artemis/pull/4265#discussion_r1044298291
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean
elementsEnabled) {
return this;
}
+ public void reset() {
+ sizeUpdater.set(this, 0);
+ elementsUpdater.set(this, 0);
+ }
+
+ public void reset(long size, long elements) {
+ sizeUpdater.set(this, size);
+ elementsUpdater.set(this, elements);
+ }
Review Comment:
This looks unused?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java:
##########
@@ -141,6 +143,7 @@ default long getGlobalMessages() {
*/
void checkMemory(Runnable runWhenAvailable);
+ void counterSnashot();
Review Comment:
Typo in method name: counterSnashot should be counterSnapshot.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java:
##########
@@ -56,4 +62,8 @@ public interface PageCursorProvider {
*/
void close(PageSubscription pageCursorImpl);
+ void startCounterRebuild();
+
+ void finishCounterRebuild();
Review Comment:
Perhaps counterRebuildStarted and counterRebuildFinished? Those read more as
a notification than as an instruction to start/stop, which is what the
existing names suggest, and they group the 3 counter-related methods together.
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -163,6 +173,21 @@ public final long addSize(final int delta) {
return addSize(delta, false);
}
+ public final void add(final int elements, final long size) {
+
+ long currentSize = sizeUpdater.addAndGet(this, size);
+ long currentElements = elementsUpdater.addAndGet(this, elements);
+
+ if (elements >= 0) {
+ assert size >= 0 : "If elements is positve, size must be positive";
Review Comment:
Should this check that if one is 0 the other is also 0, and that if one is
positive the other is too? Currently it would allow them to differ, with one 0
and the other not. Also, 0 isn't positive, so the assertion message doesn't
match the >= 0 checks.
Or is the intent to allow the 'elements or size only' behaviour that the
other methods facilitate?
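To make the first option concrete, here is a minimal sketch of a stricter
check. It assumes the intent is that elements and size always move in the same
direction; SignCheck and consistent are hypothetical names for illustration,
not part of SizeAwareMetric.

```java
// Hypothetical helper for illustration only; not part of SizeAwareMetric.
final class SignCheck {

   // True when both deltas are zero, both are positive, or both are negative.
   static boolean consistent(int elements, long size) {
      return Integer.signum(elements) == Long.signum(size);
   }

   public static void main(String[] args) {
      System.out.println(consistent(1, 100)); // both positive
      System.out.println(consistent(0, 100)); // elements is zero, size is not
   }
}
```

If the 'elements or size only' behaviour is intended, a check like this would
be too strict and the assertion message should be reworded instead.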
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -107,6 +107,16 @@ public SizeAwareMetric setElementsEnabled(boolean
elementsEnabled) {
return this;
}
+ public void reset() {
+ sizeUpdater.set(this, 0);
+ elementsUpdater.set(this, 0);
+ }
Review Comment:
Is this method ever used while other things may be updating the counts? If
so they could get out of sync.
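A minimal sketch of the interleaving I have in mind. It uses two AtomicLong
fields as a stand-in for the field updaters (an assumption made to keep the
example self-contained) and replays one possible ordering in a single thread.
ResetInterleaving and replay are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the metric; illustrates one possible ordering.
final class ResetInterleaving {

   final AtomicLong size = new AtomicLong();
   final AtomicLong elements = new AtomicLong();

   // Mirrors the shape of reset(): two independent writes.
   void reset() {
      size.set(0);
      elements.set(0);
   }

   // Replays, in one thread, an ordering two threads could produce.
   static long[] replay() {
      ResetInterleaving metric = new ResetInterleaving();
      metric.size.addAndGet(100);   // updater thread: first half of add(1, 100)
      metric.reset();               // other thread: reset() runs in between
      metric.elements.addAndGet(1); // updater thread: second half of add(1, 100)
      return new long[] {metric.size.get(), metric.elements.get()};
   }
}
```

After that ordering the metric reports one element with a size of 0, which is
the out-of-sync state I mean.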
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+ private volatile boolean rebuilding = false;
+
+
Review Comment:
Superfluous line.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+ private volatile boolean rebuilding = false;
+
+
+ @Override
+ public void markRebuilding() {
+ rebuilding = true;
+ }
+
+ @Override
+ public void finishRebuild() {
+ rebuilding = false;
+ }
+
+
+ @Override
+ public boolean isRebuilding() {
+ return rebuilding;
+ }
+
+
+
+
Review Comment:
Superfluous lines.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet
transactions) {
+ // we make a copy of the data because we are allowing data to influx. We
will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+
+ boolean paging;
+ long limitPageId;
+ int limitMessageNr;
+
+ LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new
LongObjectHashMap<>();
Review Comment:
It doesn't seem like these need to be package-private variables.
Constructors more typically go below the variables, which also looks nicer.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -216,6 +243,11 @@ public void resumeCleanup() {
protected void cleanup() {
+ if (!countersRebuilt) {
+ logger.debug("Counters were not rebuilt yet, cleanup has to be
ignored on address {}", pagingStore != null ? pagingStore.getAddress() :
"NULL");
+ return;
+ }
Review Comment:
When is cleanup called? Should it be deferred and run once the rebuild is
done? Or will another cleanup call come along promptly to replace the one that
was ignored?
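If deferring turns out to be needed, one possible shape is sketched below. It
is a single-threaded illustration under assumed names (DeferredCleanup,
cleanupPending, counterRebuildFinished and cleanupsRun are all hypothetical),
not a proposal for the exact PageCursorProviderImpl code, and it ignores the
locking a real implementation would need.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: remember a skipped cleanup and run it after the rebuild.
final class DeferredCleanup {

   private volatile boolean countersRebuilt = false;
   private final AtomicBoolean cleanupPending = new AtomicBoolean();
   int cleanupsRun = 0; // stands in for the real cleanup work

   void cleanup() {
      if (!countersRebuilt) {
         // Record the request instead of dropping it.
         cleanupPending.set(true);
         return;
      }
      cleanupsRun++;
   }

   void counterRebuildFinished() {
      countersRebuilt = true;
      // Run the cleanup that was skipped while rebuilding, if there was one.
      if (cleanupPending.getAndSet(false)) {
         cleanup();
      }
   }
}
```

If something already schedules a cleanup unconditionally once the rebuild
finishes, this bookkeeping would be unnecessary.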
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java:
##########
@@ -860,6 +868,13 @@ public void processReload() throws Exception {
public void stop() {
}
+ @Override
+ public void counterSnapshot() {
+ if (counter != null) {
+ counter.snapshot();
+ }
Review Comment:
It seems odd for this method to null check when others such as
isCounterPending() don't.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet
transactions) {
+ // we make a copy of the data because we are allowing data to influx. We
will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+
+ boolean paging;
+ long limitPageId;
+ int limitMessageNr;
+
+ LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new
LongObjectHashMap<>();
+
+ private static class CopiedSubscription {
+ CopiedSubscription(PageSubscription subscription) {
+ CopiedSubscription.this.subscriptionCounter =
subscription.getCounter();
+ CopiedSubscription.this.subscription = subscription;
+ }
+
+ private boolean empty = true;
+
+ LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new
LongObjectHashMap<>();
+
+ // this is not a copy! This will be the actual object listed in the
PageSubscription
+ // any changes to this object will reflect in the system and management;
+ PageSubscriptionCounter subscriptionCounter;
+
+ PageSubscription subscription;
+
+ LocalCopiedConsumedPage getPage(long pageNr) {
+ return consumedPageMap.get(pageNr);
+ }
+
+ int addUp;
+ long sizeUp;
+
+ }
+
+ private static class LocalCopiedConsumedPage implements ConsumedPage {
Review Comment:
Having the 'Local' prefix on this class but not on the one above seems odd;
they should follow a similar convention.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+
+public abstract class BasePagingCounter implements PageSubscriptionCounter {
+
+ private volatile boolean rebuilding = false;
+
+
+ @Override
+ public void markRebuilding() {
+ rebuilding = true;
+ }
+
+ @Override
+ public void finishRebuild() {
+ rebuilding = false;
+ }
+
+
Review Comment:
Superfluous line.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/BasePagingCounter.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
Review Comment:
Superfluous line.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet
transactions) {
+ // we make a copy of the data because we are allowing data to influx. We
will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+
+ boolean paging;
+ long limitPageId;
+ int limitMessageNr;
+
+ LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new
LongObjectHashMap<>();
+
+ private static class CopiedSubscription {
+ CopiedSubscription(PageSubscription subscription) {
+ CopiedSubscription.this.subscriptionCounter =
subscription.getCounter();
+ CopiedSubscription.this.subscription = subscription;
Review Comment:
The CopiedSubscription.this. qualifier is superfluous; it's a static class.
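A small sketch of the simpler form, using hypothetical names (Outer, Copied,
demo) rather than the PR's classes. A static nested class has no enclosing
instance to disambiguate from, so plain this is enough.

```java
// Hypothetical classes for illustration; not the PR's code.
final class Outer {

   private static class Copied {
      final String name;

      Copied(String name) {
         // Plain 'this' refers to the Copied instance; qualifying it with the
         // class name adds nothing here.
         this.name = name;
      }
   }

   static String demo() {
      return new Copied("sub").name;
   }
}
```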
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet
transactions) {
+ // we make a copy of the data because we are allowing data to influx. We
will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+
+ boolean paging;
+ long limitPageId;
+ int limitMessageNr;
+
+ LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new
LongObjectHashMap<>();
+
+ private static class CopiedSubscription {
+ CopiedSubscription(PageSubscription subscription) {
+ CopiedSubscription.this.subscriptionCounter =
subscription.getCounter();
+ CopiedSubscription.this.subscription = subscription;
+ }
+
+ private boolean empty = true;
+
+ LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new
LongObjectHashMap<>();
+
+ // this is not a copy! This will be the actual object listed in the
PageSubscription
+ // any changes to this object will reflect in the system and management;
+ PageSubscriptionCounter subscriptionCounter;
+
+ PageSubscription subscription;
+
+ LocalCopiedConsumedPage getPage(long pageNr) {
+ return consumedPageMap.get(pageNr);
+ }
+
+ int addUp;
+ long sizeUp;
+
+ }
+
+ private static class LocalCopiedConsumedPage implements ConsumedPage {
+ boolean done;
+ IntObjectHashMap<Boolean> acks;
+
+ @Override
+ public long getPageId() {
+ // i'm using an assertion instead of always throwing it just because
it wouldn't be a big deal to call this method
+ // since this is an internal clss;
+ // however I would like to catch it during dev
+ assert false : "Not Implemented";
+ return 0;
+ }
+
+ @Override
+ public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+ // i'm using an assertion instead of always throwing it just because
it wouldn't be a big deal to call this method
+ // since this is an internal clss;
+ // however I would like to catch it during dev
+ assert false : "Not Implemented";
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public boolean isAck(int messageNumber) {
+ if (done) {
+ return true;
+ }
+ if (acks != null) {
+ return acks.get(messageNumber) != null;
+ }
+ return false;
+ }
+ }
+
+ /** this method will perform the copy from Acked recorded from the
subscription into a separate data structure.
+ * So we can count data while we consolidate at the end */
+ private void initialize(PagingStore store) {
+ store.lock(-1);
+ try {
+ try {
+ paging = store.isPaging();
+ if (!paging) {
+ logger.debug("Destination {} was not paging, no need to rebuild
counters");
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ subscription.getCounter().markRebuilding();
+ subscription.getCounter().finishRebuild();
+ });
+
+ store.getCursorProvider().finishCounterRebuild();
+ return;
+ }
+ store.getCursorProvider().startCounterRebuild();
+ Page currentPage = store.getCurrentPage();
+ limitPageId = store.getCurrentWritingPage();
+ limitMessageNr = currentPage.getNumberOfMessages();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PageCounterRebuild for {}, Current writing page
{} and limit will be {} with lastMessage on last page={}",
store.getStoreName(), store.getCurrentWritingPage(), limitPageId,
limitMessageNr);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ limitPageId = store.getCurrentWritingPage();
+ }
+ logger.trace("Copying page store ack information from address {}",
store.getAddress());
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying subscription ID {}",
subscription.getId());
+ }
+
+ CopiedSubscription copiedSubscription = new
CopiedSubscription(subscription);
+ copiedSubscription.subscriptionCounter.markRebuilding();
+ copiedSubscriptionMap.put(subscription.getId(),
copiedSubscription);
+
+ subscription.forEachConsumedPage(consumedPage -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying page {}", consumedPage.getPageId());
+ }
+
+ LocalCopiedConsumedPage copiedConsumedPage = new
LocalCopiedConsumedPage();
+
copiedSubscription.consumedPageMap.put(consumedPage.getPageId(),
copiedConsumedPage);
+ if (consumedPage.isDone()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking page {} as done on the copy",
consumedPage.getPageId());
+ }
+ copiedConsumedPage.done = true;
+ } else {
+ // We only copy the acks if the page is not done
+ // as if the page is done, we just move over
+ consumedPage.forEachAck((messageNR, pagePosition) -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking messageNR {} as acked on
pageID={} copy", messageNR, consumedPage.getPageId());
+ }
+ if (copiedConsumedPage.acks == null) {
+ copiedConsumedPage.acks = new IntObjectHashMap<>();
+ }
+ copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+ });
+ }
+ });
+ });
+ } finally {
+ store.unlock();
+ }
+ }
+
+ private synchronized PageSubscriptionCounter getCounter(long queueID) {
+ CopiedSubscription copiedSubscription =
copiedSubscriptionMap.get(queueID);
+ if (copiedSubscription != null) {
+ return copiedSubscription.subscriptionCounter;
+ } else {
+ return null;
+ }
+ }
+
+ private CopiedSubscription getSubscription(long queueID) {
+ return copiedSubscriptionMap.get(queueID);
+ }
+
+ private boolean isACK(long queueID, long pageNR, int messageNR) {
+ CopiedSubscription subscription = getSubscription(queueID);
+ if (subscription == null) {
+ return true;
+ }
+
+ LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+ if (consumedPage == null) {
+ return false;
+ } else {
+ return consumedPage.isAck(messageNR);
+ }
+ }
+
+ private void done() {
+ copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ try {
+ copiedSubscription.subscriptionCounter.increment(null,
copiedSubscription.addUp, copiedSubscription.sizeUp);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ }
+ if (copiedSubscription.subscriptionCounter != null) {
+ copiedSubscription.subscriptionCounter.finishRebuild();
+ }
+ });
+ pgStore.getCursorProvider().finishCounterRebuild();
+ pgStore.getCursorProvider().scheduleCleanup();
+ }
+
+ @Override
+ public void run() {
+ try {
+ rebuild();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ public void rebuild() throws Exception {
+ if (pgStore == null) {
+ logger.debug("Page store is null during rebuildCounters");
+ return;
+ }
+
+ if (!paging) {
+ logger.debug("Ignoring call to rebuild pgStore {}",
pgStore.getAddress());
+ }
+
+ logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+ for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuilding counter on messages from page {} on
rebuildCounters for address {}", pgid, pgStore.getAddress());
+ }
+ Page page = pgStore.newPageObject(pgid);
+
+ if (!page.getFile().exists()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Skipping page {} on store {}", pgid,
pgStore.getAddress());
+ }
+ continue;
+ }
+ page.open(false);
+ LinkedList<PagedMessage> msgs = page.read(sm);
+ page.close(false, false);
+
+ try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+ while (iter.hasNext()) {
+ PagedMessage msg = iter.next();
+ if (limitPageId == pgid) {
+ if (msg.getMessageNumber() >= limitMessageNr) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuild counting on {} go to the last
message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
Review Comment:
"got to the"?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.LongObjectHashMap;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.ConsumedPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.collections.LinkedList;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.function.BiConsumer;
+
+/** this class will copy current data from the Subscriptions, count messages
while the server is already active
+ * performing other activity */
+public class PageCounterRebuildManager implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final PagingStore pgStore;
+ private final StorageManager sm;
+ private final LongHashSet transactions;
+
+ public PageCounterRebuildManager(PagingStore store, LongHashSet transactions) {
+ // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end
+ initialize(store);
+ this.pgStore = store;
+ this.sm = store.getStorageManager();
+ this.transactions = transactions;
+ }
+
+ boolean paging;
+ long limitPageId;
+ int limitMessageNr;
+
+ LongObjectHashMap<CopiedSubscription> copiedSubscriptionMap = new LongObjectHashMap<>();
+
+ private static class CopiedSubscription {
+ CopiedSubscription(PageSubscription subscription) {
+ CopiedSubscription.this.subscriptionCounter = subscription.getCounter();
+ CopiedSubscription.this.subscription = subscription;
+ }
+
+ private boolean empty = true;
+
+ LongObjectHashMap<LocalCopiedConsumedPage> consumedPageMap = new LongObjectHashMap<>();
+
+ // this is not a copy! This will be the actual object listed in the PageSubscription
+ // any changes to this object will reflect in the system and management;
+ PageSubscriptionCounter subscriptionCounter;
+
+ PageSubscription subscription;
+
+ LocalCopiedConsumedPage getPage(long pageNr) {
+ return consumedPageMap.get(pageNr);
+ }
+
+ int addUp;
+ long sizeUp;
+
+ }
+
+ private static class LocalCopiedConsumedPage implements ConsumedPage {
+ boolean done;
+ IntObjectHashMap<Boolean> acks;
+
+ @Override
+ public long getPageId() {
+ // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+ // since this is an internal clss;
+ // however I would like to catch it during dev
+ assert false : "Not Implemented";
+ return 0;
+ }
+
+ @Override
+ public void forEachAck(BiConsumer<Integer, PagePosition> ackConsumer) {
+ // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+ // since this is an internal clss;
+ // however I would like to catch it during dev
+ assert false : "Not Implemented";
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public boolean isAck(int messageNumber) {
+ if (done) {
+ return true;
+ }
+ if (acks != null) {
+ return acks.get(messageNumber) != null;
+ }
+ return false;
+ }
+ }
+
+ /** this method will perform the copy from Acked recorded from the subscription into a separate data structure.
+ * So we can count data while we consolidate at the end */
+ private void initialize(PagingStore store) {
+ store.lock(-1);
+ try {
+ try {
+ paging = store.isPaging();
+ if (!paging) {
+ logger.debug("Destination {} was not paging, no need to rebuild counters");
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ subscription.getCounter().markRebuilding();
+ subscription.getCounter().finishRebuild();
+ });
+
+ store.getCursorProvider().finishCounterRebuild();
+ return;
+ }
+ store.getCursorProvider().startCounterRebuild();
+ Page currentPage = store.getCurrentPage();
+ limitPageId = store.getCurrentWritingPage();
+ limitMessageNr = currentPage.getNumberOfMessages();
+ if (logger.isDebugEnabled()) {
+ logger.debug("PageCounterRebuild for {}, Current writing page {} and limit will be {} with lastMessage on last page={}", store.getStoreName(), store.getCurrentWritingPage(), limitPageId, limitMessageNr);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ limitPageId = store.getCurrentWritingPage();
+ }
+ logger.trace("Copying page store ack information from address {}", store.getAddress());
+ store.getCursorProvider().forEachSubscription(subscription -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying subscription ID {}", subscription.getId());
+ }
+
+ CopiedSubscription copiedSubscription = new CopiedSubscription(subscription);
+ copiedSubscription.subscriptionCounter.markRebuilding();
+ copiedSubscriptionMap.put(subscription.getId(), copiedSubscription);
+
+ subscription.forEachConsumedPage(consumedPage -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Copying page {}", consumedPage.getPageId());
+ }
+
+ LocalCopiedConsumedPage copiedConsumedPage = new LocalCopiedConsumedPage();
+ copiedSubscription.consumedPageMap.put(consumedPage.getPageId(), copiedConsumedPage);
+ if (consumedPage.isDone()) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking page {} as done on the copy", consumedPage.getPageId());
+ }
+ copiedConsumedPage.done = true;
+ } else {
+ // We only copy the acks if the page is not done
+ // as if the page is done, we just move over
+ consumedPage.forEachAck((messageNR, pagePosition) -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Marking messageNR {} as acked on pageID={} copy", messageNR, consumedPage.getPageId());
+ }
+ if (copiedConsumedPage.acks == null) {
+ copiedConsumedPage.acks = new IntObjectHashMap<>();
+ }
+ copiedConsumedPage.acks.put(messageNR, Boolean.TRUE);
+ });
+ }
+ });
+ });
+ } finally {
+ store.unlock();
+ }
+ }
+
+ private synchronized PageSubscriptionCounter getCounter(long queueID) {
+ CopiedSubscription copiedSubscription = copiedSubscriptionMap.get(queueID);
+ if (copiedSubscription != null) {
+ return copiedSubscription.subscriptionCounter;
+ } else {
+ return null;
+ }
+ }
+
+ private CopiedSubscription getSubscription(long queueID) {
+ return copiedSubscriptionMap.get(queueID);
+ }
+
+ private boolean isACK(long queueID, long pageNR, int messageNR) {
+ CopiedSubscription subscription = getSubscription(queueID);
+ if (subscription == null) {
+ return true;
+ }
+
+ LocalCopiedConsumedPage consumedPage = subscription.getPage(pageNR);
+ if (consumedPage == null) {
+ return false;
+ } else {
+ return consumedPage.isAck(messageNR);
+ }
+ }
+
+ private void done() {
+ copiedSubscriptionMap.forEach((k, copiedSubscription) -> {
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ try {
+ copiedSubscription.subscriptionCounter.increment(null, copiedSubscription.addUp, copiedSubscription.sizeUp);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ if (!copiedSubscription.empty) {
+ copiedSubscription.subscription.notEmpty();
+ }
+ if (copiedSubscription.subscriptionCounter != null) {
+ copiedSubscription.subscriptionCounter.finishRebuild();
+ }
+ });
+ pgStore.getCursorProvider().finishCounterRebuild();
+ pgStore.getCursorProvider().scheduleCleanup();
+ }
+
+ @Override
+ public void run() {
+ try {
+ rebuild();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ public void rebuild() throws Exception {
+ if (pgStore == null) {
+ logger.debug("Page store is null during rebuildCounters");
+ return;
+ }
+
+ if (!paging) {
+ logger.debug("Ignoring call to rebuild pgStore {}", pgStore.getAddress());
+ }
+
+ logger.debug("Rebuilding counter for store {}", pgStore.getAddress());
+
+ for (long pgid = pgStore.getFirstPage(); pgid <= limitPageId; pgid++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuilding counter on messages from page {} on rebuildCounters for address {}", pgid, pgStore.getAddress());
+ }
+ Page page = pgStore.newPageObject(pgid);
+
+ if (!page.getFile().exists()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Skipping page {} on store {}", pgid, pgStore.getAddress());
+ }
+ continue;
+ }
+ page.open(false);
+ LinkedList<PagedMessage> msgs = page.read(sm);
+ page.close(false, false);
+
+ try (LinkedListIterator<PagedMessage> iter = msgs.iterator()) {
+ while (iter.hasNext()) {
+ PagedMessage msg = iter.next();
+ if (limitPageId == pgid) {
+ if (msg.getMessageNumber() >= limitMessageNr) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebuild counting on {} go to the last message at {}-{}", pgStore.getAddress(), limitPageId, limitMessageNr);
+ }
+ // this is the limit where we should count..
+ // anything beyond this will be new data
+ break;
+ }
+ }
+ msg.initMessage(sm);
+ long[] routedQueues = msg.getQueueIDs();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("reading message for rebuild cursor on {}, pg={}, messageNR={}, routedQueues={}, message={}, queueList={}", pgStore.getAddress(), msg.getPageNumber(), msg.getMessageNumber(), routedQueues, msg);
Review Comment:
The log message has more placeholders (six) than arguments actually provided (five).
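To make the mismatch concrete, here is a minimal self-contained sketch that counts the `{}` anchors in the quoted format string and compares them with the number of arguments passed. It does not use SLF4J itself; `PlaceholderCheck` and `countPlaceholders` are hypothetical names for illustration only, and escaped anchors are deliberately not handled.

```java
public class PlaceholderCheck {

   // Counts "{}" anchors in an SLF4J-style format string.
   // Simplification (assumption): escaped anchors are not treated specially.
   public static int countPlaceholders(String format) {
      int count = 0;
      int idx = format.indexOf("{}");
      while (idx >= 0) {
         count++;
         idx = format.indexOf("{}", idx + 2);
      }
      return count;
   }

   public static void main(String[] args) {
      // Format string copied from the quoted logger.trace call.
      String format = "reading message for rebuild cursor on {}, pg={}, messageNR={}, "
         + "routedQueues={}, message={}, queueList={}";
      // Arguments in the quoted call: address, page number, message number, routedQueues, msg.
      int supplied = 5;
      System.out.println("placeholders=" + countPlaceholders(format) + ", arguments=" + supplied);
   }
}
```

As far as I know, SLF4J does not fail in this situation; the surplus `{}` is just left unsubstituted in the output. So the likely fix is either to drop `queueList={}` or to pass the missing argument.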
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+ private static class LocalCopiedConsumedPage implements ConsumedPage {
+ boolean done;
+ IntObjectHashMap<Boolean> acks;
+
+ @Override
+ public long getPageId() {
+ // i'm using an assertion instead of always throwing it just because it wouldn't be a big deal to call this method
+ // since this is an internal clss;
+ // however I would like to catch it during dev
+ assert false : "Not Implemented";
+ return 0;
Review Comment:
Throwing would fail more uniformly, and would be more succinct since it would
not need a 3-line comment to explain it... twice (same below).
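A minimal sketch of the difference, under stated assumptions: `PageView` is a hypothetical stand-in for the `ConsumedPage` interface, and the class names are illustrative rather than taken from the PR. The assertion variant only fails when the JVM runs with `-ea`, whereas the throwing variant fails the same way in every environment.

```java
public class ThrowVsAssert {

   // Hypothetical stand-in for the ConsumedPage interface in the PR.
   interface PageView {
      long getPageId();
   }

   // Assertion variant: fails only when assertions are enabled (-ea);
   // otherwise it silently returns 0.
   static class AssertingPage implements PageView {
      @Override
      public long getPageId() {
         assert false : "Not Implemented";
         return 0;
      }
   }

   // Throwing variant: fails identically regardless of JVM flags.
   static class ThrowingPage implements PageView {
      @Override
      public long getPageId() {
         throw new UnsupportedOperationException("Not Implemented");
      }
   }

   // Returns true if getPageId() raised UnsupportedOperationException.
   public static boolean failsWhenCalled(PageView page) {
      try {
         page.getPageId();
         return false;
      } catch (UnsupportedOperationException e) {
         return true;
      }
   }

   public static void main(String[] args) {
      System.out.println("throwing variant fails: " + failsWhenCalled(new ThrowingPage()));
   }
}
```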
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java:
##########
@@ -465,4 +419,17 @@ public void addAndGet(int count, long persistentSize) {
SIZE_UPDATER.addAndGet(this, persistentSize);
}
}
+
+ @Override
+ public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
+ this.subscription = subscription;
+
+ if (subscription == null) {
+ this.pageExecutor = null;
+ } else {
+ this.pageExecutor = subscription.getPagingStore().getExecutor();
+ assert pageExecutor != null;
Review Comment:
Should the executor be cleared on null?
Actually, is the executor ever actually used?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java:
##########
@@ -0,0 +1,334 @@
+ @Override
+ public void run() {
+ try {
+ rebuild();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ public void rebuild() throws Exception {
Review Comment:
There look to be different ways of calling this, suggesting it could happen
concurrently, which doesn't seem like it would be safe.
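One possible way to rule out overlapping calls is a compare-and-set guard, sketched below. This is only an illustration under assumptions: `SingleRunGuard` and its counter are hypothetical, the real `rebuild()` body is replaced by a placeholder, and the guard allows exactly one run per instance, which may or may not match what the PR intends.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleRunGuard implements Runnable {

   private final AtomicBoolean started = new AtomicBoolean(false);
   private final AtomicInteger executions = new AtomicInteger();

   @Override
   public void run() {
      rebuild();
   }

   // Only the first caller wins the compareAndSet; any later or concurrent
   // caller returns false without doing the work.
   public boolean rebuild() {
      if (!started.compareAndSet(false, true)) {
         return false;
      }
      executions.incrementAndGet(); // placeholder for the actual counting work
      return true;
   }

   public int executions() {
      return executions.get();
   }

   public static void main(String[] args) throws InterruptedException {
      SingleRunGuard guard = new SingleRunGuard();
      Thread a = new Thread(guard);
      Thread b = new Thread(guard);
      a.start();
      b.start();
      a.join();
      b.join();
      System.out.println("executions=" + guard.executions());
   }
}
```

If a rebuild needs to be repeatable, a lock or a single-threaded executor would be the more natural choice than a one-shot flag.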
Issue Time Tracking
-------------------
Worklog Id: (was: 832818)
Time Spent: 1h (was: 50m)
> Option to use non persistent counters in paging. Rebuild them upon start if
> persistence is disabled on them
> -----------------------------------------------------------------------------------------------------------
>
> Key: ARTEMIS-4065
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4065
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Clebert Suconic
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Instead of storing records in the journal to count how many records there are
> in paging, the system should instead just sweep the paging system in parallel
> with processing data.
> The changes I'm making will take a snapshot of the current paging records
> and then read all the pages to rebuild the counters.
> In tests I ran against data from a real server, a system that had a lot of
> pages (700) needed less than 1 minute to rebuild the counters, and the
> messages were available to be delivered while the sweep was being done.