Author: chirino
Date: Thu Mar 12 20:32:50 2009
New Revision: 753003
URL: http://svn.apache.org/viewvc?rev=753003&view=rev
Log:
Consumer actually gets messages now... just need to debug the flow control now.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/Openwire2Support.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
Thu Mar 12 20:32:50 2009
@@ -21,8 +21,8 @@
queues.get(name).addConsumer(deliveryTarget);
}
- public Collection<DeliveryTarget> route(MessageDelivery name) {
- Queue queue = queues.get(name);
+ public Collection<DeliveryTarget> route(MessageDelivery delivery) {
+ Queue queue = queues.get(delivery.getDestination().getName());
if( queue!=null ) {
ArrayList<DeliveryTarget> rc = new ArrayList<DeliveryTarget>(1);
rc.add(queue);
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
Thu Mar 12 20:32:50 2009
@@ -25,8 +25,8 @@
targets.add(target);
}
- public Collection<DeliveryTarget> route(MessageDelivery name) {
- return topicsTargets.get(name);
+ public Collection<DeliveryTarget> route(MessageDelivery delivery) {
+ return topicsTargets.get(delivery.getDestination().getName());
}
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
Thu Mar 12 20:32:50 2009
@@ -60,7 +60,6 @@
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.transport.DispatchableTransport;
public class OpenwireBrokerConnection extends BrokerConnection {
@@ -363,25 +362,20 @@
}
};
queue = new SingleFlowRelay<MessageDelivery>(flow, name +
"-outbound", limiter);
- if (transport instanceof DispatchableTransport) {
- queue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(MessageDelivery message,
ISourceController<MessageDelivery> controller) {
- write(message);
- }
- });
-
- } else {
- queue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(final MessageDelivery message,
ISourceController<MessageDelivery> controller) {
- write(message);
- };
- });
- }
+ queue.setDrain(new IFlowDrain<MessageDelivery>() {
+ public void drain(final MessageDelivery message,
ISourceController<MessageDelivery> controller) {
+ Message msg = message.asType(Message.class);
+ MessageDispatch md = new MessageDispatch();
+ md.setConsumerId(info.getConsumerId());
+ md.setMessage(msg);
+ md.setDestination(msg.getDestination());
+ write(md);
+ };
+ });
}
public IFlowSink<MessageDelivery> getSink() {
- // TODO Auto-generated method stub
- return null;
+ return queue;
}
public boolean match(MessageDelivery message) {
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=753003&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Thu Mar 12 20:32:50 2009
@@ -0,0 +1,82 @@
+package org.apache.activemq.broker.openwire;
+
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class OpenwireSupport {
+
+ static private long idGenerator;
+ static private long msgIdGenerator;
+
+ public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ public static RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+ return consumerInfo.createRemoveCommand();
+ }
+
+ public static ProducerInfo createProducerInfo(SessionInfo sessionInfo)
throws Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
+ }
+
+ public static SessionInfo createSessionInfo(ConnectionInfo connectionInfo)
throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ public static ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" +
(++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo,
ActiveMQDestination destination) {
+ return createMessage(producerInfo, destination, 4, null);
+ }
+
+ public static ActiveMQTextMessage createMessage(ProducerInfo producerInfo,
ActiveMQDestination destination, int priority, String payload) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setJMSPriority(priority);
+ message.setProducerId(producerInfo.getProducerId());
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ if( payload!=null ) {
+ try {
+ message.setText(payload);
+ } catch (MessageNotWriteableException e) {
+ }
+ }
+ return message;
+ }
+
+ public static MessageAck createAck(ConsumerInfo consumerInfo, Message msg,
int count, byte ackType) {
+ MessageAck ack = new MessageAck();
+ ack.setAckType(ackType);
+ ack.setConsumerId(consumerInfo.getConsumerId());
+ ack.setDestination(msg.getDestination());
+ ack.setLastMessageId(msg.getMessageId());
+ ack.setMessageCount(count);
+ return ack;
+ }
+
+}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
Thu Mar 12 20:32:50 2009
@@ -1,8 +1,8 @@
package org.apache.activemq.broker.openwire;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createConsumerInfo;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createConsumerInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@@ -17,6 +17,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.flow.Flow;
@@ -108,9 +109,9 @@
if (command.getClass() == WireFormatInfo.class) {
} else if (command.getClass() == BrokerInfo.class) {
System.out.println("Consumer "+name+" connected to
"+((BrokerInfo)command).getBrokerName());
- } else if (command.getClass() == MessageDelivery.class) {
- MessageDelivery msg = (MessageDelivery) command;
- inboundController.add(msg, null);
+ } else if (command.getClass() == MessageDispatch.class) {
+ MessageDispatch msg = (MessageDispatch) command;
+ inboundController.add(new
OpenWireMessageDelivery(msg.getMessage()), null);
} else {
onException(new Exception("Unrecognized command: " + command));
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753003&r1=753002&r2=753003&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Thu Mar 12 20:32:50 2009
@@ -1,9 +1,9 @@
package org.apache.activemq.broker.openwire;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createConnectionInfo;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createMessage;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createProducerInfo;
-import static
org.apache.activemq.broker.openwire.Openwire2Support.createSessionInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createConnectionInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createMessage;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createProducerInfo;
+import static
org.apache.activemq.broker.openwire.OpenwireSupport.createSessionInfo;
import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;