Author: chirino
Date: Tue Feb 17 14:59:46 2009
New Revision: 745111
URL: http://svn.apache.org/viewvc?rev=745111&view=rev
Log:
Switching to the alternative protobuf api
Modified:
activemq/sandbox/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Modified: activemq/sandbox/activemq-flow/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Tue Feb 17 14:59:46 2009
@@ -62,6 +62,9 @@
<plugin>
<groupId>org.apache.activemq.protobuf</groupId>
<artifactId>activemq-protobuf</artifactId>
+ <configuration>
+ <type>alt</type>
+ </configuration>
<executions>
<execution>
<goals>
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
Tue Feb 17 14:59:46 2009
@@ -19,6 +19,8 @@
import java.io.Serializable;
import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.queue.Mapper;
public class Message implements Serializable {
@@ -41,14 +43,16 @@
public static final short TYPE_FLOW_CLOSE = 3;
transient Flow flow;
- private Commands.Message message = new Commands.Message();
+ private MessageBuffer message;
Message(long msgId, int producerId, String msg, Flow flow, Destination
dest, int priority) {
- this.message.setMsgId(msgId);
- this.message.setProducerId(producerId);
- this.message.setMsg(msg);
- this.message.setDest(dest);
- this.message.setPriority(priority);
+ MessageBean message = new MessageBean();
+ message.setMsgId(msgId);
+ message.setProducerId(producerId);
+ message.setMsg(msg);
+ message.setDest(dest);
+ message.setPriority(priority);
+ this.message = message.freeze();
this.flow = flow;
}
@@ -57,7 +61,7 @@
this.flow = m.flow;
}
- public Message(Commands.Message m) {
+ public Message(MessageBuffer m) {
this.message=m;
}
@@ -66,9 +70,7 @@
}
public void setProperty(String matchProp) {
- Commands.Message clone = message.clone();
- clone.addProperty(matchProp);
- message = clone;
+ message = message.copy().addProperty(matchProp).freeze();
}
public boolean match(String matchProp) {
@@ -83,9 +85,7 @@
}
public void incrementHopCount() {
- Commands.Message clone = message.clone();
- clone.setHopCount(message.getHopCount());
- message = clone;
+ message = message.copy().setHopCount(message.getHopCount()).freeze();
}
public final int getHopCount() {
@@ -120,7 +120,7 @@
return message.getProducerId();
}
- public Commands.Message getProto() {
+ public MessageBuffer getProto() {
return message;
}
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Tue Feb 17 14:59:46 2009
@@ -26,6 +26,8 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.PriorityPooledDispatcher;
import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.Period;
import org.apache.activemq.queue.Mapper;
@@ -49,7 +51,7 @@
boolean ptp = false;
// Set to use tcp IO
- boolean tcp = false;
+ boolean tcp = true;
// Set's the number of threads to use:
private final int asyncThreadPoolSize =
Runtime.getRuntime().availableProcessors();
@@ -339,12 +341,13 @@
brokers.add(sendBroker);
}
- Destination[] dests = new Destination[destCount];
+ DestinationBuffer[] dests = new DestinationBuffer[destCount];
for (int i = 0; i < destCount; i++) {
- dests[i] = new Destination();
- dests[i].setName("dest" + (i + 1));
- dests[i].setPtp(ptp);
+ DestinationBean bean = new DestinationBean();
+ bean.setName("dest" + (i + 1));
+ bean.setPtp(ptp);
+ dests[i] = bean.freeze();
if (ptp) {
MockQueue queue = createQueue(sendBroker, dests[i]);
sendBroker.addQueue(queue);
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
Tue Feb 17 14:59:46 2009
@@ -6,8 +6,9 @@
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import org.apache.activemq.flow.Commands.Destination;
-import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@@ -27,7 +28,7 @@
public void marshal(Object value, DataOutput out) throws IOException {
if( value.getClass() == Message.class ) {
out.writeByte(0);
- Commands.Message proto = ((Message)value).getProto();
+ MessageBuffer proto = ((Message)value).getProto();
Buffer buffer = proto.toUnframedBuffer();
out.writeInt(buffer.getLength());
out.write(buffer.getData(), buffer.getOffset(),
buffer.getLength());
@@ -37,15 +38,15 @@
byte[] bytes = value2.getBytes("UTF-8");
out.writeInt(bytes.length);
out.write(bytes);
- } else if( value.getClass() == Destination.class ) {
+ } else if( value.getClass() == DestinationBuffer.class ) {
out.writeByte(2);
- Destination proto = (Destination)value;
+ DestinationBuffer proto = (DestinationBuffer)value;
Buffer buffer = proto.toUnframedBuffer();
out.writeInt(buffer.getLength());
out.write(buffer.getData(), buffer.getOffset(),
buffer.getLength());
- }else if( value.getClass() == FlowControl.class ) {
+ }else if( value.getClass() == FlowControlBuffer.class ) {
out.writeByte(3);
- FlowControl proto = (FlowControl)value;
+ FlowControlBuffer proto = (FlowControlBuffer)value;
Buffer buffer = proto.toUnframedBuffer();
out.writeInt(buffer.getLength());
out.write(buffer.getData(), buffer.getOffset(),
buffer.getLength());
@@ -61,18 +62,15 @@
in.readFully(data);
switch(type) {
case 0:
- Commands.Message m = new Commands.Message();
- m.mergeUnframed(data);
+ MessageBuffer m = MessageBuffer.parseUnframed(data);
return new Message(m);
case 1:
return new String(data, "UTF-8");
case 2:
- Destination d = new Destination();
- d.mergeUnframed(data);
+ DestinationBuffer d =
DestinationBuffer.parseUnframed(data);
return d;
case 3:
- FlowControl fc = new FlowControl();
- fc.mergeUnframed(data);
+ FlowControlBuffer fc =
FlowControlBuffer.parseUnframed(data);
return fc;
default:
throw new IOException("Unknonw type byte: ");
@@ -101,12 +99,12 @@
//Shouldn't happen.
throw IOExceptionSupport.create(e);
}
- } else if( value.getClass() == Destination.class ) {
+ } else if( value.getClass() == DestinationBuffer.class ) {
outType = 2;
- currentOut =
ByteBuffer.wrap(((Destination)value).toUnframedByteArray());
- }else if( value.getClass() == FlowControl.class ) {
+ currentOut =
ByteBuffer.wrap(((DestinationBuffer)value).toUnframedByteArray());
+ }else if( value.getClass() == FlowControlBuffer.class ) {
outType = 3;
- currentOut =
ByteBuffer.wrap(((FlowControl)value).toUnframedByteArray());
+ currentOut =
ByteBuffer.wrap(((FlowControlBuffer)value).toUnframedByteArray());
}else {
throw new IOException("Unsupported type:
"+value.getClass());
}
@@ -192,28 +190,18 @@
Object ret = null;
switch(inType) {
case 0:
- Commands.Message m = new Commands.Message();
- try
- {
- m.mergeUnframed(currentIn.array());
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
+ MessageBuffer m =
MessageBuffer.parseUnframed(currentIn.array());
ret = new Message(m);
break;
case 1:
ret = new String(currentIn.array(), "utf-8");
break;
case 2:
- Destination d = new Destination();
- d.mergeUnframed(currentIn.array());
+ DestinationBuffer d =
DestinationBuffer.parseUnframed(currentIn.array());
ret = d;
break;
case 3:
- FlowControl c = new FlowControl();
- c.mergeUnframed(currentIn.array());
+ FlowControlBuffer c =
FlowControlBuffer.parseUnframed(currentIn.array());
ret = c;
break;
default:
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=745111&r1=745110&r2=745111&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Tue Feb 17 14:59:46 2009
@@ -8,6 +8,9 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.SingleFlowRelay;
@@ -77,12 +80,12 @@
} else if (command.getClass() == Message.class) {
Message msg = (Message) command;
inboundController.add(msg, null);
- } else if (command.getClass() == Destination.class) {
+ } else if (command.getClass() == DestinationBuffer.class) {
// This is a subscription request
Destination destination = (Destination) command;
broker.subscribe(destination, this);
- } else if (command.getClass() == FlowControl.class) {
+ } else if (command.getClass() == FlowControlBuffer.class) {
// This is a subscription request
FlowControl fc = (FlowControl) command;
synchronized (outputQueue) {
@@ -306,9 +309,9 @@
if (!clientMode) {
available += size;
if (available >= capacity - resumeThreshold) {
- FlowControl fc = new FlowControl();
+ FlowControlBean fc = new FlowControlBean();
fc.setCredit(available);
- write(fc);
+ write(fc.freeze());
// System.out.println(RemoteConnection.this.name +
// " Send Release " + available + this);
available = 0;