gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1025000974
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,130 @@ public String getConnectionRemoteAddress() {
return
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
+ @Override
+ public int getMessagesInTransitSize() {
+ return metrics.getMessagesInTransitSize();
+ }
+
+
+ @Override
+ public int getMessagesInTransit() {
+ return deliveringRefs.size();
+ }
+
+ @Override
+ public long getLastDeliveredTime() {
+ return metrics.getLastDeliveredTime();
+ }
+
+ @Override
+ public long getLastAcknowledgedTime() {
+ return metrics.getLastAcknowledgedTime();
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return metrics.getMessagesAcknowledged();
+ }
+
+ @Override
+ public int getMessagesDeliveredSize() {
+ return metrics.getMessagesDeliveredSize();
+ }
+
+ @Override
+ public int getMessagesDelivered() {
+ return metrics.getMessagesDelivered();
+ }
+
+ @Override
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return metrics.getMessagesAcknowledgedAwaitingCommit();
+ }
+
public SessionCallback getCallback() {
return callback;
}
+
+ static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+ /**
+ * Since messages can be delivered (incremented) and acknowledged
(decremented) at the same time we have to protect
+ * the encode size and make it atomic. The other fields are ok since
they are only accessed from a single thread.
+ */
+ AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesInTransitSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesInTransitSize");
+ private volatile int messagesInTransitSize = 0;
+
+ private int messagesDeliveredSize = 0;
+
+ private long lastDeliveredTime = 0;
+
+ private long lastAcknowledgedTime = 0;
+
+ private int messagesDelivered = 0;
+
+ private int messagesAcknowledged = 0;
+
+ private int messagesAcknowledgedAwaitingCommit = 0;
+
+ public int getMessagesInTransitSize() {
+ return messagesInTransitSize;
+ }
+
+ public int getMessagesDeliveredSize() {
+ return messagesDeliveredSize;
+ }
+
+ public long getLastDeliveredTime() {
+ return lastDeliveredTime;
+ }
+
+ public long getLastAcknowledgedTime() {
+ return lastAcknowledgedTime;
+ }
+
+ public int getMessagesDelivered() {
+ return messagesDelivered;
+ }
+
+ public int getMessagesAcknowledged() {
+ return messagesAcknowledged;
+ }
+
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return messagesAcknowledgedAwaitingCommit;
+ }
+
+ public void addMessage(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+ messagesDeliveredSize += encodeSize;
+ messagesDelivered++;
+ lastDeliveredTime = System.currentTimeMillis();
+ }
+
+ public void addAcknowledge(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+ messagesAcknowledged++;
+ messagesAcknowledgedAwaitingCommit++;
+ lastAcknowledgedTime = System.currentTimeMillis();
+ }
Review Comment:
I tried this out with a simple modified HelloWorld from Qpid JMS, sending +
receiving some messages. The Messages Acknowledged Awaiting Commit value just
increases, matching the Messages Acknowledged value, whereas I would expect it
to be 0.
The Messages In Transit Size value was also incorrect/negative, being the
negation of the Messages Delivered Size, rather than being 0 like the Messages
In Transit Size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]