gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1018966363
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerProducer.java:
##########
@@ -32,4 +32,16 @@ public interface ServerProducer {
String getID();
long getCreationTime();
+
+ void setUserID(Object userID);
Review Comment:
Once again I have seen this and thought it actually meant User ID, rather
than actually being what everyone else calls Message ID.
Is the aim to show the 'last produced MessageID on this address' (since the
ServerProducer object is not actually really producer-specific or tied to a
producer entity, even where they exist)? If so could we call it something more
specific LastProducedMessageID ?
(EDIT: I now see 'getLastSentMessageID' was actually just removed from
ServerSession in the changes..thats also nicer)
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,13 +18,19 @@
import org.apache.activemq.artemis.core.server.ServerProducer;
+import java.util.concurrent.atomic.AtomicLong;
+
public class ServerProducerImpl implements ServerProducer {
private final String ID;
private final String protocol;
private final long creationTime;
+ private AtomicLong messagesSent = new AtomicLong(0);
+
+ private AtomicLong messagesSentSize = new AtomicLong(0);
Review Comment:
Although I dont actually like FieldUpdater's much, since these objects are
effectively created per-address-per-session rather than just actually
per-producer, they are probably even better candidates for use of a
FieldUpdater than the consumer metric was, as there may be a lot more of these
than actual producers,whereas on the consumer side it is 'just' 1:1 with the
number of consumers.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,130 @@ public String getConnectionRemoteAddress() {
return
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
+ @Override
+ public int getMessagesInTransitSize() {
+ return metrics.getMessagesInTransitSize();
+ }
+
+
+ @Override
+ public int getMessagesInTransit() {
+ return deliveringRefs.size();
+ }
+
+ @Override
+ public long getLastDeliveredTime() {
+ return metrics.getLastDeliveredTime();
+ }
+
+ @Override
+ public long getLastAcknowledgedTime() {
+ return metrics.getLastAcknowledgedTime();
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return metrics.getMessagesAcknowledged();
+ }
+
+ @Override
+ public int getMessagesDeliveredSize() {
+ return metrics.getMessagesDeliveredSize();
+ }
+
+ @Override
+ public int getMessagesDelivered() {
+ return metrics.getMessagesDelivered();
+ }
+
+ @Override
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return metrics.getMessagesAcknowledgedAwaitingCommit();
+ }
+
public SessionCallback getCallback() {
return callback;
}
+
+ static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+ /**
+ * Since messages can be delivered (incremented) and acknowledged
(decremented) at the same time we have to protect
+ * the encode size and make it atomic. The other fields are ok since
they are only accessed from a single thread.
+ */
+ AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesInTransitSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesInTransitSize");
+ private volatile int messagesInTransitSize = 0;
+
+ private int messagesDeliveredSize = 0;
+
+ private long lastDeliveredTime = 0;
+
+ private long lastAcknowledgedTime = 0;
+
+ private int messagesDelivered = 0;
+
+ private int messagesAcknowledged = 0;
+
+ private int messagesAcknowledgedAwaitingCommit = 0;
+
+ public int getMessagesInTransitSize() {
+ return messagesInTransitSize;
+ }
+
+ public int getMessagesDeliveredSize() {
+ return messagesDeliveredSize;
+ }
+
+ public long getLastDeliveredTime() {
+ return lastDeliveredTime;
+ }
+
+ public long getLastAcknowledgedTime() {
+ return lastAcknowledgedTime;
+ }
+
+ public int getMessagesDelivered() {
+ return messagesDelivered;
+ }
+
+ public int getMessagesAcknowledged() {
+ return messagesAcknowledged;
+ }
+
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return messagesAcknowledgedAwaitingCommit;
+ }
+
+ public void addMessage(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+ messagesDeliveredSize += encodeSize;
+ messagesDelivered++;
+ lastDeliveredTime = System.currentTimeMillis();
+ }
+
+ public void addAcknowledge(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+ messagesAcknowledged++;
+ messagesAcknowledgedAwaitingCommit++;
+ lastAcknowledgedTime = System.currentTimeMillis();
+ }
Review Comment:
the calling of this whole method didnt seem thread-safe, and if so all the
values updated in it arent safe, especially the messagesAcknowledged++; and
messagesAcknowledgedAwaitingCommit++; operations doing non-atomic arithmetic.
The lastAcknowledgedTime update not being safe probably doenst matter given its
only an indicator and will still be about if not the same even in the face of
concurrency, unlike the others that will likely be flat out incorrect.
Seperately, this doesnt seem to have any check whether there is a
transaction before incrementing the _messagesAcknowledgedAwaitingCommit_ which
seems off, is it just assuming there is one?
##########
artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js:
##########
@@ -147,7 +150,10 @@ var Artemis;
{ header: 'Validated User', name: 'validatedUser'},
{ header: 'Address', itemField: 'addressName' , htmlTemplate:
'producers-anchor-column-template', colActionFn: (item) =>
selectAddress(item.idx) },
{ header: 'Remote Address', itemField: 'remoteAddress' },
- { header: 'Local Address', itemField: 'localAddress' }
+ { header: 'Local Address', itemField: 'localAddress' },
+ { header: 'Messages Sent', itemField: 'msgSent'},
+ { header: 'Messages Sent Size', itemField: 'msgSizeSent'},
+ { header: ' Last UUID Sent', itemField: 'lastUUIDSent'}
Review Comment:
Wayward space at start of name? +same comments as earlier.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2340,10 +2318,24 @@ public Pair<SimpleString, EnumSet<RoutingType>>
getAddressAndRoutingTypes(Simple
}
@Override
- public void addProducer(ServerProducer serverProducer) {
+ public void addProducer(ServerProducer serverProducer, String address) {
+ if (address == null) {
+ //this is ok as it will be an anonymous producer which we dont track
+ return;
+ }
serverProducer.setSessionID(getName());
- serverProducer.setConnectionID(getConnectionID().toString());
- producers.put(serverProducer.getID(), serverProducer);
+ serverProducer.setConnectionID(getConnectionID() != null ?
getConnectionID().toString() : null);
+ producers.put(address, serverProducer);
Review Comment:
Inserting by address muddies whether it is tracking everything only
per-address or per-actual-producer. It is also mismatched with the
removeProducer() method still using the 'producer id/name' rather than the
address to try removing things, which will almost always fail to remove
anything. I dont see anything removing based on address, other than the
FQQN-specific bit below this. There is an MQTT bit also trying to remove based
on its session name, which its not clear is ever used to insert.
If a producer is created for the same address previously used (either
anonymously or by another fixed producer) this simply overwrites the existing
metrics for it to this point, effectively zero'ing them, regardless whether the
previous producer is still active. It also only counts as 1 producer for the
'producer count' when there may still be >=2.
Kind of feels like it should be tracking the real producers only (inc
anonymous ones), or else possibly also incorporating whether its an anonymous
sender or not and the producer name so it can track them all properly.
##########
artemis-hawtio/artemis-plugin/src/main/webapp/plugin/js/components/producers.js:
##########
@@ -79,7 +79,10 @@ var Artemis;
{name: "Validated User", visible: false},
{name: "Address", visible: true},
{name: "Remote Address", visible: true},
- {name: "Local Address", visible: true}
+ {name: "Local Address", visible: true},
+ {name: "Messages Sent", visible: false},
+ {name: "Messages Sent Size", visible: false},
+ {name: "Last UUID Sent", visible: false}
Review Comment:
The value in here isnt necessarily a UUID, and its not so clear that this is
referring to what most things call Message ID. Per other comment, something
clearer like Last Produced Message ID or the prior Last Sent Message ID name
the session used would seem nicer. Even if the underlying field were still
named lastUUIDSent, we can present it in a more consistently understandable way.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java:
##########
@@ -2340,10 +2318,24 @@ public Pair<SimpleString, EnumSet<RoutingType>>
getAddressAndRoutingTypes(Simple
}
@Override
- public void addProducer(ServerProducer serverProducer) {
+ public void addProducer(ServerProducer serverProducer, String address) {
+ if (address == null) {
+ //this is ok as it will be an anonymous producer which we dont track
+ return;
+ }
serverProducer.setSessionID(getName());
- serverProducer.setConnectionID(getConnectionID().toString());
- producers.put(serverProducer.getID(), serverProducer);
+ serverProducer.setConnectionID(getConnectionID() != null ?
getConnectionID().toString() : null);
+ producers.put(address, serverProducer);
+ if (CompositeAddress.isFullyQualified(address)) {
+ PostQueueDeletionCallback postQueueDeletionCallback = new
PostQueueDeletionCallback() {
+ @Override
+ public void callback(SimpleString address, SimpleString queueName)
throws Exception {
+ producers.remove(CompositeAddress.toFullyQualified(address,
queueName).toString());
+ server.unregisterPostQueueDeletionCallback(this);
Review Comment:
If the FQQN related queue isnt deleted, isnt this callback registration
going to retain the session object and anything it refers to forever? Nothing
else looks to have a reference to the callback to unregister it, and its not
clear when it would if it did.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]