This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 11f76bc133 ARTEMIS-4476 Validating process to remove orphaned consumers
11f76bc133 is described below
commit 11f76bc1339f41ce08e61c2e91a7e51beabb1321
Author: Clebert Suconic <[email protected]>
AuthorDate: Sat Nov 11 21:52:38 2023 -0500
ARTEMIS-4476 Validating process to remove orphaned consumers
---
.../src/main/webapp/plugin/js/components/consumers.js | 9 ++++++---
.../management/impl/ActiveMQServerControlImpl.java | 4 +++-
.../core/management/impl/QueueControlImpl.java | 9 ++++-----
.../core/management/impl/view/ConsumerField.java | 3 ++-
.../core/management/impl/view/ConsumerView.java | 19 ++++++++++++++++++-
.../consumer/DetectOrphanedConsumerTest.java | 15 +++++++++++----
6 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
index 3da804108e..6c7ed8bc1d 100644
--- a/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
+++ b/artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/consumers.js
@@ -106,7 +106,8 @@ var Artemis;
{name: "Messages Acknowledged", visible: false},
{name: "Messages Acknowledged awaiting Commit", visible: false},
{name: "Last Delivered Time", visible: false},
- {name: "Last Acknowledged Time", visible: false}
+ {name: "Last Acknowledged Time", visible: false},
+ {name: "Status", visible: false}
]
};
@@ -144,7 +145,8 @@ var Artemis;
{id: 'messagesDelivered', name: 'Messages Delivered'},
{id: 'messagesDeliveredSize', name: 'Messages Delivered Size'},
{id: 'messagesAcknowledged', name: 'Messages Acknowledged'},
- {id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages
Acknowledged awaiting Commit'}
+ {id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages
Acknowledged awaiting Commit'},
+ {id: 'status', name: 'status'}
],
operationOptions: [
{id: 'EQUALS', name: 'Equals'},
@@ -203,7 +205,8 @@ var Artemis;
{ header: 'Messages Acknowledged', itemField:
'messagesAcknowledged' },
{ header: 'Messages Acknowledged awaiting Commit', itemField:
'messagesAcknowledgedAwaitingCommit' },
{ header: 'Last Delivered Time', itemField: 'lastDeliveredTime',
templateFn: function(value) { return formatTimestamp(value);} },
- { header: 'Last Acknowledged Time', itemField:
'lastAcknowledgedTime' , templateFn: function(value) { return
formatTimestamp(value);} }
+ { header: 'Last Acknowledged Time', itemField:
'lastAcknowledgedTime' , templateFn: function(value) { return
formatTimestamp(value);} },
+ { header: 'Status', itemField: 'status' }
];
ctrl.refresh = function () {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 0cd2e229f2..8f1565a3ec 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -2786,6 +2786,7 @@ public class ActiveMQServerControlImpl extends
AbstractControl implements Active
List<MessageReference> deliveringMessages =
consumer.getDeliveringMessages();
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ConsumerField.ID.getAlternativeName(), consumer.getID())
+ .add(ConsumerField.SEQUENTIAL_ID.getAlternativeName(),
consumer.getSequentialID())
.add(ConsumerField.CONNECTION.getAlternativeName(),
consumer.getConnectionID().toString())
.add(ConsumerField.SESSION.getAlternativeName(),
consumer.getSessionID())
.add(ConsumerField.QUEUE.getAlternativeName(),
consumer.getQueue().getName().toString())
@@ -2800,7 +2801,8 @@ public class ActiveMQServerControlImpl extends
AbstractControl implements Active
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(),
consumer.getMessagesAcknowledged())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(),
consumer.getMessagesAcknowledgedAwaitingCommit())
.add(ConsumerField.LAST_DELIVERED_TIME.getName(),
consumer.getLastDeliveredTime())
- .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
consumer.getLastAcknowledgedTime());
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
consumer.getLastAcknowledgedTime())
+ .add(ConsumerField.STATUS.getName(),
ConsumerView.checkConsumerStatus(consumer, server));
if (consumer.getFilter() != null) {
obj.add("filter", consumer.getFilter().getFilterString().toString());
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index e7be97e596..f6eac24afb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -37,6 +37,7 @@ import
org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -1829,6 +1830,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
ServerConsumer serverConsumer = (ServerConsumer) consumer;
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ConsumerField.ID.getAlternativeName(),
serverConsumer.getID())
+ .add(ConsumerField.SEQUENTIAL_ID.getAlternativeName(),
serverConsumer.getSequentialID())
.add(ConsumerField.CONNECTION.getAlternativeName(),
serverConsumer.getConnectionID().toString())
.add(ConsumerField.SESSION.getAlternativeName(),
serverConsumer.getSessionID())
.add(ConsumerField.BROWSE_ONLY.getName(),
serverConsumer.isBrowseOnly())
@@ -1840,11 +1842,8 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(),
serverConsumer.getMessagesAcknowledged())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(),
serverConsumer.getMessagesAcknowledgedAwaitingCommit())
.add(ConsumerField.LAST_DELIVERED_TIME.getName(),
serverConsumer.getLastDeliveredTime())
- .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
serverConsumer.getLastAcknowledgedTime());
-
- if (server.getRemotingService().getConnection(((ServerConsumer)
consumer).getConnectionID()) == null) {
- obj.add(ConsumerField.ORPHANED.getName(), true);
- }
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
serverConsumer.getLastAcknowledgedTime())
+ .add(ConsumerField.STATUS.getName(),
ConsumerView.checkConsumerStatus(serverConsumer, server));
jsonArray.add(obj);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
index 5c7c90b921..f7c689e501 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerField.java
@@ -21,6 +21,7 @@ import java.util.TreeMap;
public enum ConsumerField {
ID("id", "consumerID"),
+ SEQUENTIAL_ID("sequentialId", "sequentialId"),
SESSION("session", "sessionID"),
CONNECTION("connection", "connectionID"),
QUEUE("queue", "queueName"),
@@ -43,7 +44,7 @@ public enum ConsumerField {
MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
LAST_DELIVERED_TIME("lastDeliveredTime"),
LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime"),
- ORPHANED("orphaned");
+ STATUS("status");
private static final Map<String, ConsumerField> lookup = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
index 97eaf93ee2..48c6293330 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
@@ -29,6 +29,10 @@ import org.apache.activemq.artemis.utils.JsonLoader;
public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
+ public static final String CONSUMER_STATUS_OK = "OK";
+ public static final String CONSUMER_STATUS_ORPHANED = "Orphaned";
+
+
private static final String defaultSortColumn = ConsumerField.ID.getName();
private final ActiveMQServer server;
@@ -80,7 +84,9 @@ public class ConsumerView extends
ActiveMQAbstractView<ServerConsumer> {
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(),
toString(consumer.getMessagesAcknowledged()))
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(),
toString(consumer.getMessagesAcknowledgedAwaitingCommit()))
.add(ConsumerField.LAST_DELIVERED_TIME.getName(),
consumer.getLastDeliveredTime())
- .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
consumer.getLastAcknowledgedTime());
+ .add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(),
consumer.getLastAcknowledgedTime())
+ .add(ConsumerField.STATUS.getName(),
ConsumerView.checkConsumerStatus(consumer, server));
+
return obj;
}
@@ -136,11 +142,22 @@ public class ConsumerView extends
ActiveMQAbstractView<ServerConsumer> {
return consumer.getLastDeliveredTime();
case LAST_ACKNOWLEDGED_TIME:
return consumer.getLastAcknowledgedTime();
+ case STATUS:
+ return checkConsumerStatus(consumer, server);
default:
throw new IllegalArgumentException("Unsupported field, " +
fieldName);
}
}
+ public static String checkConsumerStatus(ServerConsumer consumer,
ActiveMQServer server) {
+ if
(server.getRemotingService().getConnection((consumer).getConnectionID()) ==
null) {
+ return CONSUMER_STATUS_ORPHANED;
+ } else {
+ return CONSUMER_STATUS_OK;
+ }
+
+ }
+
@Override
public String getDefaultOrderColumn() {
return defaultSortColumn;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
index 707d87c2e4..3c0de6eb19 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/consumer/DetectOrphanedConsumerTest.java
@@ -27,8 +27,11 @@ import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
+import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
@@ -94,7 +97,7 @@ public class DetectOrphanedConsumerTest extends
ActiveMQTestBase {
JsonArray resultArray = JsonUtil.readJsonArray(result);
Assert.assertEquals(1, resultArray.size());
-
Assert.assertFalse(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName()));
+ Assert.assertEquals(ConsumerView.CONSUMER_STATUS_OK,
resultArray.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
queue.getConsumers().forEach(c -> {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) c;
@@ -104,12 +107,16 @@ public class DetectOrphanedConsumerTest extends
ActiveMQTestBase {
});
result = queueControl.listConsumersAsJSON();
-
logger.debug("json: {}", result);
resultArray = JsonUtil.readJsonArray(result);
Assert.assertEquals(1, resultArray.size());
-
Assert.assertTrue(resultArray.getJsonObject(0).containsKey(ConsumerField.ORPHANED.getName()));
-
Assert.assertTrue(resultArray.getJsonObject(0).getBoolean(ConsumerField.ORPHANED.getName()));
+ Assert.assertEquals(ConsumerView.CONSUMER_STATUS_ORPHANED,
resultArray.getJsonObject(0).getString(ConsumerField.STATUS.getName()));
+
+ ActiveMQServerControl serverControl = (ActiveMQServerControl)
server.getManagementService().getResource(ResourceNames.BROKER);
+ String sessionID =
resultArray.getJsonObject(0).getString(ConsumerField.SESSION.getAlternativeName());
+ int consumerID =
resultArray.getJsonObject(0).getInt(ConsumerField.SEQUENTIAL_ID.getAlternativeName());
+ logger.debug("SessionID{} ConsumerID::{}", sessionID, consumerID);
+ Assert.assertTrue(serverControl.closeConsumerWithID(sessionID,
String.valueOf(consumerID)));
}
}