gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1018966363


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java:
##########
@@ -32,4 +32,16 @@ public interface ServerProducer {
    String getID();
 
    long getCreationTime();
+
+   void setUserID(Object userID);

Review Comment:
   Once again I saw this and thought it actually meant a user ID, rather than 
what everyone else calls the Message ID.
   
   Is the aim to show the 'last produced Message ID on this address' (since the 
ServerProducer object is not really producer-specific or tied to a producer 
entity, even where one exists)? If so, could we call it something more 
specific, e.g. LastProducedMessageID?
   
   (EDIT: I now see 'getLastSentMessageID' was just removed from ServerSession 
in these changes; that name would also be nicer.)



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,13 +18,19 @@
 
 import org.apache.activemq.artemis.core.server.ServerProducer;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 public class ServerProducerImpl implements ServerProducer {
    private final String ID;
    private final String protocol;
    private final long creationTime;
 
+   private AtomicLong messagesSent = new AtomicLong(0);
+
+   private AtomicLong messagesSentSize = new AtomicLong(0);

Review Comment:
   Although I don't much like FieldUpdaters, these objects are effectively 
created per-address-per-session rather than per-producer, so they are probably 
even better candidates for a FieldUpdater than the consumer metric was: there 
may be many more of these than actual producers, whereas on the consumer side 
it is 'just' 1:1 with the number of consumers.
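
   As a rough illustration of the suggestion above, the two counters could be 
plain volatile fields driven by shared static updaters, so that no extra 
AtomicLong object is allocated per instance. This is only a sketch: the class 
name `ProducerMetricsSketch` and the `updateMetrics` method are hypothetical 
stand-ins, not code from the PR.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for ServerProducerImpl; only the two counters are modelled.
class ProducerMetricsSketch {

   // One updater per field, shared by every instance, instead of one AtomicLong object per instance.
   private static final AtomicLongFieldUpdater<ProducerMetricsSketch> MESSAGES_SENT_UPDATER =
      AtomicLongFieldUpdater.newUpdater(ProducerMetricsSketch.class, "messagesSent");

   private static final AtomicLongFieldUpdater<ProducerMetricsSketch> MESSAGES_SENT_SIZE_UPDATER =
      AtomicLongFieldUpdater.newUpdater(ProducerMetricsSketch.class, "messagesSentSize");

   // Fields driven by a field updater must be volatile and non-static.
   private volatile long messagesSent;
   private volatile long messagesSentSize;

   // Hypothetical method name: records one sent message of the given encoded size.
   void updateMetrics(int encodeSize) {
      MESSAGES_SENT_UPDATER.incrementAndGet(this);
      MESSAGES_SENT_SIZE_UPDATER.addAndGet(this, encodeSize);
   }

   long getMessagesSent() {
      return messagesSent;
   }

   long getMessagesSentSize() {
      return messagesSentSize;
   }
}
```

   The trade-off is readability for footprint: each instance carries two 
primitive fields rather than two extra objects, which matters most when 
instances are numerous.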



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,130 @@ public String getConnectionRemoteAddress() {
       return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
    }
 
+   @Override
+   public int getMessagesInTransitSize() {
+      return metrics.getMessagesInTransitSize();
+   }
+
+
+   @Override
+   public int getMessagesInTransit() {
+      return deliveringRefs.size();
+   }
+
+   @Override
+   public long getLastDeliveredTime() {
+      return metrics.getLastDeliveredTime();
+   }
+
+   @Override
+   public long getLastAcknowledgedTime() {
+      return metrics.getLastAcknowledgedTime();
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      return metrics.getMessagesAcknowledged();
+   }
+
+   @Override
+   public int getMessagesDeliveredSize() {
+      return metrics.getMessagesDeliveredSize();
+   }
+
+   @Override
+   public int getMessagesDelivered() {
+      return metrics.getMessagesDelivered();
+   }
+
+   @Override
+   public int getMessagesAcknowledgedAwaitingCommit() {
+      return metrics.getMessagesAcknowledgedAwaitingCommit();
+   }
+
    public SessionCallback getCallback() {
       return callback;
    }
+
+   static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+      /**
+       * Since messages can be delivered (incremented) and acknowledged (decremented) at the same time we have to protect
+       * the encode size and make it atomic. The other fields are ok since they are only accessed from a single thread.
+       */
+      AtomicIntegerFieldUpdater<ServerConsumerMetrics> messagesInTransitSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesInTransitSize");
+      private volatile int messagesInTransitSize = 0;
+
+      private int messagesDeliveredSize = 0;
+
+      private long lastDeliveredTime = 0;
+
+      private long lastAcknowledgedTime = 0;
+
+      private int messagesDelivered = 0;
+
+      private int messagesAcknowledged = 0;
+
+      private int messagesAcknowledgedAwaitingCommit = 0;
+
+      public int getMessagesInTransitSize() {
+         return messagesInTransitSize;
+      }
+
+      public int getMessagesDeliveredSize() {
+         return messagesDeliveredSize;
+      }
+
+      public long getLastDeliveredTime() {
+         return lastDeliveredTime;
+      }
+
+      public long getLastAcknowledgedTime() {
+         return lastAcknowledgedTime;
+      }
+
+      public int getMessagesDelivered() {
+         return messagesDelivered;
+      }
+
+      public int getMessagesAcknowledged() {
+         return messagesAcknowledged;
+      }
+
+      public int getMessagesAcknowledgedAwaitingCommit() {
+         return messagesAcknowledgedAwaitingCommit;
+      }
+
+      public void addMessage(int encodeSize) {
+         messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+         messagesDeliveredSize += encodeSize;
+         messagesDelivered++;
+         lastDeliveredTime = System.currentTimeMillis();
+      }
+
+      public void addAcknowledge(int encodeSize) {
+         messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+         messagesAcknowledged++;
+         messagesAcknowledgedAwaitingCommit++;
+         lastAcknowledgedTime = System.currentTimeMillis();
+      }

Review Comment:
   The calling of this whole method didn't seem thread-safe, and if it isn't, 
none of the values updated in it are safe, especially the 
messagesAcknowledged++ and messagesAcknowledgedAwaitingCommit++ operations, 
which do non-atomic arithmetic. The lastAcknowledgedTime update being unsafe 
probably doesn't matter, given it's only an indicator and will still be about 
right, if not identical, even in the face of concurrency, unlike the others, 
which will likely be flat-out incorrect.
   
   Separately, this doesn't seem to check whether there is a transaction 
before incrementing _messagesAcknowledgedAwaitingCommit_, which seems off. Is 
it just assuming there is one?
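
   To make both points concrete, here is a minimal sketch in which every 
counter touched by the acknowledge path is updated atomically, and the 
awaiting-commit counter only moves when the acknowledgement is transacted. 
The class name `AckMetricsSketch` and the `transacted` parameter are 
assumptions for illustration; the PR's `addAcknowledge` takes only the encode 
size.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical, reduced version of the consumer metrics; not the PR's class.
class AckMetricsSketch {

   private static final AtomicIntegerFieldUpdater<AckMetricsSketch> IN_TRANSIT_SIZE =
      AtomicIntegerFieldUpdater.newUpdater(AckMetricsSketch.class, "messagesInTransitSize");

   private static final AtomicIntegerFieldUpdater<AckMetricsSketch> ACKNOWLEDGED =
      AtomicIntegerFieldUpdater.newUpdater(AckMetricsSketch.class, "messagesAcknowledged");

   private static final AtomicIntegerFieldUpdater<AckMetricsSketch> AWAITING_COMMIT =
      AtomicIntegerFieldUpdater.newUpdater(AckMetricsSketch.class, "messagesAcknowledgedAwaitingCommit");

   private volatile int messagesInTransitSize;
   private volatile int messagesAcknowledged;
   private volatile int messagesAcknowledgedAwaitingCommit;

   // Only an indicator: a plain volatile write is enough, last writer wins.
   private volatile long lastAcknowledgedTime;

   void addMessage(int encodeSize) {
      IN_TRANSIT_SIZE.addAndGet(this, encodeSize);
   }

   // 'transacted' is a hypothetical parameter telling us whether a transaction is in play.
   void addAcknowledge(int encodeSize, boolean transacted) {
      IN_TRANSIT_SIZE.addAndGet(this, -encodeSize);
      ACKNOWLEDGED.incrementAndGet(this);
      if (transacted) {
         // Nothing is awaiting a commit when there is no transaction.
         AWAITING_COMMIT.incrementAndGet(this);
      }
      lastAcknowledgedTime = System.currentTimeMillis();
   }

   int getMessagesInTransitSize() {
      return messagesInTransitSize;
   }

   int getMessagesAcknowledged() {
      return messagesAcknowledged;
   }

   int getMessagesAcknowledgedAwaitingCommit() {
      return messagesAcknowledgedAwaitingCommit;
   }

   long getLastAcknowledgedTime() {
      return lastAcknowledgedTime;
   }
}
```

   Each counter is individually atomic here; the group of updates is still 
not a single atomic step, so a concurrent reader may briefly see one counter 
ahead of another.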
   
   



##########
artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js:
##########
@@ -147,7 +150,10 @@ var Artemis;
             { header: 'Validated User', name: 'validatedUser'},
             { header: 'Address', itemField: 'addressName' , htmlTemplate: 'producers-anchor-column-template', colActionFn: (item) => selectAddress(item.idx) },
             { header: 'Remote Address', itemField: 'remoteAddress' },
-            { header: 'Local Address', itemField: 'localAddress' }
+            { header: 'Local Address', itemField: 'localAddress' },
+            { header: 'Messages Sent', itemField: 'msgSent'},
+            { header: 'Messages Sent Size', itemField: 'msgSizeSent'},
+            { header: ' Last UUID Sent', itemField: 'lastUUIDSent'}

Review Comment:
   Wayward space at the start of the header name? Plus the same comments as earlier.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2340,10 +2318,24 @@ public Pair<SimpleString, EnumSet<RoutingType>> getAddressAndRoutingTypes(Simple
    }
 
    @Override
-   public void addProducer(ServerProducer serverProducer) {
+   public void addProducer(ServerProducer serverProducer, String address) {
+      if (address == null) {
+         //this is ok as it will be an anonymous producer which we dont track
+         return;
+      }
       serverProducer.setSessionID(getName());
-      serverProducer.setConnectionID(getConnectionID().toString());
-      producers.put(serverProducer.getID(), serverProducer);
+      serverProducer.setConnectionID(getConnectionID() != null ? getConnectionID().toString() : null);
+      producers.put(address, serverProducer);

Review Comment:
   Inserting by address muddies whether it is tracking everything 
per-address or per-actual-producer. It is also mismatched with the 
removeProducer() method, which still uses the 'producer id/name' rather than 
the address to try removing things, and so will almost always fail to remove 
anything. I don't see anything removing based on address, other than the 
FQQN-specific bit below this. There is also an MQTT bit trying to remove based 
on its session name, and it's not clear that name is ever used to insert.
   
   If a producer is created for an address that was previously used (either 
anonymously or by another fixed producer), this simply overwrites the existing 
metrics for it to this point, effectively zeroing them, regardless of whether 
the previous producer is still active. It also only counts as 1 producer for 
the 'producer count' when there may still be >=2.
   
   It kind of feels like it should be tracking the real producers only 
(including anonymous ones), or else also incorporating the producer name and 
whether it's an anonymous sender, so it can track them all properly.
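
   A minimal sketch of the 'track the real producers' option, assuming each 
producer has a stable ID: the map is keyed by that ID for both insert and 
remove, and the address (null for an anonymous producer) is just data on the 
entry. All names here (`ProducerRegistrySketch`, `Entry`, `recordSend`) are 
hypothetical and not taken from ServerSessionImpl.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry; not the ServerSessionImpl code from the PR.
class ProducerRegistrySketch {

   static final class Entry {
      final String address; // null for an anonymous producer
      final AtomicLong messagesSent = new AtomicLong();

      Entry(String address) {
         this.address = address;
      }
   }

   // Keyed by producer ID, so addProducer and removeProducer agree on the key.
   private final Map<String, Entry> producers = new ConcurrentHashMap<>();

   void addProducer(String producerId, String address) {
      // putIfAbsent leaves an existing entry, and its counters, untouched.
      producers.putIfAbsent(producerId, new Entry(address));
   }

   void removeProducer(String producerId) {
      producers.remove(producerId);
   }

   void recordSend(String producerId) {
      Entry entry = producers.get(producerId);
      if (entry != null) {
         entry.messagesSent.incrementAndGet();
      }
   }

   long getMessagesSent(String producerId) {
      Entry entry = producers.get(producerId);
      return entry == null ? 0 : entry.messagesSent.get();
   }

   int getProducerCount() {
      return producers.size();
   }
}
```

   With this shape, a second producer on the same address gets its own entry 
rather than replacing the first, and the producer count reflects both.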



##########
artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js:
##########
@@ -79,7 +79,10 @@ var Artemis;
                 {name: "Validated User", visible: false},
                 {name: "Address", visible: true},
                 {name: "Remote Address", visible: true},
-                {name: "Local Address", visible: true}
+                {name: "Local Address", visible: true},
+                {name: "Messages Sent", visible: false},
+                {name: "Messages Sent Size", visible: false},
+                {name: "Last UUID Sent", visible: false}

Review Comment:
   The value in here isn't necessarily a UUID, and it's not so clear that this 
refers to what most things call the Message ID. Per my other comment, something 
clearer like Last Produced Message ID, or the prior Last Sent Message ID name 
the session used, would seem nicer. Even if the underlying field were still 
named lastUUIDSent, we can present it in a more consistently understandable way.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2340,10 +2318,24 @@ public Pair<SimpleString, EnumSet<RoutingType>> getAddressAndRoutingTypes(Simple
    }
 
    @Override
-   public void addProducer(ServerProducer serverProducer) {
+   public void addProducer(ServerProducer serverProducer, String address) {
+      if (address == null) {
+         //this is ok as it will be an anonymous producer which we dont track
+         return;
+      }
       serverProducer.setSessionID(getName());
-      serverProducer.setConnectionID(getConnectionID().toString());
-      producers.put(serverProducer.getID(), serverProducer);
+      serverProducer.setConnectionID(getConnectionID() != null ? getConnectionID().toString() : null);
+      producers.put(address, serverProducer);
+      if (CompositeAddress.isFullyQualified(address)) {
+         PostQueueDeletionCallback postQueueDeletionCallback = new PostQueueDeletionCallback() {
+            @Override
+            public void callback(SimpleString address, SimpleString queueName) throws Exception {
+               producers.remove(CompositeAddress.toFullyQualified(address, queueName).toString());
+               server.unregisterPostQueueDeletionCallback(this);

Review Comment:
   If the FQQN-related queue isn't deleted, isn't this callback registration 
going to retain the session object and anything it refers to forever? Nothing 
else looks to hold a reference to the callback in order to unregister it, and 
it's not clear when it would do so even if it did.
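
   One way to avoid that retention, sketched under heavy assumptions: the 
session remembers every callback it registers and unregisters whatever is 
left when it closes. The types below (`ServerSketch`, `SessionSketch`, 
`DeletionCallback`) are simplified, hypothetical stand-ins for the server, 
ServerSessionImpl and PostQueueDeletionCallback, and the "::" separator is 
assumed for building the FQQN. The sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins only; none of these types are the real Artemis ones.
class CallbackLifecycleSketch {

   interface DeletionCallback {
      void callback(String address, String queueName);
   }

   static final class ServerSketch {
      private final Set<DeletionCallback> callbacks = ConcurrentHashMap.newKeySet();

      void register(DeletionCallback cb) {
         callbacks.add(cb);
      }

      void unregister(DeletionCallback cb) {
         callbacks.remove(cb);
      }

      int registeredCount() {
         return callbacks.size();
      }

      // Notifies every callback; a callback may unregister itself while we iterate.
      void queueDeleted(String address, String queueName) {
         for (DeletionCallback cb : callbacks) {
            cb.callback(address, queueName);
         }
      }
   }

   static final class SessionSketch {
      private final ServerSketch server;
      private final Map<String, Object> producers = new ConcurrentHashMap<>();
      // The session keeps its own references so it can unregister on close.
      private final List<DeletionCallback> registered = new ArrayList<>();

      SessionSketch(ServerSketch server) {
         this.server = server;
      }

      void addProducer(String fqqn) {
         producers.put(fqqn, new Object());
         DeletionCallback cb = new DeletionCallback() {
            @Override
            public void callback(String address, String queueName) {
               // Only react to the queue this callback was registered for.
               if (fqqn.equals(address + "::" + queueName)) {
                  producers.remove(fqqn);
                  server.unregister(this);
                  registered.remove(this);
               }
            }
         };
         registered.add(cb);
         server.register(cb);
      }

      // Drops every callback still registered, so the server no longer references the session.
      void close() {
         for (DeletionCallback cb : registered) {
            server.unregister(cb);
         }
         registered.clear();
         producers.clear();
      }
   }
}
```

   Whether session close is the right point to unregister in the real code is 
an open question; the sketch only shows that something has to own the callback 
reference for cleanup to be possible.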


