gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r981040625


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java:
##########
@@ -441,6 +446,84 @@ public void testGetConsumerJSON() throws Exception {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testGetConsumerWithMessagesJSON() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(new 
QueueConfiguration(queue).setAddress(address).setDurable(durable));
+
+      QueueControl queueControl = createManagementControl(address, queue);
+
+      ClientProducer producer = session.createProducer(address);
+
+      for (int i = 0; i < 10; i++) {
+         producer.send(session.createMessage(true));
+      }
+
+      Wait.assertEquals(0, () -> queueControl.getConsumerCount());
+
+      ClientConsumer consumer = session.createConsumer(queue);
+      Wait.assertEquals(1, () -> queueControl.getConsumerCount());
+
+      session.start();
+
+      ClientMessage clientMessage = null;
+
+      int size = 0;
+      for (int i = 0; i < 5; i++) {
+         clientMessage = consumer.receiveImmediate();
+         size += clientMessage.getEncodeSize();
+      }
+
+      JsonArray obj = 
JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
+
+      assertEquals(1, obj.size());
+
+      Wait.assertEquals(5, () -> 
JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
+
+      obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
+
+      JsonObject jsonObject = obj.get(0).asJsonObject();
+
+      assertEquals(5, 
jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
+
+      assertEquals(size, 
jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
+
+      assertEquals(5, 
jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
+
+      assertEquals(size, 
jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()));
+
+      assertEquals(0, 
jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
+
+      //we cant assume an elapseed time to only checking for its existence
+      
assertNotNull(jsonObject.getInt(ConsumerField.LAST_DELIVERED_TIME.getName()));
+
+      assertEquals( 0, 
jsonObject.getInt(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()));

Review Comment:
   If these are going to be a timestamp they can be asserted to be a value 
within a fairly specific delta during the test itself, and also their values 
relative to each other. E.g ensure they are in flight, then a further 1ms delay 
before acking would mean the 2 values also have to differ by at least that 
much, but be after the point the test started, and you can then also put a 
narrow delta for the ack time value by using the time you called it + allowance.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2342,8 +2331,8 @@ public Pair<SimpleString, EnumSet<RoutingType>> 
getAddressAndRoutingTypes(Simple
    @Override
    public void addProducer(ServerProducer serverProducer) {
       serverProducer.setSessionID(getName());
-      serverProducer.setConnectionID(getConnectionID().toString());
-      producers.put(serverProducer.getID(), serverProducer);
+      serverProducer.setConnectionID(getConnectionID() != null ? 
getConnectionID().toString() : null);
+      producers.put(serverProducer.getAddress(), serverProducer);

Review Comment:
   This makes the addition use getAddress() however the removeProducer() method 
below it is still using the 'ID' value which will likely mismatch the 
add+remove operations and so could lead to retention.
   
   Also, by using getAddress() here, it might mean the producer addition here 
may not align with the producer lookup later in doSend for an anonymous 
producer (and perhaps also producer to FQQN?), meaning a new address-specific 
'synthetic producer' value could get created in doSend() per-address and 
leading to multiple ServerProducerImpl entries for a single actual producer, 
throwing off the producer count and also making the metric values wrong for the 
initial actual producer (since the metrics would be assigned to the 'synthetic 
duplicates' per-address). The change in ActiveMQPacketHandler.java would 
presumably have been to stop [part of] this problem manifesting for Core.
   
   If that issue didnt exist, two producers coming in with the same address 
would potentially cause the metrics to that point to be discarded.
   
   



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -1958,14 +1931,29 @@ public StorageManager getStorageManager() {
 
    @Override
    public void describeProducersInfo(JsonArrayBuilder array) throws Exception {
-      Map<SimpleString, Pair<Object, AtomicLong>> targetCopy = 
cloneTargetAddresses();
-
-      for (Map.Entry<SimpleString, Pair<Object, AtomicLong>> entry : 
targetCopy.entrySet()) {
+      Map<String, ServerProducer> targetCopy = cloneProducers();
+      String sessionClientID = getRemotingConnection().getClientID();
+      String localAddress = 
getRemotingConnection().getTransportConnection().getLocalAddress();
+      String remoteAddress = 
getRemotingConnection().getTransportConnection().getRemoteAddress();
+      for (Map.Entry<String, ServerProducer> entry : targetCopy.entrySet()) {
          String uuid = null;
-         if (entry.getValue().getA() != null) {
-            uuid = entry.getValue().getA().toString();
+         if (entry.getValue().getUserID() != null) {
+            uuid = entry.getValue().getUserID().toString();
          }
-         JsonObjectBuilder producerInfo = 
JsonLoader.createObjectBuilder().add("connectionID", 
this.getConnectionID().toString()).add("sessionID", 
this.getName()).add("destination", 
entry.getKey().toString()).add("lastUUIDSent", uuid, 
JsonValue.NULL).add("msgSent", entry.getValue().getB().longValue());
+         JsonObjectBuilder producerInfo = JsonLoader.createObjectBuilder()
+               .add(ProducerField.ID.getName(), getName())
+               .add(ProducerField.CONNECTION_ID.getName(), 
this.getConnectionID().toString())
+               .add(ProducerField.SESSION.getAlternativeName(), this.getName())
+               .add(ProducerField.CLIENT_ID.getName(), sessionClientID != null 
? sessionClientID : "")
+               .add(ProducerField.USER.getName(), getUsername() != null ? 
getUsername() : "")
+               .add(ProducerField.PROTOCOL.getName(), 
remotingConnection.getProtocolName())
+               .add(ProducerField.LOCAL_ADDRESS.getName(), localAddress != 
null ? localAddress : "")
+               .add(ProducerField.REMOTE_ADDRESS.getName(), remoteAddress != 
null ? remoteAddress : "")

Review Comment:
   These seem superfluous given they are session or connection details, which 
should respectively have all these already.
   
   It seems consumer bits also do this sometimes, but it only seems to be the 
'ConsumerView' adding these details for certain management stuff, the apparent 
equivalent of this producer method for the consumers in 
ActiveMQServerControlImpl.listConsumersAsJSON / toJSONObject(ServerConsumer) 
doesnt add these connection/session details the rest of the time. I think 
matching that behaviour by only having 'ProducerView' add them in similar cases 
would make sense.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -2769,27 +2775,554 @@ public void testListAllConsumersAsJSON() throws 
Exception {
       JsonObject first = sorted[0];
       JsonObject second = sorted[1];
 
-      Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(first.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory.getConnection().getID().toString(), 
first.getString("connectionID"));
-      Assert.assertTrue(first.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session).getName(), 
first.getString("sessionID"));
-      Assert.assertTrue(first.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), first.getString("queueName"));
-      Assert.assertEquals(false, first.getBoolean("browseOnly"));
-      Assert.assertEquals(0, 
first.getJsonNumber("deliveringCount").longValue());
+      
Assert.assertTrue(first.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
 > 0);
+      
Assert.assertNotNull(first.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      
Assert.assertTrue(first.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(factory.getConnection().getID().toString(), 
first.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      
Assert.assertTrue(first.getString(ConsumerField.SESSION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(((ClientSessionImpl) session).getName(), 
first.getString(ConsumerField.SESSION.getAlternativeName()));
+      
Assert.assertTrue(first.getString(ConsumerField.QUEUE.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(queueName.toString(), 
first.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, 
first.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
+
+      
Assert.assertTrue(second.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
 > 0);
+      
Assert.assertNotNull(second.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      
Assert.assertTrue(second.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(factory2.getConnection().getID().toString(), 
second.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      
Assert.assertTrue(second.getString(ConsumerField.SESSION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(((ClientSessionImpl) session2).getName(), 
second.getString(ConsumerField.SESSION.getAlternativeName()));
+      
Assert.assertTrue(second.getString(ConsumerField.QUEUE.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(queueName.toString(), 
second.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, 
second.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
 
-      Assert.assertTrue(second.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(second.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(second.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory2.getConnection().getID().toString(), 
second.getString("connectionID"));
-      Assert.assertTrue(second.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session2).getName(), 
second.getString("sessionID"));
-      Assert.assertTrue(second.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), second.getString("queueName"));
-      Assert.assertEquals(false, second.getBoolean("browseOnly"));
-      Assert.assertEquals(0, 
second.getJsonNumber("deliveringCount").longValue());
+   }
+
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommit() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), 
RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, 
-1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);

Review Comment:
   Necessary? Repeat comment for other tests below



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -1185,7 +1182,7 @@ public void deleteQueue(final SimpleString queueToDelete) 
throws Exception {
       }
 
       if (server.getAddressInfo(unPrefixedQueueName) == null) {
-         targetAddressInfos.remove(queueToDelete);
+         producers.remove(queueToDelete.toString());

Review Comment:
   The addition to the map uses the message or producer address rather than the 
queue name this removal does, what happens when these differ? (Thinking mainly 
topic sub queues, but maybe even FQQN usage).
   
   Also the actual queue being removed was the one from "unPrefixedQueueName" 
variable, while this isnt and may be different..is that a factor?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -4023,6 +4565,114 @@ public void testListProducers() throws Exception {
       }
    }
 
+   @Test
+   public void testListProducersMessageCounts() throws Exception {
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      server.addAddressInfo(new AddressInfo(addressName1, 
RoutingType.ANYCAST));
+      if (legacyCreateQueue) {
+         server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, 
null, false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      int numMessages = 10;
+
+
+      // create some consumers
+      try (ServerLocator locator = createInVMNonHALocator(); 
ClientSessionFactory csf = createSessionFactory(locator);) {
+
+         ClientSession session1 = csf.createSession();
+
+         ClientProducer producer1 = session1.createProducer(addressName1);
+         int messagesSize = 0;
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = session1.createMessage(true);
+            producer1.send(message);
+            messagesSize += message.getEncodeSize();
+         }
+         //bring back all producers
+         String filterString = createJsonFilter("", "", "");
+         String producersAsJsonString = 
serverControl.listProducers(filterString, 1, 50);
+         JsonObject producersAsJsonObject = 
JsonUtil.readJsonObject(producersAsJsonString);
+         JsonArray array = (JsonArray) producersAsJsonObject.get("data");
+
+         Assert.assertEquals("number of producers returned from query", 1, 
array.size());
+
+         JsonObject jsonSession = array.getJsonObject(0);
+
+         //check all fields
+         Assert.assertNotEquals(ProducerField.ID.getName(), "", 
jsonSession.getString(ProducerField.ID.getName()));
+         Assert.assertNotEquals(ProducerField.SESSION.getName(), "", 
jsonSession.getString(ProducerField.SESSION.getName()));
+         Assert.assertEquals(ProducerField.CLIENT_ID.getName(), "", 
jsonSession.getString(ProducerField.CLIENT_ID.getName()));
+         Assert.assertEquals(ProducerField.USER.getName(), "", 
jsonSession.getString(ProducerField.USER.getName()));
+         Assert.assertNotEquals(ProducerField.PROTOCOL.getAlternativeName(), 
"", jsonSession.getString(ProducerField.PROTOCOL.getName()));
+         Assert.assertEquals(ProducerField.ADDRESS.getName(), 
addressName1.toString(), 
jsonSession.getString(ProducerField.ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.LOCAL_ADDRESS.getName(), "", 
jsonSession.getString(ProducerField.LOCAL_ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.REMOTE_ADDRESS.getName(), "", 
jsonSession.getString(ProducerField.REMOTE_ADDRESS.getName()));
+         Assert.assertNotEquals(ProducerField.CREATION_TIME.getName(), "", 
jsonSession.getString(ProducerField.CREATION_TIME.getName()));
+         Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(), 
numMessages, jsonSession.getInt(ProducerField.MESSAGE_SENT.getName()));
+         Assert.assertEquals(ProducerField.MESSAGE_SENT_SIZE.getName(), 
messagesSize, jsonSession.getInt(ProducerField.MESSAGE_SENT_SIZE.getName()));
+         Assert.assertEquals(ProducerField.LAST_UUID_SENT.getName(), "", 
jsonSession.getString(ProducerField.LAST_UUID_SENT.getName()));
+      }
+   }
+
+   @Test
+   public void testListProducersMessageCounts2() throws Exception {
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      server.addAddressInfo(new AddressInfo(addressName1, 
RoutingType.ANYCAST));
+      if (legacyCreateQueue) {
+         server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, 
null, false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName1).setAddress(addressName1).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      int numMessages = 10;
+
+
+      // create some consumers
+      try (ServerLocator locator = createInVMNonHALocator(); 
ClientSessionFactory csf = createSessionFactory(locator);) {
+
+         ClientSession session1 = csf.createSession();
+
+         ClientProducer producer1 = session1.createProducer(addressName1);
+         int messagesSize = 0;
+         for (int i = 0; i < numMessages; i++) {
+            ClientMessage message = session1.createMessage(true);
+            producer1.send(message);
+            messagesSize += message.getEncodeSize();
+         }
+         //bring back all producers
+         String producersAsJsonString = 
serverControl.listProducersInfoAsJSON();
+         JsonArray jsonArray = JsonUtil.readJsonArray(producersAsJsonString);
+
+         JsonObject jsonSession = jsonArray.getJsonObject(0);
+
+         System.out.println("jsonSession = " + jsonSession);

Review Comment:
   logger.debug("jsonSession = {}",  jsonSession); ?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -704,9 +708,9 @@ public List<MessageReference> cancelRefs(final boolean 
failed,
          final List<MessageReference> refs = new 
ArrayList<>(deliveringRefs.size());
          MessageReference ref;
          while ((ref = deliveringRefs.poll()) != null) {
+            metrics.addAcknowledge(ref.getMessage().getEncodeSize());

Review Comment:
   This one doesnt look to be synchronized on the consumer the way the others 
are, worth looking at as it could mean thread safety issues corrupting the 
metrics updates since it does stuff like -= and ++ on the values.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -4023,6 +4565,114 @@ public void testListProducers() throws Exception {
       }
    }
 
+   @Test
+   public void testListProducersMessageCounts() throws Exception {

Review Comment:
   Doing some tests with AMQP which has actual producer elements would be good, 
vs only using Core which doesnt really seem to. Actually, using the Core JMS 
client as well as or instead of just the Core client would probably make sense 
too.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java:
##########
@@ -2769,27 +2775,554 @@ public void testListAllConsumersAsJSON() throws 
Exception {
       JsonObject first = sorted[0];
       JsonObject second = sorted[1];
 
-      Assert.assertTrue(first.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(first.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(first.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory.getConnection().getID().toString(), 
first.getString("connectionID"));
-      Assert.assertTrue(first.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session).getName(), 
first.getString("sessionID"));
-      Assert.assertTrue(first.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), first.getString("queueName"));
-      Assert.assertEquals(false, first.getBoolean("browseOnly"));
-      Assert.assertEquals(0, 
first.getJsonNumber("deliveringCount").longValue());
+      
Assert.assertTrue(first.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
 > 0);
+      
Assert.assertNotNull(first.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      
Assert.assertTrue(first.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(factory.getConnection().getID().toString(), 
first.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      
Assert.assertTrue(first.getString(ConsumerField.SESSION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(((ClientSessionImpl) session).getName(), 
first.getString(ConsumerField.SESSION.getAlternativeName()));
+      
Assert.assertTrue(first.getString(ConsumerField.QUEUE.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(queueName.toString(), 
first.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, 
first.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
+
+      
Assert.assertTrue(second.getJsonNumber(ConsumerField.CREATION_TIME.getName()).longValue()
 > 0);
+      
Assert.assertNotNull(second.getJsonNumber(ConsumerField.ID.getAlternativeName()).longValue());
+      
Assert.assertTrue(second.getString(ConsumerField.CONNECTION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(factory2.getConnection().getID().toString(), 
second.getString(ConsumerField.CONNECTION.getAlternativeName()));
+      
Assert.assertTrue(second.getString(ConsumerField.SESSION.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(((ClientSessionImpl) session2).getName(), 
second.getString(ConsumerField.SESSION.getAlternativeName()));
+      
Assert.assertTrue(second.getString(ConsumerField.QUEUE.getAlternativeName()).length()
 > 0);
+      Assert.assertEquals(queueName.toString(), 
second.getString(ConsumerField.QUEUE.getAlternativeName()));
+      Assert.assertEquals(false, 
second.getBoolean(ConsumerField.BROWSE_ONLY.getName()));
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue());
+      Assert.assertEquals(0, 
second.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue());
 
-      Assert.assertTrue(second.getJsonNumber("creationTime").longValue() > 0);
-      Assert.assertNotNull(second.getJsonNumber("consumerID").longValue());
-      Assert.assertTrue(second.getString("connectionID").length() > 0);
-      Assert.assertEquals(factory2.getConnection().getID().toString(), 
second.getString("connectionID"));
-      Assert.assertTrue(second.getString("sessionID").length() > 0);
-      Assert.assertEquals(((ClientSessionImpl) session2).getName(), 
second.getString("sessionID"));
-      Assert.assertTrue(second.getString("queueName").length() > 0);
-      Assert.assertEquals(queueName.toString(), second.getString("queueName"));
-      Assert.assertEquals(false, second.getBoolean("browseOnly"));
-      Assert.assertEquals(0, 
second.getJsonNumber("deliveringCount").longValue());
+   }
+
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommit() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), 
RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, 
-1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      receive.acknowledge();
+      session.commit();
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+      int allSize = size;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         allSize += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(200, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(allSize, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+   }
+
+   @Test
+   public void testListAllConsumersAsJSONTXCommitAck() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), 
RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, 
-1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+         receive.acknowledge();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      session.commit();
+
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+   }
+
+   @Test
+   public void testListAllConsumersAsJSONTXRollback() throws Exception {
+      SimpleString queueName = new SimpleString(UUID.randomUUID().toString());
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory factory = createSessionFactory(locator);
+      ClientSession session = factory.createSession(true,false, 1);
+      addClientSession(session);
+
+      serverControl.createAddress(queueName.toString(), 
RoutingType.ANYCAST.name());
+      if (legacyCreateQueue) {
+         server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, 
false, false);
+      } else {
+         server.createQueue(new 
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setDurable(false));
+      }
+
+      ClientConsumer consumer = session.createConsumer(queueName, null, 100, 
-1, false);
+      addClientConsumer(consumer);
+      session.start();
+      Thread.sleep(200);
+
+      ClientProducer producer = session.createProducer(queueName);
+      int size = 0;
+      ClientMessage receive = null;
+      for (int i = 0; i < 100; i++) {
+         ClientMessage message = session.createMessage(true);
+
+         producer.send(message);
+         size += message.getEncodeSize();
+         receive = consumer.receive();
+      }
+
+      String jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      JsonArray array = JsonUtil.readJsonArray(jsonString);
+      JsonObject first = (JsonObject) array.get(0);
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+      receive.acknowledge();   //stop the session so we dont receive the same 
messages
+      session.stop();
+      jsonString = serverControl.listAllConsumersAsJSON();
+      log.debug(jsonString);
+      Assert.assertNotNull(jsonString);
+      array = JsonUtil.readJsonArray(jsonString);
+      first = (JsonObject) array.get(0);
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT.getName()).longValue());
+      Assert.assertEquals(0, 
first.getJsonNumber(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED.getName()).longValue());
+      Assert.assertEquals(size, 
first.getJsonNumber(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()).longValue());
+      Assert.assertEquals(100, 
first.getJsonNumber(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()).longValue());
+
+      session.rollback();
+     // Thread.sleep(1000);

Review Comment:
   Delete? Same in other instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to