Author: chirino
Date: Mon Mar 16 19:58:58 2009
New Revision: 754990
URL: http://svn.apache.org/viewvc?rev=754990&view=rev
Log:
Fixed up flow control for the Stomp protocol
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Mon Mar 16 19:58:58 2009
@@ -38,10 +38,11 @@
protected int outputResumeThreshold = 900;
protected int inputWindowSize = 1000;
protected int inputResumeThreshold = 500;
+ protected boolean useAsyncWriteThread = true;
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
- private ExecutorService blockingWriter;
+ private ExecutorService blockingWriter;
private ExceptionListener exceptionListener;
@@ -58,7 +59,9 @@
}
dt.setDispatcher(getDispatcher());
} else {
- blockingWriter = Executors.newSingleThreadExecutor();
+ if( useAsyncWriteThread ) {
+ blockingWriter = Executors.newSingleThreadExecutor();
+ }
}
transport.start();
}
@@ -77,9 +80,16 @@
}
public final void write(final Object o) {
+ write(o, null);
+ }
+
+ public final void write(final Object o, final Runnable onCompleted) {
if (blockingWriter==null) {
try {
transport.oneway(o);
+ if( onCompleted!=null ) {
+ onCompleted.run();
+ }
} catch (IOException e) {
onException(e);
}
@@ -90,6 +100,9 @@
if (!stopping.get()) {
try {
transport.oneway(o);
+ if( onCompleted!=null ) {
+ onCompleted.run();
+ }
} catch (IOException e) {
onException(e);
}
@@ -172,4 +185,12 @@
this.exceptionListener = exceptionListener;
}
+ public boolean isUseAsyncWriteThread() {
+ return useAsyncWriteThread;
+ }
+
+ public void setUseAsyncWriteThread(boolean useAsyncWriteThread) {
+ this.useAsyncWriteThread = useAsyncWriteThread;
+ }
+
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Mar 16 19:58:58 2009
@@ -34,7 +34,6 @@
public StompMessageDelivery(StompFrame frame, Destination destiantion) {
this.frame = frame;
this.destination = destiantion;
- this.frame.setAction(Stomp.Responses.MESSAGE);
this.receiptId =
frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
}
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Mar 16 19:58:58 2009
@@ -105,6 +105,7 @@
String dest =
frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
Destination destination =
translator(frame).convertToDestination(StompProtocolHandler.this, dest);
+ frame.setAction(Stomp.Responses.MESSAGE);
StompMessageDelivery md = new StompMessageDelivery(frame,
destination);
while (!inboundController.offer(md, null)) {
inboundController.waitForFlowUnblock();
@@ -165,9 +166,13 @@
limiter = new
SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(),
connection.getOutputWindowSize());
outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow,
outboundFlow.getFlowName(), limiter);
outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(final MessageDelivery message,
ISourceController<MessageDelivery> controller) {
+ public void drain(final MessageDelivery message, final
ISourceController<MessageDelivery> controller) {
StompFrame msg = message.asType(StompFrame.class);
- connection.write(msg);
+ connection.write(msg, new Runnable() {
+ public void run() {
+ controller.elementDispatched(message);
+ }
+ });
};
});
@@ -187,6 +192,9 @@
actionHander.onStompFrame(command);
} catch (Exception error) {
try {
+
+ error.printStackTrace();
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter stream = new PrintWriter(new
OutputStreamWriter(baos, "UTF-8"));
error.printStackTrace(stream);
@@ -343,22 +351,25 @@
if (stompMessage == null) {
return false;
}
-
- Message msg = message.asType(Message.class);
- if (msg == null) {
- return false;
- }
-
- // TODO: abstract the Selector bits so that it is not openwire specific.
- MessageEvaluationContext selectorContext = new
MessageEvaluationContext();
- selectorContext.setMessageReference(msg);
- selectorContext.setDestination(msg.getDestination());
- try {
- return (selector == null || selector.matches(selectorContext));
- } catch (JMSException e) {
- e.printStackTrace();
- return false;
- }
+
+ return true;
+
+// TODO: implement selector bits.
+// Message msg = message.asType(Message.class);
+// if (msg == null) {
+// return false;
+// }
+//
+// // TODO: abstract the Selector bits so that it is not openwire specific.
+// MessageEvaluationContext selectorContext = new
MessageEvaluationContext();
+// selectorContext.setMessageReference(msg);
+// selectorContext.setDestination(msg.getDestination());
+// try {
+// return (selector == null ||
selector.matches(selectorContext));
+// } catch (JMSException e) {
+// e.printStackTrace();
+// return false;
+// }
}
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteConsumer.java Mon Mar 16 19:58:58 2009
@@ -23,6 +23,8 @@
private FlowController<MessageDelivery> inboundController;
private String stompDestination;
+ public StompRemoteConsumer() {
+ }
protected void setupSubscription() throws Exception, IOException {
if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java?rev=754990&r1=754989&r2=754990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/stomp/StompRemoteProducer.java Mon Mar 16 19:58:58 2009
@@ -19,6 +19,9 @@
private String stompDestination;
+ StompRemoteProducer() {
+ }
+
protected void setupProducer() throws Exception, IOException {
if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
stompDestination = "/queue/"+destination.getName().toString();
@@ -40,9 +43,13 @@
outboundController = outboundQueue.getFlowController(flow);
outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
- public void drain(MessageDelivery message,
ISourceController<MessageDelivery> controller) {
+ public void drain(final MessageDelivery message, final
ISourceController<MessageDelivery> controller) {
StompFrame msg = message.asType(StompFrame.class);
- write(msg);
+ write(msg, new Runnable(){
+ public void run() {
+ controller.elementDispatched(message);
+ }
+ });
}
});
}
@@ -77,7 +84,11 @@
headers.put(property, property);
}
- StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers,
toContent(createPayload()));
+ byte[] content = toContent(createPayload());
+
+ headers.put(Stomp.Headers.CONTENT_LENGTH, ""+content.length);
+
+ StompFrame fram = new StompFrame(Stomp.Commands.SEND, headers,
content);
next = new StompMessageDelivery(fram, getDestination());
}