Repository: activemq Updated Branches: refs/heads/master 4fe2bd534 -> 741e3aad3
https://issues.apache.org/jira/browse/AMQ-5630 - add rejectDurableConsumers boolean attribute - when true, requests to create durable subscriptions will fail with a JMSException - not allowed Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/741e3aad Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/741e3aad Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/741e3aad Branch: refs/heads/master Commit: 741e3aad3eb455220759afdd8001311727083882 Parents: 4fe2bd5 Author: gtully <[email protected]> Authored: Tue Mar 3 13:29:36 2015 +0000 Committer: gtully <[email protected]> Committed: Tue Mar 3 13:30:10 2015 +0000 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 9 ++ .../activemq/broker/region/TopicRegion.java | 3 + .../activemq/BrokerDurableRejectedTest.java | 124 +++++++++++++++++++ 3 files changed, 136 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index b250d32..f37381c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -249,6 +249,7 @@ public class BrokerService implements Service { private boolean restartAllowed = true; private boolean restartRequested = false; + private boolean rejectDurableConsumers = false; private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION; @@ -3053,4 +3054,12 @@ public class BrokerService implements Service { public void 
incrementTotalConnections() { this.totalConnections.incrementAndGet(); } + + public boolean isRejectDurableConsumers() { + return rejectDurableConsumers; + } + + public void setRejectDurableConsumers(boolean rejectDurableConsumers) { + this.rejectDurableConsumers = rejectDurableConsumers; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 80088d7..d0e15cd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -109,6 +109,9 @@ public class TopicRegion extends AbstractRegion { @Override public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { if (info.isDurable()) { + if (broker.getBrokerService().isRejectDurableConsumers()) { + throw new JMSException("Durable Consumers are not allowed"); + } ActiveMQDestination destination = info.getDestination(); if (!destination.isPattern()) { // Make sure the destination is created. 
http://git-wip-us.apache.org/repos/asf/activemq/blob/741e3aad/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java new file mode 100755 index 0000000..e8b75ef --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/BrokerDurableRejectedTest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; + +public class BrokerDurableRejectedTest extends TestSupport { + + protected Connection connection; + protected Session consumeSession; + protected Destination consumerDestination; + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm:(broker:(stomp://localhost:0)?persistent=false&rejectDurableConsumers=true)"); + } + + protected void setUp() throws Exception { + super.setUp(); + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + + connection.setClientID(getClass().getName()); + + consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + consumerDestination = consumeSession.createTopic("TestDurableRejected"); + connection.start(); + } + + public void testDurableTopicConsumerJms() throws Exception { + + consumeSession.createConsumer(consumerDestination); + try { + + consumeSession.createDurableSubscriber((Topic)consumerDestination, getName()); + fail("Expect not allowed jms exception on durable creation"); + + } catch (JMSException expected) { + assertTrue("expected exception", expected.getMessage().contains("not allowed")); + } + } + + public void testDurableTopicConsumerStomp() throws Exception { + + // verify stomp ok in this case + StompConnection stompConnection = new StompConnection(); + stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort()); + + // connect + String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = 
stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // subscribe + frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + assertTrue("contains expected message -" + frame, frame.contains("not allowed")); + + frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + + public void testDurableTopicConsumerStompWithReceipt() throws Exception { + + // verify stomp ok in this case + StompConnection stompConnection = new StompConnection(); + stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort()); + + // connect + String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // subscribe + frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName() + "\nreceipt:1\n" + + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + assertTrue("contains expected message -" + frame, frame.contains("not allowed")); + + frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + +}
