gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1067904831


##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java:
##########
@@ -2026,10 +2028,20 @@ private ClientProducer internalCreateProducer(final 
SimpleString address,
                                                  final int maxRate) throws 
ActiveMQException {
       checkClosed();
 
-      ClientProducerInternal producer = new ClientProducerImpl(this, address, 
maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false), 
autoCommitSends && blockOnNonDurableSend, autoCommitSends && 
blockOnDurableSend, autoGroup, groupID == null ? null : new 
SimpleString(groupID), minLargeMessageSize, sessionContext);
+      ClientProducerInternal producer = new ClientProducerImpl(this,
+                                                                      address,
+                                                                      maxRate 
== -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
+                                                  autoCommitSends && 
blockOnNonDurableSend,
+                                                     autoCommitSends && 
blockOnDurableSend,
+                                                                      
autoGroup, groupID == null ? null : new SimpleString(groupID),
+                                                                      
minLargeMessageSize,
+                                                                      
sessionContext,
+                                                                      
producerIDs.getAndIncrement());

Review Comment:
   Purely personal choice/style, I'd usually go with incrementAndGet so that 
the variable effectively always contains the 'current/last value for 
count/index/whatever-it-is' rather than the next value thats currently 
unused...unless e.g 'next value' is the info needed for comparison.



##########
artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java:
##########
@@ -379,7 +379,7 @@ public void setNoLocal(boolean noLocal) {
    }
 
    public void sendInternal(Message message, boolean direct) throws Exception {
-      session.send(message, direct);
+      session.send(message, direct, null);

Review Comment:
   Should this be adding a name (and similarly in the other send call) ?



##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -72,6 +74,8 @@ public class MQTTPublishManager {
 
    private SimpleString managementAddress;
 
+   private static final String senderName = 
UUIDGenerator.getInstance().generateUUID().toString();

Review Comment:
   Shouldnt be static, so each 'publish manager' is considered a different 
sender.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,23 +18,50 @@
 
 import org.apache.activemq.artemis.core.server.ServerProducer;
 
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
 public class ServerProducerImpl implements ServerProducer {
-   private final String ID;
+
+   private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
+
+   private final long ID;
+   private final String name;
    private final String protocol;
    private final long creationTime;
+   private volatile long messagesSent = 0;
+   private volatile long messagesSentSize = 0;
+
+   private static final AtomicLongFieldUpdater<ServerProducerImpl> 
messagesSentUpdater = 
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
 
+   private static final AtomicLongFieldUpdater<ServerProducerImpl> 
messagesSentSizeUpdater = 
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
 
    private final String address;
+
+   private Object lastProducedMessageID;
+
    private String sessionID;
+
    private String connectionID;
 
-   public ServerProducerImpl(String ID, String protocol, String address) {
-      this.ID = ID;
+   public ServerProducerImpl(String name, String protocol, String address) {
+      this.ID = PRODUCER_ID_GENERATOR.getAndIncrement();

Review Comment:
   Same as earlier incrementAndGet() comment.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,137 @@ public String getConnectionRemoteAddress() {
       return 
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
    }
 
+   @Override
+   public int getMessagesInTransitSize() {
+      return metrics.getMessagesInTransitSize();
+   }
+
+   @Override
+   public int getMessagesInTransit() {
+      return deliveringRefs.size();
+   }
+
+   @Override
+   public long getLastDeliveredTime() {
+      return metrics.getLastDeliveredTime();
+   }
+
+   @Override
+   public long getLastAcknowledgedTime() {
+      return metrics.getLastAcknowledgedTime();
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      return metrics.getMessagesAcknowledged();
+   }
+
+   @Override
+   public int getMessagesDeliveredSize() {
+      return metrics.getMessagesDeliveredSize();
+   }
+
+   @Override
+   public int getMessagesDelivered() {
+      return metrics.getMessagesDelivered();
+   }
+
+   @Override
+   public int getMessagesAcknowledgedAwaitingCommit() {
+      return metrics.getMessagesAcknowledgedAwaitingCommit();
+   }
+
    public SessionCallback getCallback() {
       return callback;
    }
+
+   static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+      /**
+       * Since messages can be delivered (incremented) and acknowledged 
(decremented) at the same time we have to protect
+       * the encode size and make it atomic. The other fields are ok since 
they are only accessed from a single thread.
+       */
+      private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> 
messagesInTransitSizeUpdater = 
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, 
"messagesInTransitSize");
+
+      private volatile int messagesInTransitSize = 0;
+
+      private static final    AtomicIntegerFieldUpdater<ServerConsumerMetrics> 
messagesAcknowledgedAwaitingCommitUpdater = 
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, 
"messagesAcknowledgedAwaitingCommit");
+
+      private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+      private int messagesDeliveredSize = 0;
+
+      private long lastDeliveredTime = 0;
+
+      private long lastAcknowledgedTime = 0;
+
+      private int messagesDelivered = 0;
+
+      private int messagesAcknowledged = 0;

Review Comment:
   Probably ought to be volatile as well, even if their writes are safe under 
synchronization the reader will typically be a different thread that may not 
synchronize those.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,23 +18,50 @@
 
 import org.apache.activemq.artemis.core.server.ServerProducer;
 
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
 public class ServerProducerImpl implements ServerProducer {
-   private final String ID;
+
+   private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
+
+   private final long ID;
+   private final String name;
    private final String protocol;
    private final long creationTime;
+   private volatile long messagesSent = 0;
+   private volatile long messagesSentSize = 0;
+
+   private static final AtomicLongFieldUpdater<ServerProducerImpl> 
messagesSentUpdater = 
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
 
+   private static final AtomicLongFieldUpdater<ServerProducerImpl> 
messagesSentSizeUpdater = 
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
 
    private final String address;
+
+   private Object lastProducedMessageID;

Review Comment:
   Probably ought to be volatile as well as it will be the session etc thread 
setting it but the management bits reading it with no clear memory barrier to 
prod visibility update.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -139,7 +139,6 @@ public abstract class AMQPMessage extends RefCountMessage 
implements org.apache.
    public static final int MAX_MESSAGE_PRIORITY = 9;
 
    protected static final int VALUE_NOT_PRESENT = -1;
-

Review Comment:
   Can drop this leftover non-change, its better as it was and would avoid the 
commit touching the file.



##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -92,6 +96,7 @@ public MQTTPublishManager(MQTTSession session, boolean 
closeMqttConnectionOnPubl
    synchronized void start() {
       this.state = session.getState();
       this.outboundStore = state.getOutboundStore();
+      session.getServerSession().addProducer(senderName, 
MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);

Review Comment:
   Seems like this will happen for every session at creation, even if it never 
sends anything...could this be deferred at all, until something is sent?
   
   Also, stop() below has a mis-aligned remove call a few lines below which 
seems off:
   ```
         if (serversession != null) {
            serversession.removeProducer(serversession.getName());
         }
   ```



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java:
##########
@@ -635,6 +647,20 @@ private void slowPacketHandler(final Packet packet) {
                   }
                   break;
                }
+               case CREATE_PRODUCER: {
+                  CreateProducerMessage message = (CreateProducerMessage) 
packet;
+                  // this is used to create/destroy the producer so needs to 
be unique
+                  String senderName = PRODUCER_ID_PREFIX + 
UUIDGenerator.getInstance().generateUUID();
+                  producers.put(message.getId(), senderName);

Review Comment:
   Worth a check it doesnt already exist and barf  if it does?  (and also a 
check that it does on removal and maybe log?)



##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V3.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage<br>
+ */
+public class SessionSendContinuationMessage_V3 extends 
SessionSendContinuationMessage_V2 {
+
+   private int senderID;
+
+   public SessionSendContinuationMessage_V3() {
+      super();
+   }
+
+   /**
+    * @param body
+    * @param continues
+    * @param requiresResponse
+    */
+   public SessionSendContinuationMessage_V3(final Message message,
+                                            final byte[] body,
+                                            final boolean continues,
+                                            final boolean requiresResponse,
+                                            final long messageBodySize,
+                                            final int senderID,
+                                            SendAcknowledgementHandler 
handler) {

Review Comment:
   Handler should be final too if all the others are.
   
   Might as well delete the javadoc since it doesnt detail anything, and doesnt 
cover more than half the args.
   
   (Probably same about where copied from..)



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,137 @@ public String getConnectionRemoteAddress() {
       return 
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
    }
 
+   @Override
+   public int getMessagesInTransitSize() {
+      return metrics.getMessagesInTransitSize();
+   }
+
+   @Override
+   public int getMessagesInTransit() {
+      return deliveringRefs.size();
+   }
+
+   @Override
+   public long getLastDeliveredTime() {
+      return metrics.getLastDeliveredTime();
+   }
+
+   @Override
+   public long getLastAcknowledgedTime() {
+      return metrics.getLastAcknowledgedTime();
+   }
+
+   @Override
+   public long getMessagesAcknowledged() {
+      return metrics.getMessagesAcknowledged();
+   }
+
+   @Override
+   public int getMessagesDeliveredSize() {
+      return metrics.getMessagesDeliveredSize();
+   }
+
+   @Override
+   public int getMessagesDelivered() {
+      return metrics.getMessagesDelivered();
+   }
+
+   @Override
+   public int getMessagesAcknowledgedAwaitingCommit() {
+      return metrics.getMessagesAcknowledgedAwaitingCommit();
+   }
+
    public SessionCallback getCallback() {
       return callback;
    }
+
+   static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+      /**
+       * Since messages can be delivered (incremented) and acknowledged 
(decremented) at the same time we have to protect
+       * the encode size and make it atomic. The other fields are ok since 
they are only accessed from a single thread.
+       */
+      private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> 
messagesInTransitSizeUpdater = 
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, 
"messagesInTransitSize");
+
+      private volatile int messagesInTransitSize = 0;
+
+      private static final    AtomicIntegerFieldUpdater<ServerConsumerMetrics> 
messagesAcknowledgedAwaitingCommitUpdater = 
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, 
"messagesAcknowledgedAwaitingCommit");
+
+      private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+      private int messagesDeliveredSize = 0;
+
+      private long lastDeliveredTime = 0;
+
+      private long lastAcknowledgedTime = 0;
+
+      private int messagesDelivered = 0;
+
+      private int messagesAcknowledged = 0;
+
+
+      public int getMessagesInTransitSize() {
+         return messagesInTransitSizeUpdater.get(this);
+      }
+
+      public int getMessagesDeliveredSize() {
+         return messagesDeliveredSize;
+      }
+
+      public long getLastDeliveredTime() {
+         return lastDeliveredTime;
+      }
+
+      public long getLastAcknowledgedTime() {
+         return lastAcknowledgedTime;
+      }
+
+      public int getMessagesDelivered() {
+         return messagesDelivered;
+      }
+
+      public int getMessagesAcknowledged() {
+         return messagesAcknowledged;
+      }
+
+      public int getMessagesAcknowledgedAwaitingCommit() {
+         return messagesAcknowledgedAwaitingCommitUpdater.get(this);
+      }
+
+      public void addMessage(int encodeSize) {
+         messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+         messagesDeliveredSize += encodeSize;
+         messagesDelivered++;
+         lastDeliveredTime = System.currentTimeMillis();
+      }
+
+      public void addAcknowledge(int encodeSize, Transaction tx) {
+         messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+         messagesAcknowledged++;

Review Comment:
   I think this one is unsafe also. Looks like rollback synchronizes 'lock' but 
not the consumer itself as some other calls of this method from other threads 
do. The openwire bit using it doesnt synchronize on either.
   
   Given the large number of places this method is used from, some on the queue 
thread, others not, and the mishmash of different synchronization by those 
calls...I think it would be simpler/safer to just use an updater for this 
regardless whether it is currently safe, it is certainly brittle and its likely 
someone change other bits in a way that breaks this area later if it isnt 
already (which it looks like it is).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to