This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d026562393ded93b9fc70c614b26525785789a56
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Mar 7 14:56:32 2025 -0500

    ARTEMIS-5342 Expose AckManager pending records on management
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |  9 +++++
 .../api/core/management/ActiveMQServerControl.java |  3 ++
 .../protocol/amqp/connect/mirror/AckManager.java   | 19 +++++------
 .../management/impl/ActiveMQServerControlImpl.java |  8 +++++
 .../artemis/core/server/ActiveMQServer.java        |  5 +++
 .../core/server/impl/ActiveMQServerImpl.java       | 15 ++++++++-
 .../artemis/core/server/mirror/MirrorRegistry.java | 38 ++++++++++++++++++++++
 .../management/ActiveMQServerControlTest.java      | 12 +++++++
 .../ActiveMQServerControlUsingCoreTest.java        |  6 ++++
 9 files changed, 104 insertions(+), 11 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 4b0c1773d0..dbb34f7349 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2866,4 +2866,13 @@ public interface AuditLogger {
 
    @LogMessage(id = 601798, value = "User {} is exporting configuration as 
properties on target resource: {}", level = LogMessage.Level.INFO)
    void exportConfigAsProperties(String user, Object source);
+
+   static void getPendingMirrorAcks(Object source) {
+      BASE_LOGGER.getPendingMirrorAcks(getCaller(), source);
+   }
+
+   @LogMessage(id = 601799, value = "User {} is getting PendingMirrorAcks on 
target resource: {}", level = LogMessage.Level.INFO)
+   void getPendingMirrorAcks(String user, Object source);
+
+
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 307ec0af2d..c744e7aea4 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -2091,6 +2091,9 @@ public interface ActiveMQServerControl {
    @Attribute(desc = AUTHORIZATION_FAILURE_COUNT)
    long getAuthorizationFailureCount();
 
+   @Attribute(desc = "Number of pending acknowledgements records on mirroring")
+   int getPendingMirrorAcks();
+
    @Operation(desc = "Export the broker configuration as properties", impact = 
MBeanOperationInfo.ACTION)
    void exportConfigAsProperties() throws Exception;
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 5e3b4b744f..0ae57bcea3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.LongSupplier;
 
 import io.netty.util.collection.LongObjectHashMap;
@@ -56,6 +55,7 @@ import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
@@ -79,11 +79,10 @@ public class AckManager implements ActiveMQComponent {
    volatile MultiStepProgress progress;
    ActiveMQScheduledComponent scheduledComponent;
 
-   private volatile int size;
-   private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size");
+   final MirrorRegistry mirrorRegistry;
 
    public int size() {
-      return sizeUpdater.get(this);
+      return mirrorRegistry.getMirrorAckSize();
    }
 
    public AckManager(ActiveMQServer server) {
@@ -92,7 +91,7 @@ public class AckManager implements ActiveMQComponent {
       this.configuration = server.getConfiguration();
       this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
       this.sequenceGenerator = server.getStorageManager()::generateID;
-
+      this.mirrorRegistry = server.getMirrorRegistry();
       // The JournalHashMap has to use the storage manager to guarantee we are 
using the Replicated Journal Wrapper in case this is a replicated journal
       journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, 
server.getStorageManager(), AckRetry.getPersister(), 
JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, 
server.getPostOffice()::findQueue, server.getIoCriticalErrorListener());
       this.referenceIDSupplier = new ReferenceIDSupplier(server);
@@ -100,7 +99,7 @@ public class AckManager implements ActiveMQComponent {
 
    public void reload(RecordInfo recordInfo) {
       journalHashMapProvider.reload(recordInfo);
-      sizeUpdater.incrementAndGet(this);
+      mirrorRegistry.incrementMirrorAckSize();
    }
 
    @Override
@@ -323,7 +322,7 @@ public class AckManager implements ActiveMQComponent {
                   logger.debug("Retried {} {} times, giving up on the entry 
now. Configured Page Attempts={}", retry, retry.getPageAttempts(), 
configuration.getMirrorAckManagerPageAttempts());
                }
                if (retries.remove(retry) != null) {
-                  sizeUpdater.decrementAndGet(AckManager.this);
+                  mirrorRegistry.decrementMirrorAckSize();
                }
             } else {
                if (logger.isDebugEnabled()) {
@@ -379,7 +378,7 @@ public class AckManager implements ActiveMQComponent {
                         }
                      }
                      if (retries.remove(ackRetry, transaction.getID()) != 
null) {
-                        sizeUpdater.decrementAndGet(AckManager.this);
+                        mirrorRegistry.decrementMirrorAckSize();
                      }
                      transaction.setContainsPersistent();
                      logger.trace("retry performed ok, ackRetry={} for 
message={} on queue", ackRetry, pagedMessage);
@@ -416,7 +415,7 @@ public class AckManager implements ActiveMQComponent {
             if (ack(retry.getNodeID(), queue, retry.getMessageID(), 
retry.getReason(), false)) {
                logger.trace("Removing retry {} as the retry went ok", retry);
                queueRetries.remove(retry);
-               sizeUpdater.decrementAndGet(this);
+               mirrorRegistry.decrementMirrorAckSize();
             } else {
                int retried = retry.attemptedQueue();
                if (logger.isTraceEnabled()) {
@@ -438,7 +437,7 @@ public class AckManager implements ActiveMQComponent {
       }
       AckRetry retry = new AckRetry(nodeID, messageID, reason);
       journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
-      sizeUpdater.incrementAndGet(this);
+      mirrorRegistry.incrementMirrorAckSize();
       if (scheduledComponent != null) {
          // we set the retry delay again in case it was changed.
          
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 56a21d9825..1a42a3a0a6 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -4747,6 +4747,14 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
       return server.getSecurityStore().getAuthorizationFailureCount();
    }
 
+   @Override
+   public int getPendingMirrorAcks() {
+      if (AuditLogger.isBaseLoggingEnabled()) {
+         AuditLogger.getPendingMirrorAcks(this.server);
+      }
+      return server.getPendingMirrorAcks();
+   }
+
    @Override
    public void exportConfigAsProperties() throws Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index dc6dd68332..d625df0e1c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -59,6 +59,7 @@ import 
org.apache.activemq.artemis.core.server.impl.ConnectorsService;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@@ -1025,4 +1026,8 @@ public interface ActiveMQServer extends ServiceComponent {
    }
 
    void registerRecordsLoader(Consumer<RecordInfo> recordsLoader);
+
+   MirrorRegistry getMirrorRegistry();
+
+   int getPendingMirrorAcks();
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index aeb1a88981..082adebc7a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -170,6 +170,7 @@ import 
org.apache.activemq.artemis.core.server.management.ManagementService;
 import 
org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@@ -259,6 +260,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private HAPolicy haPolicy;
 
+   private MirrorRegistry mirrorRegistry = new MirrorRegistry();
+
    // This will be useful on tests or embedded
    private boolean rebuildCounters = true;
 
@@ -4821,4 +4824,14 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
    public IOCriticalErrorListener getIoCriticalErrorListener() {
       return ioCriticalErrorListener;
    }
-}
+
+   @Override
+   public MirrorRegistry getMirrorRegistry() {
+      return mirrorRegistry;
+   }
+
+   @Override
+   public int getPendingMirrorAcks() {
+      return mirrorRegistry.getMirrorAckSize();
+   }
+}
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java
new file mode 100644
index 0000000000..fb8106a775
--- /dev/null
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/mirror/MirrorRegistry.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.mirror;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+public class MirrorRegistry {
+
+   private volatile int mirrorAckSize;
+   private static final AtomicIntegerFieldUpdater<MirrorRegistry> sizeUpdater 
= AtomicIntegerFieldUpdater.newUpdater(MirrorRegistry.class, "mirrorAckSize");
+
+   public int getMirrorAckSize() {
+      return sizeUpdater.get(this);
+   }
+
+   public void incrementMirrorAckSize() {
+      sizeUpdater.incrementAndGet(this);
+   }
+
+   public void decrementMirrorAckSize() {
+      sizeUpdater.decrementAndGet(this);
+   }
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 1581f30008..138687b803 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -250,6 +250,18 @@ public class ActiveMQServerControlTest extends 
ManagementTestBase {
       assertTrue(serverControl.isActive());
    }
 
+   @TestTemplate
+   public void testPendingMirrorAcks() throws Exception {
+      ActiveMQServerControl serverControl = createManagementControl();
+      // faking some data, to make sure we are not just returning a default 
value
+      for (int i = 0; i < 7; i++) {
+         server.getMirrorRegistry().incrementMirrorAckSize();
+      }
+
+      assertEquals(7, serverControl.getPendingMirrorAcks());
+      assertEquals(7, server.getPendingMirrorAcks());
+   }
+
    @TestTemplate
    public void testBrokerPluginClassNames() throws Exception {
       ActiveMQServerControl serverControl = createManagementControl();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 4464cac8f9..aef8f11ccf 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -1857,6 +1857,12 @@ public class ActiveMQServerControlUsingCoreTest extends 
ActiveMQServerControlTes
             return (long) 
proxy.retrieveAttributeValue("authorizationFailureCount");
          }
 
+
+         @Override
+         public int getPendingMirrorAcks() {
+            return ((Number) 
proxy.retrieveAttributeValue("pendingMirrorAcks")).intValue();
+         }
+
          @Override
          public void exportConfigAsProperties() throws Exception {
             proxy.invokeOperation("exportConfigAsProperties");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to