This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 54f5dae458 ARTEMIS-4575 Only start the consumers that were added
54f5dae458 is described below
commit 54f5dae458ab7bc1ce554e8c5b74c8fa30401833
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Jan 19 15:37:15 2024 -0500
ARTEMIS-4575 Only start the consumers that were added
Change from forcing a session start cycle on each consumer add
event to starting only those consumers that were added, which will
trigger a prompt delivery action on each. The session should be
marked started on create to account for the removal of the start
on each consumer add event.
---
.../artemis/core/protocol/openwire/OpenWireConnection.java | 3 ++-
.../activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java | 8 ++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 670eca258d..eb51aad727 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -982,7 +982,7 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
return;
}
- amqSession.start();
+ consumersList.forEach((c) -> c.start());
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
//advisory for temp destinations
@@ -1122,6 +1122,7 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
public AMQSession addSession(SessionInfo ss) {
AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server,
this, protocolManager, coreMessageObjectPools);
amqSession.initialize();
+ amqSession.start();
sessions.put(ss.getSessionId(), amqSession);
sessionIdMap.put(amqSession.getCoreSession().getName(),
ss.getSessionId());
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 38a249e04c..03c99b74ff 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -137,6 +137,14 @@ public class AMQConsumer {
return filterString;
}
+ public void start() {
+ if (serverConsumer == null) {
+ throw new IllegalStateException("Cannot start the AMQConsumer until
it has been initialized");
+ }
+
+ serverConsumer.setStarted(true);
+ }
+
public void init(SlowConsumerDetectionListener
slowConsumerDetectionListener, long nativeId) throws Exception {
SimpleString selector = info.getSelector() == null ? null : new
SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector()));