Author: chirino
Date: Wed Mar 18 18:04:17 2009
New Revision: 755682
URL: http://svn.apache.org/viewvc?rev=755682&view=rev
Log:
Added virtual hosts to the broker model. Some protocols may be able to take
advantage of them; those that cannot can use the default virtual host.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
Removed:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
Wed Mar 18 18:04:17 2009
@@ -26,7 +26,7 @@
public class BrokerConnection extends Connection {
- protected Broker broker;
+ protected MessageBroker broker;
private ProtocolHandler protocolHandler;
public BrokerConnection() {
@@ -41,11 +41,11 @@
});
}
- public Broker getBroker() {
+ public MessageBroker getBroker() {
return broker;
}
- public void setBroker(Broker broker) {
+ public void setBroker(MessageBroker broker) {
this.broker = broker;
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
Wed Mar 18 18:04:17 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.IFlowSink;
public interface DeliveryTarget {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
Wed Mar 18 18:04:17 2009
@@ -18,6 +18,7 @@
import java.util.Collection;
+import org.apache.activemq.broker.Destination;
import org.apache.activemq.protobuf.AsciiBuffer;
public interface Destination {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
Wed Mar 18 18:04:17 2009
@@ -18,6 +18,8 @@
import java.util.Collection;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.protobuf.AsciiBuffer;
/**
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=755682&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
(added)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
Wed Mar 18 18:04:17 2009
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.DispatchableTransportServer;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+public class MessageBroker implements TransportAcceptListener {
+
+ public static final int MAX_USER_PRIORITY = 10;
+ public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+
+ final ArrayList<Connection> clientConnections = new
ArrayList<Connection>();
+ private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new
LinkedHashMap<AsciiBuffer, VirtualHost>();
+ private VirtualHost defaultVirtualHost;
+
+ private TransportServer transportServer;
+ private String bindUri;
+ private String connectUri;
+ private String name;
+ private IDispatcher dispatcher;
+ private final AtomicBoolean stopping = new AtomicBoolean();
+
+ public String getName() {
+ return name;
+ }
+
+ public final void stop() throws Exception {
+ stopping.set(true);
+ transportServer.stop();
+
+ for (Connection connection : clientConnections) {
+ connection.stop();
+ }
+
+ for (VirtualHost virtualHost : virtualHosts.values()) {
+ virtualHost.stop();
+ }
+ dispatcher.shutdown();
+
+ }
+
+ public final void start() throws Exception {
+ dispatcher.start();
+
+ for (VirtualHost virtualHost : virtualHosts.values()) {
+ virtualHost.start();
+ }
+
+ transportServer = TransportFactory.bind(new URI(bindUri));
+ transportServer.setAcceptListener(this);
+ if (transportServer instanceof DispatchableTransportServer) {
+ ((DispatchableTransportServer)
transportServer).setDispatcher(dispatcher);
+ }
+ transportServer.start();
+
+ }
+
+ public void onAccept(final Transport transport) {
+ BrokerConnection connection = new BrokerConnection();
+ connection.setBroker(this);
+ connection.setTransport(transport);
+ connection.setPriorityLevels(MAX_PRIORITY);
+ connection.setDispatcher(dispatcher);
+ clientConnections.add(connection);
+ try {
+ connection.start();
+ } catch (Exception e1) {
+ onAcceptError(e1);
+ }
+ }
+
+ public void onAcceptError(Exception error) {
+ System.out.println("Accept error: " + error);
+ error.printStackTrace();
+ }
+
+ public IDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public String getBindUri() {
+ return bindUri;
+ }
+
+ public void setBindUri(String uri) {
+ this.bindUri = uri;
+ }
+
+ public boolean isStopping() {
+ return stopping.get();
+ }
+
+ public String getConnectUri() {
+ return connectUri;
+ }
+
+ public void setConnectUri(String connectUri) {
+ this.connectUri = connectUri;
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Virtual Host Related Operations
+ // /////////////////////////////////////////////////////////////////
+ public VirtualHost getDefaultVirtualHost() {
+ synchronized (virtualHosts) {
+ if( defaultVirtualHost==null ) {
+ defaultVirtualHost = new VirtualHost();
+ }
+ return defaultVirtualHost;
+ }
+ }
+
+ public void setDefaultVirtualHost(VirtualHost defaultVirtualHost) {
+ synchronized (virtualHosts) {
+ this.defaultVirtualHost = defaultVirtualHost;
+ }
+ }
+
+ /**
+ * Registers a virtual host under every one of its host names.
+ * The first virtual host registered on an empty broker also becomes
+ * the default virtual host.
+ *
+ * @param host the virtual host to register
+ * @throws Exception if the host has no host names, or if one of its
+ * host names is already registered
+ */
+ public void addVirtualHost(VirtualHost host) throws Exception {
+ synchronized (virtualHosts) {
+ // Make sure it's valid.
+ ArrayList<AsciiBuffer> hostNames = host.getHostNames();
+ if (hostNames.isEmpty()) {
+ throw new Exception("Virtual host must be configured with at least one host name.");
+ }
+ for (AsciiBuffer name : hostNames) {
+ if (virtualHosts.containsKey(name)) {
+ throw new Exception("Virtual host with host name " + name + " already exists.");
+ }
+ }
+
+ // Decide "first host" before registering: a host with several
+ // names adds several map entries, so testing size() == 1
+ // afterwards would never promote it to the default.
+ boolean firstHost = virtualHosts.isEmpty();
+
+ // Register it.
+ for (AsciiBuffer name : hostNames) {
+ virtualHosts.put(name, host);
+ }
+
+ // The first virtual host defined is the default virtual host.
+ if (firstHost) {
+ setDefaultVirtualHost(host);
+ }
+ }
+ }
+
+ public synchronized void removeVirtualHost(VirtualHost host) throws
Exception {
+ synchronized (virtualHosts) {
+ for (AsciiBuffer name : host.getHostNames()) {
+ virtualHosts.remove(name);
+ }
+ // Was the default virtual host removed? Set the default to the
next virtual host.
+ if( host == defaultVirtualHost ) {
+ if( virtualHosts.isEmpty() ) {
+ defaultVirtualHost = null;
+ } else {
+ defaultVirtualHost =
virtualHosts.values().iterator().next();
+ }
+ }
+ }
+ }
+
+ public VirtualHost getVirtualHost(AsciiBuffer name) {
+ synchronized (virtualHosts) {
+ return virtualHosts.get(name);
+ }
+ }
+
+ public synchronized Collection<VirtualHost> getVirtualHosts() {
+ synchronized (virtualHosts) {
+ return new ArrayList<VirtualHost>(virtualHosts.values());
+ }
+ }
+
+}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
Wed Mar 18 18:04:17 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker;
+import org.apache.activemq.broker.Destination;
import org.apache.activemq.protobuf.AsciiBuffer;
public interface MessageDelivery {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
Wed Mar 18 18:04:17 2009
@@ -18,6 +18,9 @@
import java.util.HashMap;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PrioritySizeLimiter;
@@ -35,7 +38,7 @@
HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new
HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
private Destination destination;
private IQueue<AsciiBuffer, MessageDelivery> queue;
- private Broker broker;
+ private MessageBroker broker;
private Mapper<Integer, MessageDelivery> partitionMapper;
private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
@@ -65,8 +68,8 @@
};
private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
- if (Broker.MAX_PRIORITY > 1) {
- PrioritySizeLimiter<MessageDelivery> limiter = new
PrioritySizeLimiter<MessageDelivery>(100, 1, Broker.MAX_PRIORITY);
+ if (MessageBroker.MAX_PRIORITY > 1) {
+ PrioritySizeLimiter<MessageDelivery> limiter = new
PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
limiter.setPriorityMapper(PRIORITY_MAPPER);
SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new
SharedPriorityQueue<AsciiBuffer,
MessageDelivery>(destination.getName().toString(), limiter);
queue.setKeyMapper(keyExtractor);
@@ -141,11 +144,11 @@
return true;
}
- public Broker getBroker() {
+ public MessageBroker getBroker() {
return broker;
}
- public void setBroker(Broker broker) {
+ public void setBroker(MessageBroker broker) {
this.broker = broker;
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
Wed Mar 18 18:04:17 2009
@@ -20,6 +20,10 @@
import java.util.Collection;
import java.util.HashMap;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
import org.apache.activemq.protobuf.AsciiBuffer;
public class QueueDomain implements Domain {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
Wed Mar 18 18:04:17 2009
@@ -20,6 +20,12 @@
import java.util.HashMap;
import java.util.HashSet;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.QueueDomain;
+import org.apache.activemq.broker.TopicDomain;
import org.apache.activemq.protobuf.AsciiBuffer;
final public class Router {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
Wed Mar 18 18:04:17 2009
@@ -20,6 +20,9 @@
import java.util.Collection;
import java.util.HashMap;
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.protobuf.AsciiBuffer;
public class TopicDomain implements Domain {
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=755682&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
(added)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
Wed Mar 18 18:04:17 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * @author chirino
+ */
+public class VirtualHost implements Service {
+
+ private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
+ private Router router = new Router();
+ private HashMap<Destination, Queue> queues = new HashMap<Destination,
Queue>();
+
+ /**
+ * Returns the primary host name of this virtual host, which is the
+ * first entry of the host name list.
+ *
+ * @return the first configured host name, or null when no host name
+ * has been configured
+ */
+ public AsciiBuffer getHostName() {
+ if( !hostNames.isEmpty() ) {
+ // Previously the lookup result was discarded (missing return),
+ // so this method always returned null.
+ return hostNames.get(0);
+ }
+ return null;
+ }
+
+ public ArrayList<AsciiBuffer> getHostNames() {
+ return hostNames;
+ }
+ public void setHostNames(ArrayList<AsciiBuffer> hostNames) {
+ this.hostNames = hostNames;
+ }
+
+ public Router getRouter() {
+ return router;
+ }
+
+ public void start() throws Exception {
+ for (Queue queue : queues.values()) {
+ queue.start();
+ }
+ }
+ public void stop() throws Exception {
+ for (Queue queue : queues.values()) {
+ queue.stop();
+ }
+ }
+
+ public void addQueue(Queue queue) {
+ Domain domain = router.getDomain(queue.getDestination().getDomain());
+ domain.add(queue.getDestination().getName(), queue);
+ }
+
+
+}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Wed Mar 18 18:04:17 2009
@@ -91,7 +91,8 @@
protected IFlowController<MessageDelivery> inboundController;
protected BrokerConnection connection;
- private OpenWireFormat wireFormat;
+ private OpenWireFormat wireFormat;
+ private Router router;
public void start() throws Exception {
// Setup the inbound processing..
@@ -137,7 +138,7 @@
public Response processAddConsumer(ConsumerInfo info) throws
Exception {
ConsumerContext ctx = new ConsumerContext(info);
consumers.put(info.getConsumerId(), ctx);
-
connection.getBroker().getRouter().bind(convert(info.getDestination()), ctx);
+ router.bind(convert(info.getDestination()), ctx);
return ack(command);
}
@@ -468,7 +469,7 @@
// Consider doing some caching of this target list. Most producers
// always send to
// the same destination.
- Collection<DeliveryTarget> targets =
connection.getBroker().getRouter().route(elem);
+ Collection<DeliveryTarget> targets = router.route(elem);
final Message message = ((OpenWireMessageDelivery) elem).getMessage();
if (targets != null) {
@@ -556,6 +557,7 @@
public void setConnection(BrokerConnection connection) {
this.connection = connection;
+ this.router =
connection.getBroker().getDefaultVirtualHost().getRouter();
}
public void setWireFormat(WireFormat wireFormat) {
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -23,6 +23,8 @@
import javax.jms.JMSException;
import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.stomp.ProtocolException;
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -25,6 +25,9 @@
import javax.jms.JMSException;
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.LegacyFrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -24,6 +24,8 @@
import org.apache.activemq.broker.Destination;
import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Wed Mar 18 18:04:17 2009
@@ -78,6 +78,7 @@
private SingleFlowRelay<MessageDelivery> outboundQueue;
private HashMap<AsciiBuffer, ConsumerContext> allSentMessageIds = new
HashMap<AsciiBuffer, ConsumerContext>();
+ private Router router;
protected FrameTranslator translator(StompFrame frame) {
try {
@@ -113,7 +114,7 @@
public void onStompFrame(StompFrame frame) throws Exception {
ConsumerContext ctx = new ConsumerContext(frame);
consumers.put(ctx.stompDestination, ctx);
- connection.getBroker().getRouter().bind(ctx.destination, ctx);
+ router.bind(ctx.destination, ctx);
ack(frame);
}
});
@@ -407,7 +408,7 @@
// Consider doing some caching of this target list. Most producers
// always send to
// the same destination.
- Collection<DeliveryTarget> targets =
connection.getBroker().getRouter().route(messageDelivery);
+ Collection<DeliveryTarget> targets = router.route(messageDelivery);
final StompMessageDelivery smd = ((StompMessageDelivery)
messageDelivery);
String receiptId = smd.getReceiptId();
if (targets != null) {
@@ -481,6 +482,7 @@
public void setConnection(BrokerConnection connection) {
this.connection = connection;
+ this.router =
connection.getBroker().getDefaultVirtualHost().getRouter();
}
public void setWireFormat(WireFormat wireFormat) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Wed Mar 18 18:04:17 2009
@@ -26,6 +26,11 @@
import junit.framework.TestCase;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageBroker;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.PriorityDispatcher;
import org.apache.activemq.metric.MetricAggregator;
@@ -71,9 +76,9 @@
protected MetricAggregator totalProducerRate = new
MetricAggregator().name("Aggregate Producer Rate").unit("items");
protected MetricAggregator totalConsumerRate = new
MetricAggregator().name("Aggregate Consumer Rate").unit("items");
- protected Broker sendBroker;
- protected Broker rcvBroker;
- protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+ protected MessageBroker sendBroker;
+ protected MessageBroker rcvBroker;
+ protected ArrayList<MessageBroker> brokers = new
ArrayList<MessageBroker>();
protected IDispatcher dispatcher;
protected final AtomicLong msgIdGenerator = new AtomicLong();
protected final AtomicBoolean stopping = new AtomicBoolean();
@@ -117,7 +122,7 @@
}
protected IDispatcher createDispatcher() {
- return
PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher",
Broker.MAX_PRIORITY, asyncThreadPoolSize);
+ return
PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher",
MessageBroker.MAX_PRIORITY, asyncThreadPoolSize);
}
public void test_1_1_0() throws Exception {
@@ -390,10 +395,10 @@
dests[i] = bean;
if (ptp) {
Queue queue = createQueue(sendBroker, dests[i]);
- sendBroker.addQueue(queue);
+ sendBroker.getDefaultVirtualHost().addQueue(queue);
if (multibroker) {
queue = createQueue(rcvBroker, dests[i]);
- rcvBroker.addQueue(queue);
+ rcvBroker.getDefaultVirtualHost().addQueue(queue);
}
}
}
@@ -460,7 +465,7 @@
abstract protected RemoteProducer cerateProducer();
- private Queue createQueue(Broker broker, Destination destination) {
+ private Queue createQueue(MessageBroker broker, Destination destination) {
Queue queue = new Queue();
queue.setBroker(broker);
queue.setDestination(destination);
@@ -471,8 +476,8 @@
return queue;
}
- private Broker createBroker(String name, String bindURI, String
connectUri) {
- Broker broker = new Broker();
+ private MessageBroker createBroker(String name, String bindURI, String
connectUri) {
+ MessageBroker broker = new MessageBroker();
broker.setName(name);
broker.setBindUri(bindURI);
broker.setConnectUri(connectUri);
@@ -482,7 +487,7 @@
private void stopServices() throws Exception {
stopping.set(true);
- for (Broker broker : brokers) {
+ for (MessageBroker broker : brokers) {
broker.stop();
}
for (RemoteProducer connection : producers) {
@@ -497,7 +502,7 @@
}
private void startServices() throws Exception {
- for (Broker broker : brokers) {
+ for (MessageBroker broker : brokers) {
broker.start();
}
for (RemoteConsumer connection : consumers) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
Wed Mar 18 18:04:17 2009
@@ -4,6 +4,8 @@
import java.util.concurrent.TimeUnit;
import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
Wed Mar 18 18:04:17 2009
@@ -4,6 +4,8 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
import org.apache.activemq.flow.IFlowController;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
Wed Mar 18 18:04:17 2009
@@ -10,6 +10,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.RemoteConsumer;
import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
Wed Mar 18 18:04:17 2009
@@ -13,6 +13,7 @@
import org.apache.activemq.broker.MessageDelivery;
import org.apache.activemq.broker.RemoteProducer;
import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;