Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=754964&r1=754963&r2=754964&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Mon Mar 16 17:25:23 2009 @@ -1,510 +1,20 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ package org.apache.activemq.broker.openwire; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.broker.BrokerTestBase; +import org.apache.activemq.broker.RemoteConsumer; +import org.apache.activemq.broker.RemoteProducer; -import junit.framework.TestCase; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.Destination; -import org.apache.activemq.broker.MessageDelivery; -import org.apache.activemq.broker.Queue; -import org.apache.activemq.broker.Router; -import org.apache.activemq.dispatch.IDispatcher; -import org.apache.activemq.dispatch.PriorityDispatcher; -import org.apache.activemq.metric.MetricAggregator; -import org.apache.activemq.metric.Period; -import org.apache.activemq.protobuf.AsciiBuffer; -import org.apache.activemq.queue.Mapper; - -public class OpenwireBrokerTest extends TestCase { - - protected static final int PERFORMANCE_SAMPLES = 3; - - protected static final int IO_WORK_AMOUNT = 0; - protected static final int FANIN_COUNT = 10; - protected static final int FANOUT_COUNT = 10; - - protected static final int PRIORITY_LEVELS = 10; - protected static final boolean USE_INPUT_QUEUES = true; - - // Set to put senders and consumers on separate brokers. - protected boolean multibroker = false; - - // Set to mockup up ptp: - protected boolean ptp = false; - - // Set to use tcp IO - protected boolean tcp = true; - // set to force marshalling even in the NON tcp case. 
- protected boolean forceMarshalling = false; - - protected String sendBrokerBindURI; - protected String receiveBrokerBindURI; - protected String sendBrokerConnectURI; - protected String receiveBrokerConnectURI; - - // Set's the number of threads to use: - protected final int asyncThreadPoolSize = Runtime.getRuntime().availableProcessors(); - protected boolean usePartitionedQueue = false; - - protected int producerCount; - protected int consumerCount; - protected int destCount; - - protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); - protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); - - protected Broker sendBroker; - protected Broker rcvBroker; - protected ArrayList<Broker> brokers = new ArrayList<Broker>(); - protected IDispatcher dispatcher; - protected final AtomicLong msgIdGenerator = new AtomicLong(); - protected final AtomicBoolean stopping = new AtomicBoolean(); - - final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>(); - final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>(); - - static public final Mapper<AsciiBuffer, MessageDelivery> KEY_MAPPER = new Mapper<AsciiBuffer, MessageDelivery>() { - public AsciiBuffer map(MessageDelivery element) { - return element.getMsgId(); - } - }; - static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() { - public Integer map(MessageDelivery element) { - // we modulo 10 to have at most 10 partitions which the producers - // gets split across. 
- return (int) (element.getProducerId().hashCode() % 10); - } - }; +public class OpenwireBrokerTest extends BrokerTestBase { @Override - protected void setUp() throws Exception { - dispatcher = createDispatcher(); - dispatcher.start(); - if (tcp) { - sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi"; - receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi"; - sendBrokerConnectURI = "tcp://localhost:10000"; - receiveBrokerConnectURI = "tcp://localhost:20000"; - } else { - if (forceMarshalling) { - sendBrokerBindURI = "pipe://SendBroker"; - receiveBrokerBindURI = "pipe://ReceiveBroker"; - } else { - sendBrokerBindURI = "pipe://SendBroker"; - receiveBrokerBindURI = "pipe://ReceiveBroker"; - } - sendBrokerConnectURI = sendBrokerBindURI; - receiveBrokerConnectURI = receiveBrokerBindURI; - } + protected RemoteProducer cerateProducer() { + return new OpenwireRemoteProducer(); } - protected IDispatcher createDispatcher() { - return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize); + @Override + protected RemoteConsumer createConsumer() { + return new OpenwireRemoteConsumer(); } - public void test_1_1_0() throws Exception { - producerCount = 1; - destCount = 1; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_1_1_1() throws Exception { - producerCount = 1; - destCount = 1; - consumerCount = 1; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_10_1_10() throws Exception { - producerCount = FANIN_COUNT; - consumerCount = FANOUT_COUNT; - destCount = 1; - - createConnections(); - - // Start 'em up. 
- startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_10_1_1() throws Exception { - producerCount = FANIN_COUNT; - destCount = 1; - consumerCount = 1; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_1_1_10() throws Exception { - producerCount = 1; - destCount = 1; - consumerCount = FANOUT_COUNT; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_2_2_2() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_10_10_10() throws Exception { - producerCount = 10; - destCount = 10; - consumerCount = 10; - - createConnections(); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - - /** - * Tests 2 producers sending to 1 destination with 2 consumres, but with - * consumers set to select only messages from each producer. 1 consumers is - * set to slow, the other producer should be able to send quickly. - * - * @throws Exception - */ - public void test_2_2_2_SlowConsumer() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; - - createConnections(); - consumers.get(0).setThinkTime(50); - - // Start 'em up. - startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - public void test_2_2_2_Selector() throws Exception { - producerCount = 2; - destCount = 2; - consumerCount = 2; - - createConnections(); - - // Add properties to match producers to their consumers - for (int i = 0; i < consumerCount; i++) { - String property = "match" + i; - consumers.get(i).setSelector(property); - producers.get(i).setProperty(property); - } - - // Start 'em up. 
- startServices(); - try { - reportRates(); - } finally { - stopServices(); - } - } - - /** - * Test sending with 1 high priority sender. The high priority sender should - * have higher throughput than the other low priority senders. - * - * @throws Exception - */ - public void test_2_1_1_HighPriorityProducer() throws Exception { - - producerCount = 2; - destCount = 1; - consumerCount = 1; - - createConnections(); - RemoteProducer producer = producers.get(0); - producer.setPriority(1); - producer.getRate().setName("High Priority Producer Rate"); - - consumers.get(0).setThinkTime(1); - - // Start 'em up. - startServices(); - try { - - System.out.println("Checking rates for test: " + getName()); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(producer.getRate().getRateSummary(p)); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - - } finally { - stopServices(); - } - } - - /** - * Test sending with 1 high priority sender. The high priority sender should - * have higher throughput than the other low priority senders. - * - * @throws Exception - */ - public void test_2_1_1_MixedHighPriorityProducer() throws Exception { - producerCount = 2; - destCount = 1; - consumerCount = 1; - - createConnections(); - RemoteProducer producer = producers.get(0); - producer.setPriority(1); - producer.setPriorityMod(3); - producer.getRate().setName("High Priority Producer Rate"); - - consumers.get(0).setThinkTime(1); - - // Start 'em up. 
- startServices(); - try { - - System.out.println("Checking rates for test: " + getName()); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(producer.getRate().getRateSummary(p)); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - - } finally { - stopServices(); - } - } - - private void reportRates() throws InterruptedException { - System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic")); - for (int i = 0; i < PERFORMANCE_SAMPLES; i++) { - Period p = new Period(); - Thread.sleep(1000 * 5); - System.out.println(totalProducerRate.getRateSummary(p)); - System.out.println(totalConsumerRate.getRateSummary(p)); - totalProducerRate.reset(); - totalConsumerRate.reset(); - } - } - - private void createConnections() throws IOException, URISyntaxException { - - if (multibroker) { - sendBroker = createBroker("SendBroker", sendBrokerBindURI, sendBrokerConnectURI); - rcvBroker = createBroker("RcvBroker", receiveBrokerBindURI, receiveBrokerConnectURI); - brokers.add(sendBroker); - brokers.add(rcvBroker); - } else { - sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI); - brokers.add(sendBroker); - } - - Destination[] dests = new Destination[destCount]; - - for (int i = 0; i < destCount; i++) { - Destination.SingleDestination bean = new Destination.SingleDestination(); - bean.setName(new AsciiBuffer("dest" + (i + 1))); - bean.setDomain(ptp ? 
Router.QUEUE_DOMAIN : Router.TOPIC_DOMAIN); - dests[i] = bean; - if (ptp) { - Queue queue = createQueue(sendBroker, dests[i]); - sendBroker.addQueue(queue); - if (multibroker) { - queue = createQueue(rcvBroker, dests[i]); - rcvBroker.addQueue(queue); - } - } - } - - for (int i = 0; i < producerCount; i++) { - Destination destination = dests[i % destCount]; - RemoteProducer producer = createProducer(i, destination); - producers.add(producer); - } - - for (int i = 0; i < consumerCount; i++) { - Destination destination = dests[i % destCount]; - RemoteConsumer consumer = createConsumer(i, destination); - consumers.add(consumer); - } - - // Create MultiBroker connections: - // if (multibroker) { - // Pipe<Message> pipe = new Pipe<Message>(); - // sendBroker.createBrokerConnection(rcvBroker, pipe); - // rcvBroker.createBrokerConnection(sendBroker, pipe.connect()); - // } - } - - private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException { - RemoteConsumer consumer = new RemoteConsumer() { - public void onException(Exception error) { - if( !stopping.get() ) { - System.err.println("Consumer Async Error:"); - error.printStackTrace(); - } - } - }; - consumer.setUri(new URI(rcvBroker.getConnectUri())); - consumer.setDestination(destination); - consumer.setName("consumer" + (i + 1)); - consumer.setTotalConsumerRate(totalConsumerRate); - consumer.setDispatcher(dispatcher); - return consumer; - } - - private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException { - RemoteProducer producer = new RemoteProducer() { - public void onException(Exception error) { - if( !stopping.get() ) { - System.err.println("Producer Async Error:"); - error.printStackTrace(); - } - } - }; - producer.setUri(new URI(sendBroker.getConnectUri())); - producer.setProducerId(id + 1); - producer.setName("producer" + (id + 1)); - producer.setDestination(destination); - producer.setMessageIdGenerator(msgIdGenerator); - 
producer.setTotalProducerRate(totalProducerRate); - producer.setDispatcher(dispatcher); - return producer; - } - - private Queue createQueue(Broker broker, Destination destination) { - Queue queue = new Queue(); - queue.setBroker(broker); - queue.setDestination(destination); - queue.setKeyExtractor(KEY_MAPPER); - if (usePartitionedQueue) { - queue.setPartitionMapper(PARTITION_MAPPER); - } - return queue; - } - - private Broker createBroker(String name, String bindURI, String connectUri) { - Broker broker = new Broker(); - broker.setName(name); - broker.setBindUri(bindURI); - broker.setConnectUri(connectUri); - broker.setDispatcher(dispatcher); - return broker; - } - - private void stopServices() throws Exception { - stopping.set(true); - for (Broker broker : brokers) { - broker.stop(); - } - for (RemoteProducer connection : producers) { - connection.stop(); - } - for (RemoteConsumer connection : consumers) { - connection.stop(); - } - if (dispatcher != null) { - dispatcher.shutdown(); - } - } - - private void startServices() throws Exception { - for (Broker broker : brokers) { - broker.start(); - } - for (RemoteConsumer connection : consumers) { - connection.start(); - } - - for (RemoteProducer connection : producers) { - connection.start(); - } - } }
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,104 @@ +package org.apache.activemq.broker.openwire; + +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConsumerInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo; + +import java.io.IOException; + +import org.apache.activemq.WindowLimiter; +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.RemoteConsumer; +import org.apache.activemq.broker.Router; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import 
org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.transport.InactivityMonitor; +import org.apache.activemq.transport.tcp.TcpTransport; + +public class OpenwireRemoteConsumer extends RemoteConsumer { + + protected final Object inboundMutex = new Object(); + private FlowController<MessageDelivery> inboundController; + + private ActiveMQDestination activemqDestination; + private ConnectionInfo connectionInfo; + private SessionInfo sessionInfo; + private ConsumerInfo consumerInfo; + + private Message lastMessage; + + protected void initialize() { + // Setup the input processing.. + final Flow flow = new Flow("client-"+name+"-inbound", false); + inputResumeThreshold = inputWindowSize/2; + WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) { + protected void sendCredit(int credit) { + MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE); + write(ack); + } + }; + inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() { + public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) { + messageReceived(controller, elem); + } + public String toString() { + return flow.getFlowName(); + } + public IFlowSink<MessageDelivery> getFlowSink() { + return null; + } + public IFlowSource<MessageDelivery> getFlowSource() { + return null; + } + }, flow, limiter, inboundMutex); + + } + + protected void setupSubscription() throws Exception, IOException { + if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + activemqDestination = new ActiveMQQueue(destination.getName().toString()); + } else { + activemqDestination = new ActiveMQTopic(destination.getName().toString()); + } + + connectionInfo = createConnectionInfo(name); + transport.oneway(connectionInfo); + sessionInfo = createSessionInfo(connectionInfo); + 
transport.oneway(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, activemqDestination); + consumerInfo.setPrefetchSize(inputWindowSize); + transport.oneway(consumerInfo); + } + + public void onCommand(Object command) { + try { + if (command.getClass() == WireFormatInfo.class) { + } else if (command.getClass() == BrokerInfo.class) { + System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName()); + } else if (command.getClass() == MessageDispatch.class) { + MessageDispatch msg = (MessageDispatch) command; + lastMessage = msg.getMessage(); + inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null); + } else { + onException(new Exception("Unrecognized command: " + command)); + } + } catch (Exception e) { + onException(e); + } + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,106 @@ +package org.apache.activemq.broker.openwire; + +import static org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createMessage; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createProducerInfo; +import static org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo; + +import java.io.IOException; + +import javax.jms.JMSException; + +import org.apache.activemq.WindowLimiter; +import 
org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.RemoteProducer; +import org.apache.activemq.broker.Router; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.queue.SingleFlowRelay; + +public class OpenwireRemoteProducer extends RemoteProducer { + private ConnectionInfo connectionInfo; + private SessionInfo sessionInfo; + private ProducerInfo producerInfo; + private ActiveMQDestination activemqDestination; + private WindowLimiter<MessageDelivery> outboundLimiter; + + protected void setupProducer() throws Exception, IOException { + if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + activemqDestination = new ActiveMQQueue(destination.getName().toString()); + } else { + activemqDestination = new ActiveMQTopic(destination.getName().toString()); + } + + connectionInfo = createConnectionInfo(name); + transport.oneway(connectionInfo); + sessionInfo = createSessionInfo(connectionInfo); + transport.oneway(sessionInfo); + producerInfo = createProducerInfo(sessionInfo); + producerInfo.setWindowSize(outputWindowSize); + transport.oneway(producerInfo); + } + + protected void initialize() { + Flow flow = new Flow("client-"+name+"-outbound", false); + outputResumeThreshold = outputWindowSize/2; + outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, 
outputWindowSize, outputResumeThreshold); + SingleFlowRelay<MessageDelivery> outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), outboundLimiter); + this.outboundQueue = outboundQueue; + + outboundController = outboundQueue.getFlowController(flow); + outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() { + public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) { + Message msg = message.asType(Message.class); + write(msg); + } + }); + } + + public void onCommand(Object command) { + try { + if (command.getClass() == WireFormatInfo.class) { + } else if (command.getClass() == BrokerInfo.class) { + System.out.println("Producer "+name+" connected to "+((BrokerInfo)command).getBrokerName()); + } else if (command.getClass() == ProducerAck.class) { + ProducerAck fc = (ProducerAck) command; + synchronized (outboundQueue) { + outboundLimiter.onProtocolCredit(fc.getSize()); + } + } else { + onException(new Exception("Unrecognized command: " + command)); + } + } catch (Exception e) { + onException(e); + } + } + + protected void createNextMessage() { + int priority = this.priority; + if (priorityMod > 0) { + priority = counter % priorityMod == 0 ? 
0 : priority; + } + + ActiveMQTextMessage msg = createMessage(producerInfo, activemqDestination, priority, createPayload()); + if (property != null) { + try { + msg.setStringProperty(property, property); + } catch (JMSException e) { + throw new RuntimeException(e); /* previously constructed but never thrown, which silently dropped the failure */ + } + } + next = new OpenWireMessageDelivery(msg); + } +} + Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompBrokerTest.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,31 @@ +package org.apache.activemq.broker.openwire.stomp; + +import org.apache.activemq.broker.BrokerTestBase; +import org.apache.activemq.broker.RemoteConsumer; +import org.apache.activemq.broker.RemoteProducer; + +public class StompBrokerTest extends BrokerTestBase { + + @Override + protected void setUp() throws Exception { + super.setUp(); + if (tcp) { + sendBrokerBindURI = "tcp://localhost:10000?wireFormat=multi&transport.useInactivityMonitor=false"; + receiveBrokerBindURI = "tcp://localhost:20000?wireFormat=multi&transport.useInactivityMonitor=false"; + sendBrokerConnectURI = "tcp://localhost:10000?wireFormat=stomp&useInactivityMonitor=false"; + receiveBrokerConnectURI = "tcp://localhost:20000?wireFormat=stomp&useInactivityMonitor=false"; + } + } + + @Override + protected RemoteProducer cerateProducer() { + return new StompRemoteProducer(); + } + + @Override + protected RemoteConsumer createConsumer() { + return new StompRemoteConsumer(); + } + + +} Added: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,88 @@ +package org.apache.activemq.broker.openwire.stomp; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.RemoteConsumer; +import org.apache.activemq.broker.Router; +import org.apache.activemq.broker.stomp.StompMessageDelivery; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.FlowController; +import org.apache.activemq.flow.IFlowSink; +import org.apache.activemq.flow.IFlowSource; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.flow.ISinkController.FlowControllable; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +public class StompRemoteConsumer extends RemoteConsumer { + + protected final Object inboundMutex = new Object(); + private FlowController<MessageDelivery> inboundController; + private String stompDestination; + + + protected void setupSubscription() throws Exception, IOException { + if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + stompDestination = "/queue/"+destination.getName().toString(); + } else { + stompDestination = "/topic/"+destination.getName().toString(); + } + + StompFrame frame = new StompFrame(Stomp.Commands.CONNECT); + transport.oneway(frame); + + 
HashMap<String, String> headers = new HashMap<String, String>(); + headers.put(Stomp.Headers.Subscribe.DESTINATION, stompDestination); + headers.put(Stomp.Headers.Subscribe.ID, "0001"); + headers.put(Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO); + + frame = new StompFrame(Stomp.Commands.SUBSCRIBE, headers); + transport.oneway(frame); + + } + + protected void initialize() { + // Setup the input processing.. + final Flow flow = new Flow("client-"+name+"-inbound", false); + inputResumeThreshold = inputWindowSize/2; + SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold); + inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() { + public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) { + messageReceived(controller, elem); + } + public String toString() { + return flow.getFlowName(); + } + public IFlowSink<MessageDelivery> getFlowSink() { + return null; + } + public IFlowSource<MessageDelivery> getFlowSource() { + return null; + } + }, flow, limiter, inboundMutex); + } + + public void onCommand(Object command) { + try { + if (command.getClass() == StompFrame.class) { + StompFrame frame = (StompFrame) command; + if( Stomp.Responses.MESSAGE.equals(frame.getAction()) ) { + StompMessageDelivery md = new StompMessageDelivery(frame, getDestination()); + while(!inboundController.offer(md, null) ) { + inboundController.waitForFlowUnblock(); + } + } else if( Stomp.Responses.CONNECTED.equals(frame.getAction()) ) { + } else { + onException(new Exception("Unrecognized stomp command: " + frame.getAction())); + } + } else { + onException(new Exception("Unrecognized command: " + command)); + } + } catch (Exception e) { + onException(e); + } + } +} \ No newline at end of file Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java?rev=754964&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java (added) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java Mon Mar 16 17:25:23 2009 @@ -0,0 +1,93 @@ +package org.apache.activemq.broker.openwire.stomp; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.activemq.broker.MessageDelivery; +import org.apache.activemq.broker.RemoteProducer; +import org.apache.activemq.broker.Router; +import org.apache.activemq.broker.stomp.StompMessageDelivery; +import org.apache.activemq.flow.Flow; +import org.apache.activemq.flow.IFlowDrain; +import org.apache.activemq.flow.ISourceController; +import org.apache.activemq.flow.SizeLimiter; +import org.apache.activemq.queue.SingleFlowRelay; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; + +public class StompRemoteProducer extends RemoteProducer { + + private String stompDestination; + + protected void setupProducer() throws Exception, IOException { + if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) { + stompDestination = "/queue/"+destination.getName().toString(); + } else { + stompDestination = "/topic/"+destination.getName().toString(); + } + + StompFrame frame = new StompFrame(Stomp.Commands.CONNECT); + transport.oneway(frame); + + } + + protected void initialize() { + Flow flow = new Flow("client-"+name+"-outbound", false); + outputResumeThreshold = outputWindowSize/2; + SizeLimiter<MessageDelivery> outboundLimiter = new SizeLimiter<MessageDelivery>(outputWindowSize, outputResumeThreshold); + SingleFlowRelay<MessageDelivery> outboundQueue = new 
SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), outboundLimiter); + this.outboundQueue = outboundQueue; + + outboundController = outboundQueue.getFlowController(flow); + outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() { + public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) { + StompFrame msg = message.asType(StompFrame.class); + write(msg); + } + }); + } + + public void onCommand(Object command) { + try { + if (command.getClass() == StompFrame.class) { + StompFrame frame = (StompFrame) command; + if( Stomp.Responses.CONNECTED.equals(frame.getAction()) ) { + + } else { + onException(new Exception("Unrecognized stomp command: " + frame.getAction())); + } + } else { + onException(new Exception("Unrecognized command: " + command)); + } + } catch (Exception e) { + onException(e); + } + } + + protected void createNextMessage() { + int priority = this.priority; + if (priorityMod > 0) { + priority = counter % priorityMod == 0 ? 0 : priority; + } + + HashMap<String, String> headers = new HashMap<String, String>(5); + headers.put(Stomp.Headers.Send.DESTINATION, stompDestination); + + if (property != null) { + headers.put(property, property); + } + + StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers, toContent(createPayload())); + next = new StompMessageDelivery(fram, getDestination()); + } + + private byte[] toContent(String data) { + byte rc[] = new byte[data.length()]; + char[] chars = data.toCharArray(); + for (int i = 0; i < chars.length; i++) { + rc[i] = (byte)(chars[i] & 0xFF); + } + return rc; + } +} +
