Author: chirino
Date: Thu Jun 18 13:47:27 2009
New Revision: 786063
URL: http://svn.apache.org/viewvc?rev=786063&view=rev
Log:
Auto create queues on demand
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=786063&r1=786062&r2=786063&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
Thu Jun 18 13:47:27 2009
@@ -39,7 +39,8 @@
public BrokerMessageDelivery createMessageDelivery(MessageRecord record)
throws IOException;
- public interface ConsumerContext extends Subscription<MessageDelivery>,
IFlowSink<MessageDelivery>{
+ public interface ConsumerContext extends Subscription<MessageDelivery>,
IFlowSink<MessageDelivery> {
+
public String getConsumerId();
public Destination getDestination();
@@ -51,6 +52,13 @@
public boolean isDurable();
public String getSubscriptionName();
+
+ /**
+ * If the destination does not exist, should it automatically be created?
+ * @return
+ */
+ public boolean autoCreateDestination();
+
}
}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=786063&r1=786062&r2=786063&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
Thu Jun 18 13:47:27 2009
@@ -168,7 +168,7 @@
return queueStore;
}
- public BrokerSubscription createSubscription(ConsumerContext consumer) {
+ public BrokerSubscription createSubscription(ConsumerContext consumer)
throws Exception {
Destination destination = consumer.getDestination();
BrokerSubscription sub = null;
@@ -192,6 +192,13 @@
sub = new TopicSubscription(this, destination,
consumer.getSelectorExpression());
} else {
Queue queue = queues.get(destination.getName());
+ if( queue == null ) {
+ if( consumer.autoCreateDestination() ) {
+ queue = createQueue(destination);
+ } else {
+ throw new IllegalStateException("The queue does not exist: "+destination.getName());
+ }
+ }
sub = new Queue.QueueSubscription(queue);
}
}
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786063&r1=786062&r2=786063&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Thu Jun 18 13:47:27 2009
@@ -436,7 +436,7 @@
HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new
HashMap<MessageId, SubscriptionDeliveryCallback>();
LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
- public ConsumerContext(final ConsumerInfo info) throws
FilterException, UserAlreadyConnectedException {
+ public ConsumerContext(final ConsumerInfo info) throws Exception {
this.info = info;
this.name = info.getConsumerId().toString();
@@ -675,6 +675,10 @@
return offer(message, source, null);
}
+ public boolean autoCreateDestination() {
+ return true;
+ }
+
}
static public Destination convert(ActiveMQDestination dest) {
Modified:
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=786063&r1=786062&r2=786063&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Thu Jun 18 13:47:27 2009
@@ -486,6 +486,10 @@
public boolean offer(MessageDelivery elem, ISourceController<?>
source) {
return offer(elem, source, null);
}
+
+ public boolean autoCreateDestination() {
+ return true;
+ }
}
private void sendError(String message) {