Author: chirino
Date: Wed Feb 18 20:47:16 2009
New Revision: 745641
URL: http://svn.apache.org/viewvc?rev=745641&view=rev
Log:
- You can now force marshalling in the pipe transport by adding a wireFormat option to the pipe URI (e.g. pipe://SendBroker?wireFormat=proto)
- Better toString in the SingleFlowRelay
- Producer can now send bigger messages (a fixed filler string is appended to each payload)
- The consumer can now do a traditional Thread.sleep to simulate think time when the wait is not scheduled through the dispatcher.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SingleFlowRelay.java
Wed Feb 18 20:47:16 2009
@@ -45,4 +45,9 @@
// TODO Auto-generated method stub
return this;
}
+
+ @Override
+ public String toString() {
+ return getResourceName();
+ }
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Wed Feb 18 20:47:16 2009
@@ -35,7 +35,7 @@
public class MockBrokerTest extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 3;
+ protected static final int PERFORMANCE_SAMPLES = 30000;
protected static final int IO_WORK_AMOUNT = 0;
protected static final int FANIN_COUNT = 10;
@@ -52,6 +52,9 @@
// Set to use tcp IO
protected boolean tcp = true;
+ // set to force marshalling even in the NON tcp case.
+ protected boolean forceMarshalling = false;
+
protected String sendBrokerURI;
protected String receiveBrokerURI;
@@ -91,8 +94,13 @@
sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
} else {
- sendBrokerURI = "pipe://SendBroker";
- receiveBrokerURI = "pipe://ReceiveBroker";
+ if( forceMarshalling ) {
+ sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
+ receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
+ } else {
+ sendBrokerURI = "pipe://SendBroker";
+ receiveBrokerURI = "pipe://ReceiveBroker";
+ }
}
}
@@ -423,13 +431,12 @@
}
private void stopServices() throws Exception {
- for (MockBroker broker : brokers) {
- broker.stopServices();
- }
if (dispatcher != null) {
dispatcher.shutdown();
}
-
+ for (MockBroker broker : brokers) {
+ broker.stopServices();
+ }
}
private void startServices() throws Exception {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
Wed Feb 18 20:47:16 2009
@@ -4,7 +4,9 @@
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -21,6 +23,11 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +45,7 @@
private Thread thread;
private DispatchContext readContext;
private String name;
+ private WireFormat wireFormat;
public PipeTransport(Pipe<Object> pipe) {
this.pipe = pipe;
@@ -79,7 +87,11 @@
public void oneway(Object command) throws IOException {
try {
- pipe.write(command);
+ if( wireFormat!=null ) {
+ pipe.write(wireFormat.marshal(command));
+ } else {
+ pipe.write(command);
+ }
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
@@ -92,13 +104,20 @@
public boolean dispatch() {
while (true) {
-
- Object o = pipe.poll();
- if (o == null) {
- pipe.setReadReadyListener(this);
- return true;
- } else {
- listener.onCommand(o);
+ try {
+ Object o = pipe.poll();
+ if (o == null) {
+ pipe.setReadReadyListener(this);
+ return true;
+ } else {
+ if( wireFormat!=null ) {
+
listener.onCommand(wireFormat.unmarshal((ByteSequence)o));
+ } else {
+ listener.onCommand(o);
+ }
+ }
+ } catch (IOException e) {
+ listener.onException(e);
}
}
}
@@ -169,12 +188,17 @@
name = remoteAddress;
}
}
+
+ public void setWireFormat(WireFormat wireFormat) {
+ this.wireFormat = wireFormat;
+ }
}
private class PipeTransportServer implements TransportServer {
private URI connectURI;
private TransportAcceptListener listener;
private String name;
+ private WireFormatFactory wireFormatFactory;
public URI getConnectURI() {
return connectURI;
@@ -219,22 +243,39 @@
rc.setRemoteAddress(remoteAddress);
PipeTransport serverSide = new PipeTransport(pipe.connect());
serverSide.setRemoteAddress(remoteAddress);
+ if( wireFormatFactory!=null ) {
+ rc.setWireFormat(wireFormatFactory.createWireFormat());
+ serverSide.setWireFormat(wireFormatFactory.createWireFormat());
+ }
listener.onAccept(serverSide);
return rc;
}
+
+ public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
+ this.wireFormatFactory = wireFormatFactory;
+ }
}
@Override
public synchronized TransportServer doBind(URI uri) throws IOException {
- String node = uri.getHost();
- if (servers.containsKey(node)) {
- throw new IOException("Server allready bound: " + node);
- }
- PipeTransportServer server = new PipeTransportServer();
- server.setConnectURI(uri);
- server.setName(node);
- servers.put(node, server);
- return server;
+ try {
+ Map<String, String> options = new HashMap<String,
String>(URISupport.parseParamters(uri));
+
+ String node = uri.getHost();
+ if (servers.containsKey(node)) {
+ throw new IOException("Server allready bound: " + node);
+ }
+ PipeTransportServer server = new PipeTransportServer();
+ server.setConnectURI(uri);
+ server.setName(node);
+ if( options.containsKey("wireFormat") ) {
+ server.setWireFormatFactory(createWireFormatFactory(options));
+ }
+ servers.put(node, server);
+ return server;
+ } catch (URISyntaxException e) {
+ throw IOExceptionSupport.create(e);
+ }
}
private synchronized void unbind(PipeTransportServer server) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
Wed Feb 18 20:47:16 2009
@@ -11,6 +11,8 @@
import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.StatefulWireFormat;
import org.apache.activemq.wireformat.WireFormat;
@@ -223,10 +225,14 @@
}
public ByteSequence marshal(Object value) throws IOException {
- return null;
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+ marshal(value, os);
+ return os.toByteSequence();
}
+
public Object unmarshal(ByteSequence data) throws IOException {
- return null;
+ DataByteArrayInputStream is = new DataByteArrayInputStream(data);
+ return unmarshal(is);
}
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
Wed Feb 18 20:47:16 2009
@@ -1,15 +1,12 @@
package org.apache.activemq.flow;
-import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
public class RemoteConsumer extends RemoteConnection{
@@ -20,6 +17,8 @@
private long thinkTime;
private Destination destination;
private String selector;
+
+ private boolean schedualWait;
public void start() throws Exception {
consumerRate.name("Consumer " + name + " Rate");
@@ -32,6 +31,7 @@
DispatchableTransport dt = ((DispatchableTransport)transport);
dt.setName(name);
dt.setDispatcher(getDispatcher());
+ schedualWait = true;
}
transport.setTransportListener(this);
transport.start();
@@ -44,22 +44,34 @@
}
protected void messageReceived(final ISourceController<Message>
controller, final Message elem) {
- if (thinkTime > 0) {
- getDispatcher().schedule(new Runnable(){
-
- public void run() {
- consumerRate.increment();
- controller.elementDispatched(elem);
+ if( schedualWait ) {
+ if (thinkTime > 0) {
+ getDispatcher().schedule(new Runnable(){
+
+ public void run() {
+ consumerRate.increment();
+ controller.elementDispatched(elem);
+ }
+
+ }, thinkTime, TimeUnit.MILLISECONDS);
+
+ }
+ else
+ {
+ consumerRate.increment();
+ controller.elementDispatched(elem);
+ }
+
+ } else {
+ if( thinkTime>0 ) {
+ try {
+ Thread.sleep(thinkTime);
+ } catch (InterruptedException e) {
}
-
- }, thinkTime, TimeUnit.MILLISECONDS);
-
+ }
+ consumerRate.increment();
+ controller.elementDispatched(elem);
}
- else
- {
- consumerRate.increment();
- controller.elementDispatched(elem);
- }
}
public void setName(String name) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=745641&r1=745640&r2=745641&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Wed Feb 18 20:47:16 2009
@@ -1,8 +1,6 @@
package org.apache.activemq.flow;
-import java.io.IOException;
import java.net.URI;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
@@ -12,12 +10,12 @@
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.transport.DispatchableTransport;
-import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportListener;
public class RemoteProducer extends RemoteConnection implements Dispatchable,
FlowUnblockListener<Message>{
+ private static final int FILLER_SIZE = 100;
+
private final MetricCounter rate = new MetricCounter();
private AtomicLong messageIdGenerator;
@@ -30,8 +28,17 @@
private MetricAggregator totalProducerRate;
Message next;
private DispatchContext dispatchContext;
+
+ private String filler;
public void start() throws Exception {
+
+ StringBuilder sb = new StringBuilder(FILLER_SIZE);
+ for( int i=0; i < FILLER_SIZE; ++i) {
+ sb.append('a'+(i%26));
+ }
+ filler = sb.toString();
+
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
@@ -75,7 +82,7 @@
priority = counter % priorityMod == 0 ? 0 : priority;
}
- next = new Message(messageIdGenerator.getAndIncrement(),
producerId, name + ++counter, null, destination, priority);
+ next = new Message(messageIdGenerator.getAndIncrement(),
producerId, createPayload(), null, destination, priority);
if (property != null) {
next.setProperty(property);
}
@@ -95,6 +102,10 @@
next = null;
}
}
+
+ private String createPayload() {
+ return name + ++counter+filler;
+ }
public void setName(String name) {
this.name = name;