gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1068419406


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java:
##########
@@ -635,6 +648,27 @@ private void slowPacketHandler(final Packet packet) {
                   }
                   break;
                }
+               case CREATE_PRODUCER: {
+                  CreateProducerMessage message = (CreateProducerMessage) packet;
+                  // this is used to create/destroy the producer so needs to be unique
+                  String senderName = null;
+                  if (!producers.containsKey(message.getId())) {
+                     senderName = PRODUCER_ID_PREFIX + UUIDGenerator.getInstance().generateUUID();
+                     producers.put(message.getId(), senderName);
+                  }
+                  session.addProducer(senderName, ActiveMQClient.DEFAULT_CORE_PROTOCOL, message.getAddress() != null ? message.getAddress().toString() : null);

Review Comment:
   This would still call addProducer even if an id were reused, but with a 
null senderName. It should probably skip the call in that case, or log or throw?
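
   A minimal sketch of the "throw" option, to illustrate the shape of the guard. 
ProducerRegistry, register, the int id type and the "producer-" prefix value are 
hypothetical stand-ins, and java.util.UUID replaces UUIDGenerator so the snippet 
is self-contained; it is not the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the producer bookkeeping in ServerSessionPacketHandler.
class ProducerRegistry {

   // Assumed value; the PR only shows the constant's name.
   private static final String PRODUCER_ID_PREFIX = "producer-";

   private final Map<Integer, String> producers = new HashMap<>();

   /**
    * Registers a new producer id and returns its generated sender name.
    * Throws if the id is already registered, so a caller can never go on
    * to call addProducer with a null sender name.
    */
   String register(int id) {
      if (producers.containsKey(id)) {
         throw new IllegalStateException("Producer id already in use: " + id);
      }
      String senderName = PRODUCER_ID_PREFIX + UUID.randomUUID();
      producers.put(id, senderName);
      return senderName;
   }

   public static void main(String[] args) {
      ProducerRegistry registry = new ProducerRegistry();
      System.out.println(registry.register(1));
      try {
         registry.register(1); // reused id
      } catch (IllegalStateException e) {
         System.out.println("rejected: " + e.getMessage());
      }
   }
}
```

   With this shape, addProducer would only be reached when register returns 
normally. Logging and breaking out of the case would work just as well if a 
reused id should not fail the packet.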



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java:
##########
@@ -635,6 +648,27 @@ private void slowPacketHandler(final Packet packet) {
                   }
                   break;
                }
+               case CREATE_PRODUCER: {
+                  CreateProducerMessage message = (CreateProducerMessage) packet;
+                  // this is used to create/destroy the producer so needs to be unique
+                  String senderName = null;
+                  if (!producers.containsKey(message.getId())) {
+                     senderName = PRODUCER_ID_PREFIX + UUIDGenerator.getInstance().generateUUID();
+                     producers.put(message.getId(), senderName);
+                  }
+                  session.addProducer(senderName, ActiveMQClient.DEFAULT_CORE_PROTOCOL, message.getAddress() != null ? message.getAddress().toString() : null);
+                  break;
+               }
+               case REMOVE_PRODUCER: {
+                  RemoveProducerMessage message = (RemoveProducerMessage) packet;
+                  String remove = producers.remove(message.getId());
+                  if (remove != null) {
+                     session.removeProducer(remove);
+                  } else {
+                     ActiveMQServerLogger.LOGGER.producerDoesNotExist(session.getName());

Review Comment:
   This should probably say which producer (id) didn't exist, to give some 
detail beyond the session. Can you tell which connection it is from the session 
name? If not, then perhaps include that as well, to aid tracking down the 
problem client.



##########
artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java:
##########
@@ -55,11 +55,14 @@
 import java.lang.invoke.MethodHandles;
 
 import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
+import static org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
 
 public class StompSession implements SessionCallback {
 
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   private static final String senderName = UUIDGenerator.getInstance().generateUUID().toString();

Review Comment:
   This shouldn't be static, or they'll all have the same name.
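
   To illustrate the difference, here is a small standalone sketch. The class 
names are hypothetical, and java.util.UUID stands in for UUIDGenerator so that 
it runs on its own.

```java
import java.util.UUID;

class StaticNameDemo {

   // A static field is initialised once per class, so every instance shares it.
   static class SharedName {
      private static final String senderName = UUID.randomUUID().toString();

      String getSenderName() {
         return senderName;
      }
   }

   // An instance field is initialised once per object, so each instance gets its own.
   static class PerInstanceName {
      private final String senderName = UUID.randomUUID().toString();

      String getSenderName() {
         return senderName;
      }
   }

   public static void main(String[] args) {
      // Same name for both instances.
      System.out.println(new SharedName().getSenderName().equals(new SharedName().getSenderName()));
      // Different names, barring a random UUID collision.
      System.out.println(new PerInstanceName().getSenderName().equals(new PerInstanceName().getSenderName()));
   }
}
```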



##########
artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java:
##########
@@ -109,6 +112,7 @@ public boolean isWritable(ReadyListener callback, Object protocolContext) {
 
    void setServerSession(ServerSession session) {
       this.session = session;
+      session.addProducer(senderName, STOMP_PROTOCOL_NAME, null);

Review Comment:
   Should this pass ANONYMOUS explicitly, like some of the others do?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,139 @@ public String getConnectionRemoteAddress() {
       return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
    }
 
+   @Override
+   public int getMessagesInTransitSize() {
+      return metrics.getMessagesInTransitSize();
+   }
+
+   @Override
+   public int getMessagesInTransit() {
+      return deliveringRefs.size();
+   }
+
+   @Override
+   public long getLastDeliveredTime() {
+      return metrics.getLastDeliveredTime();
+   }
+
+   @Override
+   public long getLastAcknowledgedTime() {
+      return metrics.getLastAcknowledgedTime();
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      return metrics.getMessagesAcknowledged();
+   }
+
+   @Override
+   public int getMessagesDeliveredSize() {
+      return metrics.getMessagesDeliveredSize();
+   }
+
+   @Override
+   public int getMessagesDelivered() {
+      return metrics.getMessagesDelivered();
+   }
+
+   @Override
+   public int getMessagesAcknowledgedAwaitingCommit() {
+      return metrics.getMessagesAcknowledgedAwaitingCommit();
+   }
+
    public SessionCallback getCallback() {
       return callback;
    }
+
+   static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+      /**
+       * Since messages can be delivered (incremented) and acknowledged (decremented) at the same time we have to protect
+       * the encode size and make it atomic. The other fields are ok since they are only accessed from a single thread.
+       */
+      private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> messagesInTransitSizeUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesInTransitSize");
+
+      private volatile int messagesInTransitSize = 0;
+
+      private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> messagesAcknowledgedAwaitingCommitUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledgedAwaitingCommit");
+
+      private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+      private int messagesDeliveredSize = 0;
+
+      private volatile long lastDeliveredTime = 0;
+
+      private volatile long lastAcknowledgedTime = 0;
+
+      private int messagesDelivered = 0;

Review Comment:
   If messagesDelivered and messagesDeliveredSize are primarily being read from 
threads other than the ones updating them, and the readers aren't synchronizing 
on any of the same things used when updating the values, should they be 
volatile as well?
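
   A rough sketch of what that could look like, assuming a single writer 
thread. DeliveryMetrics and addMessage are hypothetical names rather than the 
PR's ServerConsumerMetrics API. Note that volatile only gives visibility here: 
the read-modify-write in addMessage is not atomic, so it stays safe only while 
one thread does all the updating.

```java
class DeliveryMetrics {

   // volatile: writes made by the delivering thread become visible to other
   // threads (for example a management thread) without extra synchronization.
   private volatile int messagesDelivered;

   private volatile int messagesDeliveredSize;

   // Must only be called from the single delivering thread; the increments
   // are plain read-modify-write operations, not atomic ones.
   void addMessage(int encodeSize) {
      messagesDelivered = messagesDelivered + 1;
      messagesDeliveredSize = messagesDeliveredSize + encodeSize;
   }

   // Safe to call from any thread.
   int getMessagesDelivered() {
      return messagesDelivered;
   }

   // Safe to call from any thread.
   int getMessagesDeliveredSize() {
      return messagesDeliveredSize;
   }

   public static void main(String[] args) throws InterruptedException {
      DeliveryMetrics metrics = new DeliveryMetrics();
      Thread writer = new Thread(() -> {
         for (int i = 0; i < 1000; i++) {
            metrics.addMessage(10);
         }
      });
      writer.start();
      writer.join();
      // prints "1000 10000"
      System.out.println(metrics.getMessagesDelivered() + " " + metrics.getMessagesDeliveredSize());
   }
}
```

   If more than one thread could ever update these, the field updater approach 
already used for messagesInTransitSize would be the safer choice.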



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
