This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 11f76bc133 ARTEMIS-4476 Validating process to remove orphaned consumers
11f76bc133 is described below

commit 11f76bc1339f41ce08e61c2e91a7e51beabb1321
Author: Clebert Suconic <[email protected]>
AuthorDate: Sat Nov 11 21:52:38 2023 -0500

    ARTEMIS-4476 Validating process to remove orphaned consumers
---
 .../src/main/webapp/plugin/js/components/consumers.js |  9 ++++++---
 .../management/impl/ActiveMQServerControlImpl.java    |  4 +++-
 .../core/management/impl/QueueControlImpl.java        |  9 ++++-----
 .../core/management/impl/view/ConsumerField.java      |  3 ++-
 .../core/management/impl/view/ConsumerView.java       | 19 ++++++++++++++++++-
 .../consumer/DetectOrphanedConsumerTest.java          | 15 +++++++++++----
 6 files changed, 44 insertions(+), 15 deletions(-)

diff --git 
a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
 
b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
index 3da804108e..6c7ed8bc1d 100644
--- 
a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
+++ 
b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
@@ -106,7 +106,8 @@ var Artemis;
               {name: "Messages Acknowledged", visible: false},
               {name: "Messages Acknowledged awaiting Commit", visible: false},
               {name: "Last Delivered Time", visible: false},
-              {name: "Last Acknowledged Time", visible: false}
+              {name: "Last Acknowledged Time", visible: false},
+              {name: "Status", visible: false}
          ]
         };
 
@@ -144,7 +145,8 @@ var Artemis;
                 {id: 'messagesDelivered', name: 'Messages Delivered'},
                 {id: 'messagesDeliveredSize', name: 'Messages Delivered Size'},
                 {id: 'messagesAcknowledged', name: 'Messages Acknowledged'},
-                {id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages 
Acknowledged awaiting Commit'}
+                {id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages 
Acknowledged awaiting Commit'},
+                {id: 'status', name: 'status'}
             ],
             operationOptions: [
                 {id: 'EQUALS', name: 'Equals'},
@@ -203,7 +205,8 @@ var Artemis;
             { header: 'Messages Acknowledged', itemField: 
'messagesAcknowledged' },
             { header: 'Messages Acknowledged awaiting Commit', itemField: 
'messagesAcknowledgedAwaitingCommit' },
             { header: 'Last Delivered Time', itemField: 'lastDeliveredTime', 
templateFn: function(value) { return formatTimestamp(value);} },
-            { header: 'Last Acknowledged Time', itemField: 
'lastAcknowledgedTime' ,  templateFn: function(value) { return 
formatTimestamp(value);} }
+            { header: 'Last Acknowledged Time', itemField: 
'lastAcknowledgedTime' ,  templateFn: function(value) { return 
formatTimestamp(value);} },
+            { header: 'Status', itemField: 'status' }
         ];
 
         ctrl.refresh = function () {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 0cd2e229f2..8f1565a3ec 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2786,6 +2786,7 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
       List<MessageReference> deliveringMessages = 
consumer.getDeliveringMessages();
       JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
             .add(ConsumerField.ID.getAlternativeName(), consumer.getID())
+            .add(ConsumerField.SEQUENTIAL_ID.getAlternativeName(), 
consumer.getSequentialID())
             .add(ConsumerField.CONNECTION.getAlternativeName(), 
consumer.getConnectionID().toString())
             .add(ConsumerField.SESSION.getAlternativeName(), 
consumer.getSessionID())
             .add(ConsumerField.QUEUE.getAlternativeName(), 
consumer.getQueue().getName().toString())
@@ -2800,7 +2801,8 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
             .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), 
consumer.getMessagesAcknowledged())
             
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), 
consumer.getMessagesAcknowledgedAwaitingCommit())
             .add(ConsumerField.LAST_DELIVERED_TIME.getName(), 
consumer.getLastDeliveredTime())
-            .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
consumer.getLastAcknowledgedTime());
+            .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
consumer.getLastAcknowledgedTime())
+            .add(ConsumerField.STATUS.getName(), 
ConsumerView.checkConsumerStatus(consumer, server));
       if (consumer.getFilter() != null) {
          obj.add("filter", consumer.getFilter().getFilterString().toString());
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index e7be97e596..f6eac24afb 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -37,6 +37,7 @@ import 
org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -1829,6 +1830,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                ServerConsumer serverConsumer = (ServerConsumer) consumer;
                JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
                        .add(ConsumerField.ID.getAlternativeName(), 
serverConsumer.getID())
+                       .add(ConsumerField.SEQUENTIAL_ID.getAlternativeName(), 
serverConsumer.getSequentialID())
                        .add(ConsumerField.CONNECTION.getAlternativeName(), 
serverConsumer.getConnectionID().toString())
                        .add(ConsumerField.SESSION.getAlternativeName(), 
serverConsumer.getSessionID())
                        .add(ConsumerField.BROWSE_ONLY.getName(), 
serverConsumer.isBrowseOnly())
@@ -1840,11 +1842,8 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
                        .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), 
serverConsumer.getMessagesAcknowledged())
                        
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), 
serverConsumer.getMessagesAcknowledgedAwaitingCommit())
                        .add(ConsumerField.LAST_DELIVERED_TIME.getName(), 
serverConsumer.getLastDeliveredTime())
-                       .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
serverConsumer.getLastAcknowledgedTime());
-
-               if (server.getRemotingService().getConnection(((ServerConsumer) 
consumer).getConnectionID()) == null) {
-                  obj.add(ConsumerField.ORPHANED.getName(), true);
-               }
+                       .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
serverConsumer.getLastAcknowledgedTime())
+                       .add(ConsumerField.STATUS.getName(), 
ConsumerView.checkConsumerStatus(serverConsumer, server));
 
                jsonArray.add(obj);
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
index 5c7c90b921..f7c689e501 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
@@ -21,6 +21,7 @@ import java.util.TreeMap;
 
 public enum ConsumerField {
    ID("id", "consumerID"),
+   SEQUENTIAL_ID("sequentialId", "sequentialId"),
    SESSION("session", "sessionID"),
    CONNECTION("connection", "connectionID"),
    QUEUE("queue", "queueName"),
@@ -43,7 +44,7 @@ public enum ConsumerField {
    MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
    LAST_DELIVERED_TIME("lastDeliveredTime"),
    LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"),
-   ORPHANED("orphaned");
+   STATUS("status");
 
 
    private static final Map<String, ConsumerField> lookup = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
index 97eaf93ee2..48c6293330 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
@@ -29,6 +29,10 @@ import org.apache.activemq.artemis.utils.JsonLoader;
 
 public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
 
+   public static final String CONSUMER_STATUS_OK = "OK";
+   public static final String CONSUMER_STATUS_ORPHANED = "Orphaned";
+
+
    private static final String defaultSortColumn = ConsumerField.ID.getName();
 
    private final ActiveMQServer server;
@@ -80,7 +84,9 @@ public class ConsumerView extends 
ActiveMQAbstractView<ServerConsumer> {
          .add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), 
toString(consumer.getMessagesAcknowledged()))
          .add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), 
toString(consumer.getMessagesAcknowledgedAwaitingCommit()))
          .add(ConsumerField.LAST_DELIVERED_TIME.getName(), 
consumer.getLastDeliveredTime())
-         .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
consumer.getLastAcknowledgedTime());
+         .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), 
consumer.getLastAcknowledgedTime())
+         .add(ConsumerField.STATUS.getName(), 
ConsumerView.checkConsumerStatus(consumer, server));
+
       return obj;
    }
 
@@ -136,11 +142,22 @@ public class ConsumerView extends 
ActiveMQAbstractView<ServerConsumer> {
             return consumer.getLastDeliveredTime();
          case LAST_ACKNOWLEDGED_TIME:
             return consumer.getLastAcknowledgedTime();
+         case STATUS:
+            return checkConsumerStatus(consumer, server);
          default:
             throw new IllegalArgumentException("Unsupported field, " + 
fieldName);
       }
    }
 
+   public static String checkConsumerStatus(ServerConsumer consumer, 
ActiveMQServer server) {
+      if 
(server.getRemotingService().getConnection((consumer).getConnectionID()) == 
null) {
+         return CONSUMER_STATUS_ORPHANED;
+      } else {
+         return CONSUMER_STATUS_OK;
+      }
+
+   }
+
    @Override
    public String getDefaultOrderColumn() {
       return defaultSortColumn;
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
index 707d87c2e4..3c0de6eb19 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
@@ -27,8 +27,11 @@ import java.lang.invoke.MethodHandles;
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
@@ -94,7 +97,7 @@ public class DetectOrphanedConsumerTest extends 
ActiveMQTestBase {
 
       JsonArray resultArray = JsonUtil.readJsonArray(result);
       Assert.assertEquals(1, resultArray.size());
-      
Assert.assertFalse(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName()));
+      Assert.assertEquals(ConsumerView.CONSUMER_STATUS_OK, 
resultArray.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
 
       queue.getConsumers().forEach(c -> {
          ServerConsumerImpl serverConsumer = (ServerConsumerImpl) c;
@@ -104,12 +107,16 @@ public class DetectOrphanedConsumerTest extends 
ActiveMQTestBase {
       });
 
       result = queueControl.listConsumersAsJSON();
-
       logger.debug("json: {}", result);
 
       resultArray = JsonUtil.readJsonArray(result);
       Assert.assertEquals(1, resultArray.size());
-      
Assert.assertTrue(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName()));
-      
Assert.assertTrue(resultArray.getJsonObject(0).getBoolean(ConsumerField.ORPHANED.getName()));
+      Assert.assertEquals(ConsumerView.CONSUMER_STATUS_ORPHANED, 
resultArray.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
+
+      ActiveMQServerControl serverControl = (ActiveMQServerControl) 
server.getManagementService().getResource(ResourceNames.BROKER);
+      String sessionID = 
resultArray.getJsonObject(0).getString(ConsumerField.SESSION.getAlternativeName());
+      int consumerID = 
resultArray.getJsonObject(0).getInt(ConsumerField.SEQUENTIAL_ID.getAlternativeName());
+      logger.debug("SessionID{} ConsumerID::{}", sessionID, consumerID);
+      Assert.assertTrue(serverControl.closeConsumerWithID(sessionID, 
String.valueOf(consumerID)));
    }
 }

Reply via email to