This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.6.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/2.6.x by this push: new ec21009 ARTEMIS-2230 Exception closing advisory consumers when supportAdvisory=false ec21009 is described below commit ec21009edc17f3c501eab287fd62fc5218350810 Author: Howard Gao <howard....@gmail.com> AuthorDate: Fri Jan 18 10:32:25 2019 +0800 ARTEMIS-2230 Exception closing advisory consumers when supportAdvisory=false When the broker's advisory support is disabled (supportAdvisory=false), no advisory consumer is created at the broker and the advisory consumer ID is not stored. Legacy OpenWire clients can hold a reference to an advisory consumer regardless of the broker's settings, and therefore when such a client closes the advisory consumer the broker has no reference to it. The broker therefore throws an exception like: javax.jms.IllegalStateException: Cannot remove a consumer that had not been registered If the broker stores the consumer info (even if it doesn't create the consumer), the exception can be avoided. (cherry picked from commit b5e7b703b5edb9126e016a897e6f4e6133632b9b) --- .../core/protocol/openwire/OpenWireConnection.java | 6 +- .../integration/openwire/DisableAdvisoryTest.java | 102 +++++++++++++++++++++ 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index ced463b..320b42e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -863,12 +863,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } List<AMQConsumer> consumersList = amqSession.createConsumer(info, new 
SlowConsumerDetection()); + + this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); + ss.addConsumer(info); + if (consumersList.size() == 0) { return; } - this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList); - ss.addConsumer(info); amqSession.start(); if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java new file mode 100644 index 0000000..c7fe96e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/DisableAdvisoryTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.openwire; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; +import org.apache.activemq.artemis.core.remoting.server.RemotingService; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.state.ConsumerState; +import org.apache.activemq.state.SessionState; +import org.junit.Test; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Collection; +import java.util.Set; + +/** Checks that, with supportAdvisory=false on the netty acceptor, the broker-side connection state still holds a consumer on an advisory topic. */ public class DisableAdvisoryTest extends BasicOpenWireTest { + + /** Puts supportAdvisory=false into the extra params of the first acceptor named netty. */ @Override + protected void extraServerConfig(Configuration serverConfig) { + Set<TransportConfiguration> acceptors = serverConfig.getAcceptorConfigurations(); + for (TransportConfiguration tc : acceptors) { + if (tc.getName().equals("netty")) { + tc.getExtraParams().put("supportAdvisory", "false"); + break; + } + } + } + + /** Sends and receives one message on a queue, then asserts that the single broker-side connection has 2 session states and that one of their consumer states targets an advisory topic. */ @Test + public void testAdvisoryCosnumerRemoveWarning() throws Exception { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + System.out.println("Sent message: " + message.getText()); + + producer.send(message); + + MessageConsumer messageConsumer = session.createConsumer(queue); + + connection.start(); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + System.out.println("Received message: " + messageReceived.getText()); + + //OpenWire clients create advisory consumers on the 
broker. + //The broker is configured with supportAdvisory=false and therefore doesn't + //actually create a server consumer. However, the consumer info should be + //kept in the connection state so that when the client closes the consumer + //it won't cause the broker to throw an exception because of the missing + //consumer info. + //See OpenWireConnection.CommandProcessor.processRemoveConsumer + ActiveMQSession owSession = (ActiveMQSession) session; //NOTE(review): owSession is not used below + RemotingService remotingService = server.getRemotingService(); + Set<RemotingConnection> conns = remotingService.getConnections(); + assertEquals(1, conns.size()); + OpenWireConnection owconn = (OpenWireConnection) conns.iterator().next(); + Collection<SessionState> sstates = owconn.getState().getSessionStates(); + //there must be 2 sessions: one is normal, the other is for advisories + assertEquals(2, sstates.size()); + boolean hasAdvisoryConsumer = false; + for (SessionState state : sstates) { + Collection<ConsumerState> cstates = state.getConsumerStates(); + for (ConsumerState cs : cstates) { + cs.getInfo().getDestination(); //NOTE(review): return value is discarded + if (AdvisorySupport.isAdvisoryTopic(cs.getInfo().getDestination())) { + hasAdvisoryConsumer = true; + break; //exits only the inner loop; the outer loop over session states continues + } + } + } + assertTrue(hasAdvisoryConsumer); + } +}