Author: chirino
Date: Fri Feb 13 05:19:23 2009
New Revision: 743990
URL: http://svn.apache.org/viewvc?rev=743990&view=rev
Log:
Switching to a protobuf based wireformat
Added:
activemq/sandbox/activemq-flow/src/main/proto/
activemq/sandbox/activemq-flow/src/main/proto/test.proto
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
Removed:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Destination.java
Modified:
activemq/sandbox/activemq-flow/pom.xml
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Modified: activemq/sandbox/activemq-flow/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Fri Feb 13 05:19:23 2009
@@ -57,4 +57,20 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.activemq.protobuf</groupId>
+ <artifactId>activemq-protobuf</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
Added: activemq/sandbox/activemq-flow/src/main/proto/test.proto
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/test.proto?rev=743990&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/test.proto (added)
+++ activemq/sandbox/activemq-flow/src/main/proto/test.proto Fri Feb 13
05:19:23 2009
@@ -0,0 +1,37 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.activemq.flow;
+
+option java_multiple_files = false;
+option java_outer_classname = "Commands";
+option deferred_decode = true;
+
+message Destination {
+ optional string name = 1;
+ optional bool ptp = 3;
+}
+
+message Message {
+ optional string msg = 1;
+ optional Destination dest=2;
+ optional int32 hopCount=3;
+ optional int64 msgId=4;
+ optional int32 producerId=5;
+ optional int32 priority=6;
+ repeated string property=7;
+}
+
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
Fri Feb 13 05:19:23 2009
@@ -22,15 +22,6 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.AbstractLimitedFlowSource;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.IFlowDrain;
-import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.IFlowSource;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.IFlowResource.FlowLifeCycleListener;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.queue.ExclusivePriorityQueue;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
Fri Feb 13 05:19:23 2009
@@ -3,7 +3,6 @@
*/
package org.apache.activemq.flow;
-import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
class BrokerConnection extends AbstractTestConnection implements
MockBroker.DeliveryTarget {
private final Pipe<Message> pipe;
@@ -37,7 +36,7 @@
protected void messageReceived(Message m, ISourceController<Message>
controller) {
m = new Message(m);
- m.hopCount++;
+ m.incrementHopCount();
local.router.route(controller, m);
}
@@ -53,7 +52,7 @@
public boolean match(Message message) {
// Avoid loops:
- if (message.hopCount > 0) {
+ if (message.getHopCount() > 0) {
return false;
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
Fri Feb 13 05:19:23 2009
@@ -17,8 +17,8 @@
package org.apache.activemq.flow;
import java.io.Serializable;
-import java.util.HashSet;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.queue.Mapper;
public class Message implements Serializable {
@@ -27,7 +27,7 @@
public static final Mapper<Integer, Message> PRIORITY_MAPPER = new
Mapper<Integer, Message>() {
public Integer map(Message element) {
- return element.priority;
+ return element.getPriority();
}
};
@@ -40,34 +40,25 @@
public static final short TYPE_FLOW_OPEN = 2;
public static final short TYPE_FLOW_CLOSE = 3;
- final String msg;
- transient final Flow flow;
- final Destination dest;
- int hopCount;
- HashSet<String> matchProps;
- final long msgId;
- final int producerId;
- final int priority;
+ transient Flow flow;
+ private Commands.Message message = new Commands.Message();
Message(long msgId, int producerId, String msg, Flow flow, Destination
dest, int priority) {
- this.msgId = msgId;
- this.producerId = producerId;
- this.msg = msg;
+ this.message.setMsgId(msgId);
+ this.message.setProducerId(producerId);
+ this.message.setMsg(msg);
+ this.message.setDest(dest);
+ this.message.setPriority(priority);
this.flow = flow;
- this.dest = dest;
- this.priority = priority;
- hopCount = 0;
}
Message(Message m) {
- this.msgId = m.msgId;
- this.producerId = m.producerId;
- this.msg = m.msg;
+ this.message = m.message;
this.flow = m.flow;
- this.dest = m.dest;
- this.matchProps = m.matchProps;
- this.priority = m.priority;
- hopCount = m.hopCount;
+ }
+
+ public Message(Commands.Message m) {
+ this.message=m;
}
public short type() {
@@ -75,17 +66,16 @@
}
public void setProperty(String matchProp) {
- if (matchProps == null) {
- matchProps = new HashSet<String>();
- }
- matchProps.add(matchProp);
+ Commands.Message clone = message.clone();
+ clone.addProperty(matchProp);
+ message = clone;
}
public boolean match(String matchProp) {
- if (matchProps == null) {
+ if (!message.hasProperty()) {
return false;
}
- return matchProps.contains(matchProp);
+ return message.getPropertyList().contains(matchProp);
}
public boolean isSystem() {
@@ -93,15 +83,17 @@
}
public void incrementHopCount() {
- hopCount++;
+ Commands.Message clone = message.clone();
+ clone.setHopCount(message.getHopCount());
+ message = clone;
}
public final int getHopCount() {
- return hopCount;
+ return message.getHopCount();
}
public final Destination getDestination() {
- return dest;
+ return message.getDest();
}
public Flow getFlow() {
@@ -113,18 +105,22 @@
}
public int getPriority() {
- return priority;
+ return message.getPriority();
}
public String toString() {
- return "Message: " + msg + " flow + " + flow + " dest: " + dest;
+ return "Message: " + message.getMsg() + " flow + " + flow + " dest: "
+ message.getDest();
}
public long getMsgId() {
- return msgId;
+ return message.getMsgId();
}
public int getProducerId() {
- return producerId;
+ return message.getProducerId();
+ }
+
+ public Commands.Message getProto() {
+ return message;
}
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
Fri Feb 13 05:19:23 2009
@@ -9,6 +9,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
@@ -41,7 +42,7 @@
}
public void subscribe(Destination destination, DeliveryTarget
deliveryTarget) {
- if (destination.ptp) {
+ if (destination.getPtp()) {
queues.get(destination).addConsumer(deliveryTarget);
} else {
router.bind(deliveryTarget, destination);
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
Fri Feb 13 05:19:23 2009
@@ -25,6 +25,7 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.dispatch.PriorityPooledDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.Period;
import org.apache.activemq.queue.Mapper;
@@ -323,8 +324,8 @@
if (multibroker) {
if( tcp ) {
- sendBroker = createBroker("SendBroker",
"tcp://localhost:10000?wireFormat=test");
- rcvBroker = createBroker("RcvBroker",
"tcp://localhost:20000?wireFormat=test");
+ sendBroker = createBroker("SendBroker",
"tcp://localhost:10000?wireFormat=proto");
+ rcvBroker = createBroker("RcvBroker",
"tcp://localhost:20000?wireFormat=proto");
} else {
sendBroker = createBroker("SendBroker", "pipe://SendBroker");
rcvBroker = createBroker("RcvBroker", "pipe://RcvBroker");
@@ -333,7 +334,7 @@
brokers.add(rcvBroker);
} else {
if( tcp ) {
- sendBroker = rcvBroker = createBroker("Broker",
"tcp://localhost:10000?wireFormat=test");
+ sendBroker = rcvBroker = createBroker("Broker",
"tcp://localhost:10000?wireFormat=proto");
} else {
sendBroker = rcvBroker = createBroker("Broker",
"pipe://Broker");
}
@@ -344,7 +345,9 @@
Destination[] dests = new Destination[destCount];
for (int i = 0; i < destCount; i++) {
- dests[i] = new Destination("dest" + (i + 1), ptp);
+ dests[i] = new Destination();
+ dests[i].setName("dest" + (i + 1));
+ dests[i].setPtp(ptp);
if (ptp) {
MockQueue queue = createQueue(sendBroker, dests[i]);
sendBroker.addQueue(queue);
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Fri Feb 13 05:19:23 2009
@@ -5,6 +5,7 @@
import java.util.HashMap;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.Mapper;
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=743990&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
Fri Feb 13 05:19:23 2009
@@ -0,0 +1,73 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.flow.Commands.Destination;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class ProtoWireFormatFactory implements WireFormatFactory {
+
+ static public class TestWireFormat implements WireFormat {
+
+ public void marshal(Object value, DataOutput out) throws IOException {
+ if( value.getClass() == Message.class ) {
+ out.writeByte(0);
+ ((Message)value).getProto().writeFramed((OutputStream)out);
+ } else if( value.getClass() == String.class ) {
+ out.writeByte(1);
+ out.writeUTF((String) value);
+ } else if( value.getClass() == Destination.class ) {
+ out.writeByte(2);
+ ((Destination)value).writeFramed((OutputStream)out);
+ } else {
+ throw new IOException("Unsupported type: "+value.getClass());
+ }
+ }
+
+ public Object unmarshal(DataInput in) throws IOException {
+ byte type = in.readByte();
+ switch(type) {
+ case 0:
+ Commands.Message m = new Commands.Message();
+ m.mergeFramed((InputStream)in);
+ return new Message(m);
+ case 1:
+ return in.readUTF();
+ case 2:
+ Destination d = new Destination();
+ d.mergeFramed((InputStream)in);
+ return d;
+ default:
+ throw new IOException("Unknonw type byte: ");
+ }
+ }
+
+ public int getVersion() {
+ return 0;
+ }
+ public void setVersion(int version) {
+ }
+
+ public boolean inReceive() {
+ return false;
+ }
+
+ public ByteSequence marshal(Object value) throws IOException {
+ return null;
+ }
+ public Object unmarshal(ByteSequence data) throws IOException {
+ return null;
+ }
+ }
+
+ public WireFormat createWireFormat() {
+ return new TestWireFormat();
+ }
+
+}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
Fri Feb 13 05:19:23 2009
@@ -6,6 +6,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.ExclusivePriorityQueue;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConsumer.java
Fri Feb 13 05:19:23 2009
@@ -4,6 +4,7 @@
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.transport.Transport;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
Fri Feb 13 05:19:23 2009
@@ -5,6 +5,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
import org.apache.activemq.transport.Transport;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743990&r1=743989&r2=743990&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Fri Feb 13 05:19:23 2009
@@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.HashMap;
+import org.apache.activemq.flow.Commands.Destination;
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
public class Router {
Added:
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto?rev=743990&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
(added)
+++
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto
Fri Feb 13 05:19:23 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.ProtoWireFormatFactory