This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new ec21009  ARTEMIS-2230 Exception closing advisory consumers when 
supportAdvisory=false
ec21009 is described below

commit ec21009edc17f3c501eab287fd62fc5218350810
Author: Howard Gao <howard....@gmail.com>
AuthorDate: Fri Jan 18 10:32:25 2019 +0800

    ARTEMIS-2230 Exception closing advisory consumers when supportAdvisory=false
    
    When broker's advisory is disabled (supportAdvisory=false) any
    advisory consumer won't get created at broker and the advisory
    consumer ID won't be stored.
    
    Legacy openwire clients can have a reference of advisory consumer
    regardless broker's settings and therefore when it closes the
    advisory consumer the broker has no reference to it.
    Therefore broker throws an exception like:
    
    javax.jms.IllegalStateException: Cannot
    remove a consumer that had not been registered
    
    If the broker stores the consumer info (even it doesn't create
    it) the exception can be avoided.
    
    (cherry picked from commit b5e7b703b5edb9126e016a897e6f4e6133632b9b)
---
 .../core/protocol/openwire/OpenWireConnection.java |   6 +-
 .../integration/openwire/DisableAdvisoryTest.java  | 102 +++++++++++++++++++++
 2 files changed, 106 insertions(+), 2 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index ced463b..320b42e 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -863,12 +863,14 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          }
 
          List<AMQConsumer> consumersList = amqSession.createConsumer(info, new 
SlowConsumerDetection());
+
+         this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, 
consumersList);
+         ss.addConsumer(info);
+
          if (consumersList.size() == 0) {
             return;
          }
 
-         this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, 
consumersList);
-         ss.addConsumer(info);
          amqSession.start();
 
          if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java
new file mode 100644
index 0000000..c7fe96e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.state.ConsumerState;
+import org.apache.activemq.state.SessionState;
+import org.junit.Test;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Collection;
+import java.util.Set;
+
+public class DisableAdvisoryTest extends BasicOpenWireTest {
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      Set<TransportConfiguration> acceptors = 
serverConfig.getAcceptorConfigurations();
+      for (TransportConfiguration tc : acceptors) {
+         if (tc.getName().equals("netty")) {
+            tc.getExtraParams().put("supportAdvisory", "false");
+            break;
+         }
+      }
+   }
+
+   @Test
+   public void testAdvisoryCosnumerRemoveWarning() throws Exception {
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = session.createQueue(queueName);
+
+      MessageProducer producer = session.createProducer(queue);
+
+      TextMessage message = session.createTextMessage("This is a text 
message");
+
+      System.out.println("Sent message: " + message.getText());
+
+      producer.send(message);
+
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+
+      connection.start();
+
+      TextMessage messageReceived = (TextMessage) 
messageConsumer.receive(5000);
+
+      System.out.println("Received message: " + messageReceived.getText());
+
+      //Openwire client will create advisory consumers to the broker.
+      //The broker is configured supportAdvisory=false and therefore doesn't
+      //actually create a server consumer. However the consumer info should be
+      //kept in the connection state so that when client closes the consumer
+      //it won't cause the broker to throw an exception because of the missing
+      //consumer info.
+      //See OpenWireConnection.CommandProcessor.processRemoveConsumer
+      ActiveMQSession owSession = (ActiveMQSession) session;
+      RemotingService remotingService = server.getRemotingService();
+      Set<RemotingConnection> conns = remotingService.getConnections();
+      assertEquals(1, conns.size());
+      OpenWireConnection owconn = (OpenWireConnection) conns.iterator().next();
+      Collection<SessionState> sstates = owconn.getState().getSessionStates();
+      //there must be 2 sessions, one is normal, the other is for advisories
+      assertEquals(2, sstates.size());
+      boolean hasAdvisoryConsumer = false;
+      for (SessionState state : sstates) {
+         Collection<ConsumerState> cstates = state.getConsumerStates();
+         for (ConsumerState cs : cstates) {
+            cs.getInfo().getDestination();
+            if 
(AdvisorySupport.isAdvisoryTopic(cs.getInfo().getDestination())) {
+               hasAdvisoryConsumer = true;
+               break;
+            }
+         }
+      }
+      assertTrue(hasAdvisoryConsumer);
+   }
+}

Reply via email to