andytaylor commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r995650047
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -2769,27 +2775,554 @@ public void testListAllConsumersAsJSON() throws
Exception {
JsonObject first = sorted[0];
JsonObject second = sorted[1];
- Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
- Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());
- Assert.assertTrue(first.getString("connectionID").length() > 0);
- Assert.assertEquals(factory.getConnection().getID().toString(),
first.getString("connectionID"));
- Assert.assertTrue(first.getString("sessionID").length() > 0);
- Assert.assertEquals(((ClientSessionImpl) session).getName(),
first.getString("sessionID"));
- Assert.assertTrue(first.getString("queueName").length() > 0);
- Assert.assertEquals(queueName.toString(), first.getString("queueName"));
- Assert.assertEquals(false, first.getBoolean("browseOnly"));
- Assert.assertEquals(0,
first.getJsonNumber("deliveringCount").longValue());
+
Assert.assertTrue(first.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
> 0);
+
Assert.assertNotNull(first.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+
Assert.assertTrue(first.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(factory.getConnection().getID().toString(),
first.getString(ConsumerField.CONNECTION.getAlternativeName()));
+
Assert.assertTrue(first.getString(ConsumerField.SESSION.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(((ClientSessionImpl) session).getName(),
first.getString(ConsumerField.SESSION.getAlternativeName()));
+
Assert.assertTrue(first.getString(ConsumerField.QUEUE.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(queueName.toString(),
first.getString(ConsumerField.QUEUE.getAlternativeName()));
+ Assert.assertEquals(false,
first.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
+
+
Assert.assertTrue(second.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
> 0);
+
Assert.assertNotNull(second.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+
Assert.assertTrue(second.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(factory2.getConnection().getID().toString(),
second.getString(ConsumerField.CONNECTION.getAlternativeName()));
+
Assert.assertTrue(second.getString(ConsumerField.SESSION.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(((ClientSessionImpl) session2).getName(),
second.getString(ConsumerField.SESSION.getAlternativeName()));
+
Assert.assertTrue(second.getString(ConsumerField.QUEUE.getAlternativeName()).length()
> 0);
+ Assert.assertEquals(queueName.toString(),
second.getString(ConsumerField.QUEUE.getAlternativeName()));
+ Assert.assertEquals(false,
second.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+ Assert.assertEquals(0,
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(0,
second.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
second.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+ Assert.assertEquals(0,
second.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
- Assert.assertTrue(second.getJsonNumber("creationTime").longValue() > 0);
- Assert.assertNotNull(second.getJsonNumber("consumerID").longValue());
- Assert.assertTrue(second.getString("connectionID").length() > 0);
- Assert.assertEquals(factory2.getConnection().getID().toString(),
second.getString("connectionID"));
- Assert.assertTrue(second.getString("sessionID").length() > 0);
- Assert.assertEquals(((ClientSessionImpl) session2).getName(),
second.getString("sessionID"));
- Assert.assertTrue(second.getString("queueName").length() > 0);
- Assert.assertEquals(queueName.toString(), second.getString("queueName"));
- Assert.assertEquals(false, second.getBoolean("browseOnly"));
- Assert.assertEquals(0,
second.getJsonNumber("deliveringCount").longValue());
+ }
+
+
+ @Test
+ public void testListAllConsumersAsJSONTXCommit() throws Exception {
+ SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = createSessionFactory(locator);
+ ClientSession session = factory.createSession(true,false, 1);
+ addClientSession(session);
+
+ serverControl.createAddress(queueName.toString(),
RoutingType.ANYCAST.name());
+ if (legacyCreateQueue) {
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null,
false, false);
+ } else {
+ server.createQueue(new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+ }
+
+ ClientConsumer consumer = session.createConsumer(queueName, null, 100,
-1, false);
+ addClientConsumer(consumer);
+ session.start();
+ Thread.sleep(200);
+
+ ClientProducer producer = session.createProducer(queueName);
+ int size = 0;
+ ClientMessage receive = null;
+ for (int i = 0; i < 100; i++) {
+ ClientMessage message = session.createMessage(true);
+
+ producer.send(message);
+ size += message.getEncodeSize();
+ receive = consumer.receive();
+ }
+
+ String jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ JsonArray array = JsonUtil.readJsonArray(jsonString);
+ JsonObject first = (JsonObject) array.get(0);
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+ receive.acknowledge();
+ session.commit();
+
+ jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ array = JsonUtil.readJsonArray(jsonString);
+ first = (JsonObject) array.get(0);
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+ int allSize = size;
+ for (int i = 0; i < 100; i++) {
+ ClientMessage message = session.createMessage(true);
+
+ producer.send(message);
+ allSize += message.getEncodeSize();
+ receive = consumer.receive();
+ }
+
+ jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ array = JsonUtil.readJsonArray(jsonString);
+ first = (JsonObject) array.get(0);
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(200,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(allSize,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+ }
+
+ @Test
+ public void testListAllConsumersAsJSONTXCommitAck() throws Exception {
+ SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = createSessionFactory(locator);
+ ClientSession session = factory.createSession(true,false, 1);
+ addClientSession(session);
+
+ serverControl.createAddress(queueName.toString(),
RoutingType.ANYCAST.name());
+ if (legacyCreateQueue) {
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null,
false, false);
+ } else {
+ server.createQueue(new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+ }
+
+ ClientConsumer consumer = session.createConsumer(queueName, null, 100,
-1, false);
+ addClientConsumer(consumer);
+ session.start();
+ Thread.sleep(200);
+
+ ClientProducer producer = session.createProducer(queueName);
+ int size = 0;
+ ClientMessage receive = null;
+ for (int i = 0; i < 100; i++) {
+ ClientMessage message = session.createMessage(true);
+
+ producer.send(message);
+ size += message.getEncodeSize();
+ receive = consumer.receive();
+ receive.acknowledge();
+ }
+
+ String jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ JsonArray array = JsonUtil.readJsonArray(jsonString);
+ JsonObject first = (JsonObject) array.get(0);
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+ session.commit();
+
+ jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ array = JsonUtil.readJsonArray(jsonString);
+ first = (JsonObject) array.get(0);
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+ }
+
+ @Test
+ public void testListAllConsumersAsJSONTXRollback() throws Exception {
+ SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory factory = createSessionFactory(locator);
+ ClientSession session = factory.createSession(true,false, 1);
+ addClientSession(session);
+
+ serverControl.createAddress(queueName.toString(),
RoutingType.ANYCAST.name());
+ if (legacyCreateQueue) {
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null,
false, false);
+ } else {
+ server.createQueue(new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+ }
+
+ ClientConsumer consumer = session.createConsumer(queueName, null, 100,
-1, false);
+ addClientConsumer(consumer);
+ session.start();
+ Thread.sleep(200);
+
+ ClientProducer producer = session.createProducer(queueName);
+ int size = 0;
+ ClientMessage receive = null;
+ for (int i = 0; i < 100; i++) {
+ ClientMessage message = session.createMessage(true);
+
+ producer.send(message);
+ size += message.getEncodeSize();
+ receive = consumer.receive();
+ }
+
+ String jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ JsonArray array = JsonUtil.readJsonArray(jsonString);
+ JsonObject first = (JsonObject) array.get(0);
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+ receive.acknowledge(); //stop the session so we dont receive the same messages
+ session.stop();
+ jsonString = serverControl.listAllConsumersAsJSON();
+ log.debug(jsonString);
+ Assert.assertNotNull(jsonString);
+ array = JsonUtil.readJsonArray(jsonString);
+ first = (JsonObject) array.get(0);
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+ Assert.assertEquals(0,
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+ Assert.assertEquals(size,
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+ Assert.assertEquals(100,
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+ session.rollback();
+ // Thread.sleep(1000);
Review Comment:
fixed
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -4023,6 +4565,114 @@ public void testListProducers() throws Exception {
}
}
+ @Test
+ public void testListProducersMessageCounts() throws Exception {
+ SimpleString queueName1 = new SimpleString("my_queue_one");
+ SimpleString addressName1 = new SimpleString("my_address_one");
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ server.addAddressInfo(new AddressInfo(addressName1,
RoutingType.ANYCAST));
+ if (legacyCreateQueue) {
+ server.createQueue(addressName1, RoutingType.ANYCAST, queueName1,
null, false, false);
+ } else {
+ server.createQueue(new
QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+ }
+
+ int numMessages = 10;
+
+
+ // create some consumers
+ try (ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);) {
+
+ ClientSession session1 = csf.createSession();
+
+ ClientProducer producer1 = session1.createProducer(addressName1);
+ int messagesSize = 0;
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session1.createMessage(true);
+ producer1.send(message);
+ messagesSize += message.getEncodeSize();
+ }
+ //bring back all producers
+ String filterString = createJsonFilter("", "", "");
+ String producersAsJsonString =
serverControl.listProducers(filterString, 1, 50);
+ JsonObject producersAsJsonObject =
JsonUtil.readJsonObject(producersAsJsonString);
+ JsonArray array = (JsonArray) producersAsJsonObject.get("data");
+
+ Assert.assertEquals("number of producers returned from query", 1,
array.size());
+
+ JsonObject jsonSession = array.getJsonObject(0);
+
+ //check all fields
+ Assert.assertNotEquals(ProducerField.ID.getName(), "",
jsonSession.getString(ProducerField.ID.getName()));
+ Assert.assertNotEquals(ProducerField.SESSION.getName(), "",
jsonSession.getString(ProducerField.SESSION.getName()));
+ Assert.assertEquals(ProducerField.CLIENT_ID.getName(), "",
jsonSession.getString(ProducerField.CLIENT_ID.getName()));
+ Assert.assertEquals(ProducerField.USER.getName(), "",
jsonSession.getString(ProducerField.USER.getName()));
+ Assert.assertNotEquals(ProducerField.PROTOCOL.getAlternativeName(),
"", jsonSession.getString(ProducerField.PROTOCOL.getName()));
+ Assert.assertEquals(ProducerField.ADDRESS.getName(),
addressName1.toString(),
jsonSession.getString(ProducerField.ADDRESS.getName()));
+ Assert.assertNotEquals(ProducerField.LOCAL_ADDRESS.getName(), "",
jsonSession.getString(ProducerField.LOCAL_ADDRESS.getName()));
+ Assert.assertNotEquals(ProducerField.REMOTE_ADDRESS.getName(), "",
jsonSession.getString(ProducerField.REMOTE_ADDRESS.getName()));
+ Assert.assertNotEquals(ProducerField.CREATION_TIME.getName(), "",
jsonSession.getString(ProducerField.CREATION_TIME.getName()));
+ Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(),
numMessages, jsonSession.getInt(ProducerField.MESSAGE_SENT.getName()));
+ Assert.assertEquals(ProducerField.MESSAGE_SENT_SIZE.getName(),
messagesSize, jsonSession.getInt(ProducerField.MESSAGE_SENT_SIZE.getName()));
+ Assert.assertEquals(ProducerField.LAST_UUID_SENT.getName(), "",
jsonSession.getString(ProducerField.LAST_UUID_SENT.getName()));
+ }
+ }
+
+ @Test
+ public void testListProducersMessageCounts2() throws Exception {
+ SimpleString queueName1 = new SimpleString("my_queue_one");
+ SimpleString addressName1 = new SimpleString("my_address_one");
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ server.addAddressInfo(new AddressInfo(addressName1,
RoutingType.ANYCAST));
+ if (legacyCreateQueue) {
+ server.createQueue(addressName1, RoutingType.ANYCAST, queueName1,
null, false, false);
+ } else {
+ server.createQueue(new
QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+ }
+
+ int numMessages = 10;
+
+
+ // create some consumers
+ try (ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);) {
+
+ ClientSession session1 = csf.createSession();
+
+ ClientProducer producer1 = session1.createProducer(addressName1);
+ int messagesSize = 0;
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session1.createMessage(true);
+ producer1.send(message);
+ messagesSize += message.getEncodeSize();
+ }
+ //bring back all producers
+ String producersAsJsonString =
serverControl.listProducersInfoAsJSON();
+ JsonArray jsonArray = JsonUtil.readJsonArray(producersAsJsonString);
+
+ JsonObject jsonSession = jsonArray.getJsonObject(0);
+
+ System.out.println("jsonSession = " + jsonSession);
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]