gemmellr commented on code in PR #4183:
URL: https://github.com/apache/activemq-artemis/pull/4183#discussion_r1067904831
##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java:
##########
@@ -2026,10 +2028,20 @@ private ClientProducer internalCreateProducer(final
SimpleString address,
final int maxRate) throws
ActiveMQException {
checkClosed();
- ClientProducerInternal producer = new ClientProducerImpl(this, address,
maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
autoCommitSends && blockOnNonDurableSend, autoCommitSends &&
blockOnDurableSend, autoGroup, groupID == null ? null : new
SimpleString(groupID), minLargeMessageSize, sessionContext);
+ ClientProducerInternal producer = new ClientProducerImpl(this,
+ address,
+ maxRate
== -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
+ autoCommitSends &&
blockOnNonDurableSend,
+ autoCommitSends &&
blockOnDurableSend,
+
autoGroup, groupID == null ? null : new SimpleString(groupID),
+
minLargeMessageSize,
+
sessionContext,
+
producerIDs.getAndIncrement());
Review Comment:
Purely personal choice/style, I'd usually go with incrementAndGet so that
the variable effectively always contains the 'current/last value for
count/index/whatever-it-is' rather than the next value thats currently
unused...unless e.g 'next value' is the info needed for comparison.
##########
artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java:
##########
@@ -379,7 +379,7 @@ public void setNoLocal(boolean noLocal) {
}
public void sendInternal(Message message, boolean direct) throws Exception {
- session.send(message, direct);
+ session.send(message, direct, null);
Review Comment:
Should this be adding a name (and similarly in the other send call) ?
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -72,6 +74,8 @@ public class MQTTPublishManager {
private SimpleString managementAddress;
+ private static final String senderName =
UUIDGenerator.getInstance().generateUUID().toString();
Review Comment:
Shouldnt be static, so each 'publish manager' is considered a different
sender.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,23 +18,50 @@
import org.apache.activemq.artemis.core.server.ServerProducer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
public class ServerProducerImpl implements ServerProducer {
- private final String ID;
+
+ private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
+
+ private final long ID;
+ private final String name;
private final String protocol;
private final long creationTime;
+ private volatile long messagesSent = 0;
+ private volatile long messagesSentSize = 0;
+
+ private static final AtomicLongFieldUpdater<ServerProducerImpl>
messagesSentUpdater =
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
+ private static final AtomicLongFieldUpdater<ServerProducerImpl>
messagesSentSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
private final String address;
+
+ private Object lastProducedMessageID;
+
private String sessionID;
+
private String connectionID;
- public ServerProducerImpl(String ID, String protocol, String address) {
- this.ID = ID;
+ public ServerProducerImpl(String name, String protocol, String address) {
+ this.ID = PRODUCER_ID_GENERATOR.getAndIncrement();
Review Comment:
Same as earlier incrementAndGet() comment.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,137 @@ public String getConnectionRemoteAddress() {
return
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
+ @Override
+ public int getMessagesInTransitSize() {
+ return metrics.getMessagesInTransitSize();
+ }
+
+ @Override
+ public int getMessagesInTransit() {
+ return deliveringRefs.size();
+ }
+
+ @Override
+ public long getLastDeliveredTime() {
+ return metrics.getLastDeliveredTime();
+ }
+
+ @Override
+ public long getLastAcknowledgedTime() {
+ return metrics.getLastAcknowledgedTime();
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return metrics.getMessagesAcknowledged();
+ }
+
+ @Override
+ public int getMessagesDeliveredSize() {
+ return metrics.getMessagesDeliveredSize();
+ }
+
+ @Override
+ public int getMessagesDelivered() {
+ return metrics.getMessagesDelivered();
+ }
+
+ @Override
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return metrics.getMessagesAcknowledgedAwaitingCommit();
+ }
+
public SessionCallback getCallback() {
return callback;
}
+
+ static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+ /**
+ * Since messages can be delivered (incremented) and acknowledged
(decremented) at the same time we have to protect
+ * the encode size and make it atomic. The other fields are ok since
they are only accessed from a single thread.
+ */
+ private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesInTransitSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesInTransitSize");
+
+ private volatile int messagesInTransitSize = 0;
+
+ private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesAcknowledgedAwaitingCommitUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesAcknowledgedAwaitingCommit");
+
+ private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+ private int messagesDeliveredSize = 0;
+
+ private long lastDeliveredTime = 0;
+
+ private long lastAcknowledgedTime = 0;
+
+ private int messagesDelivered = 0;
+
+ private int messagesAcknowledged = 0;
Review Comment:
Probably ought to be volatile as well, even if their writes are safe under
synchronization the reader will typically be a different thread that may not
synchronize those.
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerProducerImpl.java:
##########
@@ -18,23 +18,50 @@
import org.apache.activemq.artemis.core.server.ServerProducer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
public class ServerProducerImpl implements ServerProducer {
- private final String ID;
+
+ private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
+
+ private final long ID;
+ private final String name;
private final String protocol;
private final long creationTime;
+ private volatile long messagesSent = 0;
+ private volatile long messagesSentSize = 0;
+
+ private static final AtomicLongFieldUpdater<ServerProducerImpl>
messagesSentUpdater =
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
+ private static final AtomicLongFieldUpdater<ServerProducerImpl>
messagesSentSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
private final String address;
+
+ private Object lastProducedMessageID;
Review Comment:
Probably ought to be volatile as well as it will be the session etc thread
setting it but the management bits reading it with no clear memory barrier to
prod visibility update.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -139,7 +139,6 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
public static final int MAX_MESSAGE_PRIORITY = 9;
protected static final int VALUE_NOT_PRESENT = -1;
-
Review Comment:
Can drop this leftover non-change, its better as it was and would avoid the
commit touching the file.
##########
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java:
##########
@@ -92,6 +96,7 @@ public MQTTPublishManager(MQTTSession session, boolean
closeMqttConnectionOnPubl
synchronized void start() {
this.state = session.getState();
this.outboundStore = state.getOutboundStore();
+ session.getServerSession().addProducer(senderName,
MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
Review Comment:
Seems like this will happen for every session at creation, even if it never
sends anything...could this be deferred at all, until something is sent?
Also, stop() below has a mis-aligned remove call a few lines below which
seems off:
```
if (serversession != null) {
serversession.removeProducer(serversession.getName());
}
```
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java:
##########
@@ -635,6 +647,20 @@ private void slowPacketHandler(final Packet packet) {
}
break;
}
+ case CREATE_PRODUCER: {
+ CreateProducerMessage message = (CreateProducerMessage)
packet;
+ // this is used to create/destroy the producer so needs to
be unique
+ String senderName = PRODUCER_ID_PREFIX +
UUIDGenerator.getInstance().generateUUID();
+ producers.put(message.getId(), senderName);
Review Comment:
Worth a check it doesnt already exist and barf if it does? (and also a
check that it does on removal and maybe log?)
##########
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V3.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage<br>
+ */
+public class SessionSendContinuationMessage_V3 extends
SessionSendContinuationMessage_V2 {
+
+ private int senderID;
+
+ public SessionSendContinuationMessage_V3() {
+ super();
+ }
+
+ /**
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionSendContinuationMessage_V3(final Message message,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse,
+ final long messageBodySize,
+ final int senderID,
+ SendAcknowledgementHandler
handler) {
Review Comment:
Handler should be final too if all the others are.
Might as well delete the javadoc since it doesnt detail anything, and doesnt
cover more than half the args.
(Probably same about where copied from..)
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java:
##########
@@ -1513,7 +1524,137 @@ public String getConnectionRemoteAddress() {
return
this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
+ @Override
+ public int getMessagesInTransitSize() {
+ return metrics.getMessagesInTransitSize();
+ }
+
+ @Override
+ public int getMessagesInTransit() {
+ return deliveringRefs.size();
+ }
+
+ @Override
+ public long getLastDeliveredTime() {
+ return metrics.getLastDeliveredTime();
+ }
+
+ @Override
+ public long getLastAcknowledgedTime() {
+ return metrics.getLastAcknowledgedTime();
+ }
+
+ @Override
+ public long getMessagesAcknowledged() {
+ return metrics.getMessagesAcknowledged();
+ }
+
+ @Override
+ public int getMessagesDeliveredSize() {
+ return metrics.getMessagesDeliveredSize();
+ }
+
+ @Override
+ public int getMessagesDelivered() {
+ return metrics.getMessagesDelivered();
+ }
+
+ @Override
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return metrics.getMessagesAcknowledgedAwaitingCommit();
+ }
+
public SessionCallback getCallback() {
return callback;
}
+
+ static class ServerConsumerMetrics extends TransactionOperationAbstract {
+
+ /**
+ * Since messages can be delivered (incremented) and acknowledged
(decremented) at the same time we have to protect
+ * the encode size and make it atomic. The other fields are ok since
they are only accessed from a single thread.
+ */
+ private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesInTransitSizeUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesInTransitSize");
+
+ private volatile int messagesInTransitSize = 0;
+
+ private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics>
messagesAcknowledgedAwaitingCommitUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class,
"messagesAcknowledgedAwaitingCommit");
+
+ private volatile int messagesAcknowledgedAwaitingCommit = 0;
+
+ private int messagesDeliveredSize = 0;
+
+ private long lastDeliveredTime = 0;
+
+ private long lastAcknowledgedTime = 0;
+
+ private int messagesDelivered = 0;
+
+ private int messagesAcknowledged = 0;
+
+
+ public int getMessagesInTransitSize() {
+ return messagesInTransitSizeUpdater.get(this);
+ }
+
+ public int getMessagesDeliveredSize() {
+ return messagesDeliveredSize;
+ }
+
+ public long getLastDeliveredTime() {
+ return lastDeliveredTime;
+ }
+
+ public long getLastAcknowledgedTime() {
+ return lastAcknowledgedTime;
+ }
+
+ public int getMessagesDelivered() {
+ return messagesDelivered;
+ }
+
+ public int getMessagesAcknowledged() {
+ return messagesAcknowledged;
+ }
+
+ public int getMessagesAcknowledgedAwaitingCommit() {
+ return messagesAcknowledgedAwaitingCommitUpdater.get(this);
+ }
+
+ public void addMessage(int encodeSize) {
+ messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
+ messagesDeliveredSize += encodeSize;
+ messagesDelivered++;
+ lastDeliveredTime = System.currentTimeMillis();
+ }
+
+ public void addAcknowledge(int encodeSize, Transaction tx) {
+ messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
+ messagesAcknowledged++;
Review Comment:
I think this one is unsafe also. Looks like rollback synchronizes 'lock' but
not the consumer itself as some other calls of this method from other threads
do. The openwire bit using it doesnt synchronize on either.
Given the large number of places this method is used from, some on the queue
thread, others not, and the mishmash of different synchronization by those
calls...I think it would be simpler/safer to just use an updater for this
regardless whether it is currently safe, it is certainly brittle and its likely
someone change other bits in a way that breaks this area later if it isnt
already (which it looks like it is).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]