gemmellr commented on code in PR #4101:
URL: https://github.com/apache/activemq-artemis/pull/4101#discussion_r891092466
##########
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java:
##########
@@ -398,46 +399,50 @@ private void printPagedMessagesAsXML() {
ActiveMQServerLogger.LOGGER.debug("Reading page store " + store
+ " folder = " + folder);
int pageId = (int) pageStore.getFirstPage();
- for (int i = 0; i < pageStore.getNumberOfPages(); i++) {
+ for (long i = 0; i < pageStore.getNumberOfPages(); i++) {
ActiveMQServerLogger.LOGGER.debug("Reading page " + pageId);
- Page page = pageStore.createPage(pageId);
+ Page page = pageStore.newPageObject(pageId);
Review Comment:
The pageId variable above should be changed to _long_ as well, it is
currently being truncated to _int_ on declaration and then widened back to
_long_ here.
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.function.Consumer;
+
+public class EmptyList<E> implements LinkedList<E> {
+
+
+ private static final LinkedList emptyList = new EmptyList();
+
+ public static final <T> LinkedList<T> getEmptyList() {
+ return (LinkedList<T>) emptyList;
+ }
+
+ private EmptyList() {
+ }
+
+
+
+
+ @Override
+ public void addHead(E e) {
+ }
Review Comment:
Its usually nicer for unmodifiable lists to throw on attempts to change
them, so that people who misuse them find it out pretty quickly, as opposed to
having to investigate later where exactly things happened to unexpectedly get
black-holed.
(Same comment applies to all other manipulating methods)
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SizeAwareMetric.java:
##########
@@ -59,6 +59,13 @@ public interface AddCallback {
private Runnable underCallback;
+ /** To be used in a case where we just measure elements */
+ public SizeAwareMetric() {
+ this.sizeEnabled = false;
+ this.elementsEnabled = false;
+ }
Review Comment:
So, a counter? Why use this rather than a basic/atomic numeric type?
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java:
##########
@@ -102,6 +103,17 @@ public void addHead(E e) {
size++;
}
+ @Override
+ public E get(int position) {
+ Node<E> current = head.next;
+
+ for (int i = 0; i < position && current != null; i++) {
+ current = current.next;
+ }
+
+ return current.val();
Review Comment:
This assumes current exists but it may not (which the for handles). Should
throw index out of bounds rather than NPE?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java:
##########
@@ -1261,6 +1265,14 @@ protected Pair<String, AddressSettings>
parseAddressSettings(final Node node) {
long pageSizeLong =
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.POSITIVE_INT.validate(PAGE_SIZE_BYTES_NODE_NAME,
pageSizeLong);
addressSettings.setPageSizeBytes((int) pageSizeLong);
+ } else if (MAX_READ_PAGE_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
+ long maxReadPageMessages =
Long.parseLong(getTrimmedTextContent(child));
+ Validators.POSITIVE_INT.validate(PAGE_SIZE_BYTES_NODE_NAME,
maxReadPageMessages);
+ addressSettings.setMaxReadPageMessages((int)maxReadPageMessages);
+ } else if (MAX_READ_PAGE_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
+ long maxReadPageBytes =
ByteUtil.convertTextBytes(getTrimmedTextContent(child));
+ Validators.POSITIVE_INT.validate(PAGE_SIZE_BYTES_NODE_NAME,
maxReadPageBytes);
Review Comment:
Both the validate calls pass the same wrong name.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java:
##########
@@ -186,5 +169,5 @@ public interface PageSubscription {
void incrementDeliveredSize(long size);
- void removePendingDelivery(PagePosition position);
+ void removePendingDelivery(PagedMessage position);
Review Comment:
Ditto
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -327,17 +156,11 @@ public void stop() {
}
private void waitForFuture() {
- if (!executor.flush(10, TimeUnit.SECONDS)) {
- ActiveMQServerLogger.LOGGER.timedOutStoppingPagingCursor(executor);
- ActiveMQServerLogger.LOGGER.threadDump(ThreadDumpUtil.threadDump(""));
- }
+ pagingStore.flushExecutors();
Review Comment:
seems odd for this private waitForFuture() method to exist when it doesnt
actually wait for a future, and instead is flushing the executor, and is called
from the flushExecutors() method...
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/ReaderContext.java:
##########
@@ -1,24 +1,49 @@
-/*
+/**
Review Comment:
Ditto
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -462,7 +285,12 @@ public void cleanup() {
// Then we do some check on eventual pages that can be already
removed but they are away from the streaming
cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList,
minPage, firstPage);
- if (pagingStore.getNumberOfPages() == 0 ||
pagingStore.getNumberOfPages() == 1 &&
pagingStore.getCurrentPage().getNumberOfMessages() == 0) {
+ if (pagingStore.getNumberOfPages() < 0) {
+ new Exception("WHAT???").printStackTrace(System.out);
Review Comment:
needs tidied up
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.function.Consumer;
+
+public class EmptyList<E> implements LinkedList<E> {
+
+
+ private static final LinkedList emptyList = new EmptyList();
Review Comment:
uppercase for constant?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageIterator.java:
##########
@@ -21,8 +21,13 @@
public interface PageIterator extends LinkedListIterator<PagedReference> {
- void redeliver(PagePosition reference);
+ enum NextResult {
+ noElements,
+ hasElements,
+ retry
+ }
+ void redeliver(PagedReference reference);
// return 0 if no elements, 1 if having more elements, 2 if taking too long
to find
- int tryNext();
+ NextResult tryNext();
Review Comment:
Comment is inaccurate now it doesnt return int.
##########
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java:
##########
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.function.Consumer;
+
+public class EmptyList<E> implements LinkedList<E> {
+
+
+ private static final LinkedList emptyList = new EmptyList();
+
+ public static final <T> LinkedList<T> getEmptyList() {
+ return (LinkedList<T>) emptyList;
+ }
+
+ private EmptyList() {
+ }
+
+
+
+
+ @Override
+ public void addHead(E e) {
+ }
+
+ @Override
+ public void addTail(E e) {
+ }
+
+ @Override
+ public E get(int position) {
+ return null;
+ }
+
+ @Override
+ public E poll() {
+ return null;
+ }
+
+ LinkedListIterator<E> emptyIterator = new LinkedListIterator<E>() {
Review Comment:
Could be private static final also?
##########
artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml:
##########
@@ -141,6 +141,13 @@
${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
<max-size-bytes>-1</max-size-bytes>
<!-- limit for the address in messages, -1 means unlimited -->
<max-size-messages>-1</max-size-messages>
+ <!-- the size of each file on paging. Notice we keep files in
memory while they are in use.
+ Lower this setting if you have too many queues in memory. -->
+ <page-size-bytes>10M</page-size-bytes>
+ <!-- how many messages are kept in memory from paging. The system
will stop reading whenever this or max-read-page-messages hits the max first.
-->
+ <max-read-page-messages>1000</max-read-page-messages>
Review Comment:
Comment doesnt make sense, it says it will stop reading whenever 'this or
max-read-page-messages', but the latter is the thing currently being described.
Presumably max-read-page-bytes instead?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java:
##########
@@ -94,9 +99,14 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx)
throws Exception;
- Page createPage(int page) throws Exception;
+ Page usePage(long page);
+
+ /** Use this method when you want to use the cache of used pages. If you
are just using offline (e.g. print-data), use the newPage method.*/
Review Comment:
There is no 'newPage' method. newPageObject?
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java:
##########
@@ -133,14 +131,9 @@ public interface PageSubscription {
void processReload() throws Exception;
- void addPendingDelivery(PagePosition position);
+ void addPendingDelivery(PagedMessage position);
Review Comment:
variable seems misnamed now
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java:
##########
@@ -50,14 +47,24 @@ public class PagedReferenceImpl extends
LinkedListImpl.Node<PagedReferenceImpl>
private int messageEstimate = -1;
+ PagePosition position;
+
+ @Override
+ public PagePosition getPosition() {
+ if (position == null) {
+ position = getPagedMessage().newPositionObject();
+ }
+ return position;
+ }
Review Comment:
Can this be called from different threads? There are synchronized and
volatile uses elsewhere in the class (including the method it calls) suggesting
it could be. If so this would not seem safe.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java:
##########
@@ -67,14 +59,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
protected final StorageManager storageManager;
- // This is the same executor used at the PageStoreImpl. One Executor per
pageStore
- private final ArtemisExecutor executor;
-
- private final SoftValueLongObjectHashMap<BulkPageCache> softCache;
-
- private LongObjectHashMap<Integer> numberOfMessages = null;
-
- private final LongObjectHashMap<CompletableFuture<BulkPageCache>>
inProgressReadPages;
+ //private LongObjectHashMap<Integer> numberOfMessages = null;
Review Comment:
Unused code
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/QueryPagedReferenceImpl.java:
##########
@@ -1,37 +1,39 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
Review Comment:
This should go away with the other bit.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/QueryPagedReferenceImpl.java:
##########
@@ -1,37 +1,39 @@
-/*
+/**
Review Comment:
It should be left as a comment, not javadoc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]