This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit d026562393ded93b9fc70c614b26525785789a56 Author: Clebert Suconic <[email protected]> AuthorDate: Fri Mar 7 14:56:32 2025 -0500 ARTEMIS-5342 Expose AckManager pending records on management --- .../apache/activemq/artemis/logs/AuditLogger.java | 9 +++++ .../api/core/management/ActiveMQServerControl.java | 3 ++ .../protocol/amqp/connect/mirror/AckManager.java | 19 +++++------ .../management/impl/ActiveMQServerControlImpl.java | 8 +++++ .../artemis/core/server/ActiveMQServer.java | 5 +++ .../core/server/impl/ActiveMQServerImpl.java | 15 ++++++++- .../artemis/core/server/mirror/MirrorRegistry.java | 38 ++++++++++++++++++++++ .../management/ActiveMQServerControlTest.java | 12 +++++++ .../ActiveMQServerControlUsingCoreTest.java | 6 ++++ 9 files changed, 104 insertions(+), 11 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index 4b0c1773d0..dbb34f7349 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2866,4 +2866,13 @@ public interface AuditLogger { @LogMessage(id = 601798, value = "User {} is exporting configuration as properties on target resource: {}", level = LogMessage.Level.INFO) void exportConfigAsProperties(String user, Object source); + + static void getPendingMirrorAcks(Object source) { + BASE_LOGGER.getPendingMirrorAcks(getCaller(), source); + } + + @LogMessage(id = 601799, value = "User {} is getting PendingMirrorAcks on target resource: {}", level = LogMessage.Level.INFO) + void getPendingMirrorAcks(String user, Object source); + + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 307ec0af2d..c744e7aea4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -2091,6 +2091,9 @@ public interface ActiveMQServerControl { @Attribute(desc = AUTHORIZATION_FAILURE_COUNT) long getAuthorizationFailureCount(); + @Attribute(desc = "Number of pending acknowledgements records on mirroring") + int getPendingMirrorAcks(); + @Operation(desc = "Export the broker configuration as properties", impact = MBeanOperationInfo.ACTION) void exportConfigAsProperties() throws Exception; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 5e3b4b744f..0ae57bcea3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -28,7 +28,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.LongSupplier; import io.netty.util.collection.LongObjectHashMap; @@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; @@ -79,11 +79,10 @@ public class AckManager implements ActiveMQComponent { volatile MultiStepProgress progress; ActiveMQScheduledComponent scheduledComponent; - private volatile int size; - private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size"); + final MirrorRegistry mirrorRegistry; public int size() { - return sizeUpdater.get(this); + return mirrorRegistry.getMirrorAckSize(); } public AckManager(ActiveMQServer server) { @@ -92,7 +91,7 @@ public class AckManager implements ActiveMQComponent { this.configuration = server.getConfiguration(); this.ioCriticalErrorListener = server.getIoCriticalErrorListener(); this.sequenceGenerator = server.getStorageManager()::generateID; - + this.mirrorRegistry = server.getMirrorRegistry(); // The JournalHashMap has to use the storage manager to guarantee we are using the Replicated Journal Wrapper in case this is a replicated journal journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, server.getStorageManager(), AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener()); this.referenceIDSupplier = new ReferenceIDSupplier(server); @@ -100,7 +99,7 @@ public class AckManager implements ActiveMQComponent { public void reload(RecordInfo recordInfo) { journalHashMapProvider.reload(recordInfo); - sizeUpdater.incrementAndGet(this); + mirrorRegistry.incrementMirrorAckSize(); } @Override @@ -323,7 +322,7 @@ public class AckManager implements ActiveMQComponent { logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts()); } if (retries.remove(retry) != null) { - sizeUpdater.decrementAndGet(AckManager.this); + mirrorRegistry.decrementMirrorAckSize(); } } else { if (logger.isDebugEnabled()) { @@ -379,7 +378,7 @@ public class AckManager implements ActiveMQComponent { } } if (retries.remove(ackRetry, transaction.getID()) != null) { - sizeUpdater.decrementAndGet(AckManager.this); + mirrorRegistry.decrementMirrorAckSize(); } transaction.setContainsPersistent(); logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage); @@ -416,7 +415,7 @@ public class AckManager implements ActiveMQComponent { if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) { logger.trace("Removing retry {} as the retry went ok", retry); queueRetries.remove(retry); - sizeUpdater.decrementAndGet(this); + mirrorRegistry.decrementMirrorAckSize(); } else { int retried = retry.attemptedQueue(); if (logger.isTraceEnabled()) { @@ -438,7 +437,7 @@ public class AckManager implements ActiveMQComponent { } AckRetry retry = new AckRetry(nodeID, messageID, reason); journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry); - sizeUpdater.incrementAndGet(this); + mirrorRegistry.incrementMirrorAckSize(); if (scheduledComponent != null) { // we set the retry delay again in case it was changed. scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 56a21d9825..1a42a3a0a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -4747,6 +4747,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active return server.getSecurityStore().getAuthorizationFailureCount(); } + @Override + public int getPendingMirrorAcks() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getPendingMirrorAcks(this.server); + } + return server.getPendingMirrorAcks(); + } + @Override public void exportConfigAsProperties() throws Exception { if (AuditLogger.isBaseLoggingEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index dc6dd68332..d625df0e1c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.impl.ConnectorsService; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -1025,4 +1026,8 @@ public interface ActiveMQServer extends ServiceComponent { } void registerRecordsLoader(Consumer<RecordInfo> recordsLoader); + + MirrorRegistry getMirrorRegistry(); + + int getPendingMirrorAcks(); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index aeb1a88981..082adebc7a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -170,6 +170,7 @@ import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl; import org.apache.activemq.artemis.core.server.metrics.MetricsManager; import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry; import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin; import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin; @@ -259,6 +260,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { private HAPolicy haPolicy; + private MirrorRegistry mirrorRegistry = new MirrorRegistry(); + // This will be useful on tests or embedded private boolean rebuildCounters = true; @@ -4821,4 +4824,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { public IOCriticalErrorListener getIoCriticalErrorListener() { return ioCriticalErrorListener; } -} + + @Override + public MirrorRegistry getMirrorRegistry() { + return mirrorRegistry; + } + + @Override + public int getPendingMirrorAcks() { + return mirrorRegistry.getMirrorAckSize(); + } +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java new file mode 100644 index 0000000000..fb8106a775 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.mirror; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +public class MirrorRegistry { + + private volatile int mirrorAckSize; + private static final AtomicIntegerFieldUpdater<MirrorRegistry> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(MirrorRegistry.class, "mirrorAckSize"); + + public int getMirrorAckSize() { + return sizeUpdater.get(this); + } + + public void incrementMirrorAckSize() { + sizeUpdater.incrementAndGet(this); + } + + public void decrementMirrorAckSize() { + sizeUpdater.decrementAndGet(this); + } +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java index 1581f30008..138687b803 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java @@ -250,6 +250,18 @@ public class ActiveMQServerControlTest extends ManagementTestBase { assertTrue(serverControl.isActive()); } + @TestTemplate + public void testPendingMirrorAcks() throws Exception { + ActiveMQServerControl serverControl = createManagementControl(); + // faking some data, to make sure we are not just returning a default value + for (int i = 0; i < 7; i++) { + server.getMirrorRegistry().incrementMirrorAckSize(); + } + + assertEquals(7, serverControl.getPendingMirrorAcks()); + assertEquals(7, server.getPendingMirrorAcks()); + } + @TestTemplate public void testBrokerPluginClassNames() throws Exception { ActiveMQServerControl serverControl = createManagementControl(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 4464cac8f9..aef8f11ccf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -1857,6 +1857,12 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes return (long) proxy.retrieveAttributeValue("authorizationFailureCount"); } + + @Override + public int getPendingMirrorAcks() { + return ((Number) proxy.retrieveAttributeValue("pendingMirrorAcks")).intValue(); + } + @Override public void exportConfigAsProperties() throws Exception { proxy.invokeOperation("exportConfigAsProperties"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] For further information, visit: https://activemq.apache.org/contact
