This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 54f5dae458 ARTEMIS-4575 Only start the consumers that were added
54f5dae458 is described below

commit 54f5dae458ab7bc1ce554e8c5b74c8fa30401833
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Jan 19 15:37:15 2024 -0500

    ARTEMIS-4575 Only start the consumers that were added
    
    Instead of forcing a session start cycle on each consumer add
    event, start only those consumers that were added, which will
    trigger a prompt delivery action on each. The session should be
    marked started on creation to account for the removal of the
    start on each consumer add event.
---
 .../artemis/core/protocol/openwire/OpenWireConnection.java        | 3 ++-
 .../activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java  | 8 ++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 670eca258d..eb51aad727 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -982,7 +982,7 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             return;
          }
 
-         amqSession.start();
+         consumersList.forEach((c) -> c.start());
 
          if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             //advisory for temp destinations
@@ -1122,6 +1122,7 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    public AMQSession addSession(SessionInfo ss) {
       AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, 
this, protocolManager, coreMessageObjectPools);
       amqSession.initialize();
+      amqSession.start();
 
       sessions.put(ss.getSessionId(), amqSession);
       sessionIdMap.put(amqSession.getCoreSession().getName(), 
ss.getSessionId());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 38a249e04c..03c99b74ff 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -137,6 +137,14 @@ public class AMQConsumer {
       return filterString;
    }
 
+   public void start() {
+      if (serverConsumer == null) {
+         throw new IllegalStateException("Cannot start the AMQConsumer until 
it has been initialized");
+      }
+
+      serverConsumer.setStarted(true);
+   }
+
    public void init(SlowConsumerDetectionListener 
slowConsumerDetectionListener, long nativeId) throws Exception {
 
       SimpleString selector = info.getSelector() == null ? null : new 
SimpleString(convertOpenWireToActiveMQFilterString(info.getSelector()));

Reply via email to