Author: chirino
Date: Thu Feb 19 17:30:11 2009
New Revision: 745938
URL: http://svn.apache.org/viewvc?rev=745938&view=rev
Log:
Added a proto2 wire format which uses the new Externalizable-style
encoding/decoding, just so we can benchmark protobuf encoding against the
more standard Java-style Externalizable encoding.
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=745938&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
(added)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
Thu Feb 19 17:30:11 2009
@@ -0,0 +1,272 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.StatefulWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class Proto2WireFormatFactory implements WireFormatFactory {
+
+ public class TestWireFormat implements StatefulWireFormat {
+ private ByteBuffer currentOut;
+ private byte outType;
+
+ private ByteBuffer currentIn;
+ private byte inType;
+
+ /**
+  * Writes {@code value} to {@code out} as one frame: a type byte, a
+  * four-byte payload length, then the payload.
+  * Type bytes: 0 = Message, 1 = String (UTF-8 bytes),
+  * 2 = DestinationBuffer, 3 = FlowControlBuffer.
+  *
+  * @param value the object to encode; its exact runtime class selects the type byte
+  * @param out the sink that receives the frame
+  * @throws IOException if the class of {@code value} is not one of the four
+  *         handled types, or if writing to {@code out} fails
+  */
+ public void marshal(Object value, DataOutput out) throws IOException {
+     final Class<?> kind = value.getClass();
+
+     // Strings carry their UTF-8 bytes directly; no intermediate stream needed.
+     if (kind == String.class) {
+         out.writeByte(1);
+         byte[] utf8 = ((String) value).getBytes("UTF-8");
+         out.writeInt(utf8.length);
+         out.write(utf8);
+         return;
+     }
+
+     // The bean types are externalized into a scratch buffer first so that
+     // the payload length is known before the payload is written.
+     final DataByteArrayOutputStream scratch = new DataByteArrayOutputStream();
+     if (kind == Message.class) {
+         out.writeByte(0);
+         MessageBean bean = ((Message) value).getProto().copy();
+         bean.writeExternal(scratch);
+     } else if (kind == DestinationBuffer.class) {
+         out.writeByte(2);
+         DestinationBean bean = ((DestinationBuffer) value).copy();
+         bean.writeExternal(scratch);
+     } else if (kind == FlowControlBuffer.class) {
+         out.writeByte(3);
+         FlowControlBean bean = ((FlowControlBuffer) value).copy();
+         bean.writeExternal(scratch);
+     } else {
+         throw new IOException("Unsupported type: "+value.getClass());
+     }
+
+     out.writeInt(scratch.size());
+     out.write(scratch.getData(), 0, scratch.size());
+ }
+
+ /**
+  * Reads one frame from {@code in}: a type byte, a four-byte length, then
+  * the payload, and returns the decoded object.
+  * Type bytes: 0 = Message, 1 = String (UTF-8 bytes),
+  * 2 = frozen DestinationBean, 3 = frozen FlowControlBean.
+  *
+  * @param in the stream positioned at the start of a frame
+  * @return the decoded object
+  * @throws IOException if the type byte is not 0-3, if a String frame
+  *         declares a negative length, or if reading fails
+  */
+ public Object unmarshal(DataInput in) throws IOException {
+     byte type = in.readByte();
+     // The length prefix is always consumed, but only the String case uses
+     // it; the bean types are read straight from the stream by readExternal.
+     int size = in.readInt();
+     switch (type) {
+     case 0: {
+         MessageBean proto = new MessageBean();
+         proto.readExternal(in);
+         return new Message(proto.freeze());
+     }
+     case 1: {
+         // A negative length would otherwise escape as an unchecked
+         // NegativeArraySizeException instead of an IOException.
+         if (size < 0) {
+             throw new IOException("Invalid string length: " + size);
+         }
+         byte[] data = new byte[size];
+         in.readFully(data);
+         return new String(data, "UTF-8");
+     }
+     case 2: {
+         DestinationBean proto = new DestinationBean();
+         proto.readExternal(in);
+         return proto.freeze();
+     }
+     case 3: {
+         FlowControlBean proto = new FlowControlBean();
+         proto.readExternal(in);
+         return proto.freeze();
+     }
+     default:
+         // Report the offending byte, as the ByteBuffer-based unMarshal does.
+         throw new IOException("Unknown type byte: " + type);
+     }
+ }
+
+ public boolean marshal(Object value, ByteBuffer target) throws
IOException
+ {
+ if(currentOut == null)
+ {
+ //Ensure room for type byte and length byte:
+ if(target.remaining() < 5)
+ {
+ return false;
+ }
+
+ if( value.getClass() == Message.class ) {
+ DataByteArrayOutputStream baos = new
DataByteArrayOutputStream();
+ MessageBean proto = ((Message)value).getProto().copy();
+ proto.writeExternal(baos);
+ currentOut = ByteBuffer.wrap(baos.getData(), 0,
baos.size());
+ outType = 0;
+ } else if( value.getClass() == String.class ) {
+ outType = 1;
+ try {
+ currentOut =
ByteBuffer.wrap(((String)value).getBytes("utf-8"));
+ } catch (UnsupportedEncodingException e) {
+ //Shouldn't happen.
+ throw IOExceptionSupport.create(e);
+ }
+ } else if( value.getClass() == DestinationBuffer.class ) {
+ outType = 2;
+
+ DataByteArrayOutputStream baos = new
DataByteArrayOutputStream();
+ DestinationBean proto =
((DestinationBuffer)value).copy();
+ proto.writeExternal(baos);
+ currentOut = ByteBuffer.wrap(baos.getData(), 0,
baos.size());
+ }else if( value.getClass() == FlowControlBuffer.class ) {
+ outType = 3;
+ DataByteArrayOutputStream baos = new
DataByteArrayOutputStream();
+ FlowControlBean proto = ((FlowControlBuffer)value).copy();
+ proto.writeExternal(baos);
+ }else {
+ throw new IOException("Unsupported type:
"+value.getClass());
+ }
+
+ //Write type:
+ target.put(outType);
+ //Write length:
+ target.putInt(currentOut.remaining());
+ if(currentOut.remaining() > 1024*1024)
+ {
+ throw new IOException("Packet exceeded max memory size!");
+ }
+ }
+
+ //Avoid overflow:
+ if(currentOut.remaining() > target.remaining())
+ {
+ int limit = currentOut.limit();
+ currentOut.limit(currentOut.position() + target.remaining());
+ target.put(currentOut);
+ currentOut.limit(limit);
+ }
+ else
+ {
+ target.put(currentOut);
+ }
+
+ if(!currentOut.hasRemaining())
+ {
+ currentOut = null;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Unmarshals an object. When the object is read it is returned.
+ * @param source
+ * @return The object when unmarshalled, null otherwise
+ */
+ public Object unMarshal(ByteBuffer source) throws IOException
+ {
+ if(currentIn == null)
+ {
+ if(source.remaining() < 5)
+ {
+ return null;
+ }
+
+ inType = source.get();
+ int length = source.getInt();
+ if(length > 1024*1024)
+ {
+ throw new IOException("Packet exceeded max memory size!");
+ }
+ currentIn = ByteBuffer.wrap(new byte[length]);
+
+ }
+
+ if(!source.hasRemaining())
+ {
+ return null;
+ }
+
+ if(source.remaining() > currentIn.remaining())
+ {
+ int limit = source.limit();
+ source.limit(source.position() + currentIn.remaining());
+ currentIn.put(source);
+ source.limit(limit);
+ }
+ else
+ {
+ currentIn.put(source);
+ }
+
+ //If we haven't finished the packet return to get more data:
+ if(currentIn.hasRemaining())
+ {
+ return null;
+ }
+
+ Object ret = null;
+ switch(inType) {
+ case 0: {
+ DataByteArrayInputStream in = new
DataByteArrayInputStream(currentIn.array());
+ MessageBean proto = new MessageBean();
+ proto.readExternal(in);
+ ret = new Message(proto.freeze());
+ break;
+ }
+ case 1: {
+ ret = new String(currentIn.array(), "utf-8");
+ break;
+ }
+ case 2: {
+ DataByteArrayInputStream in = new
DataByteArrayInputStream(currentIn.array());
+ DestinationBean proto = new DestinationBean();
+ proto.readExternal(in);
+ ret = proto.freeze();
+ break;
+ }
+ case 3: {
+ DataByteArrayInputStream in = new
DataByteArrayInputStream(currentIn.array());
+ FlowControlBean proto = new FlowControlBean();
+ proto.readExternal(in);
+ ret = proto.freeze();
+ break;
+ }
+ default:
+ throw new IOException("Unknown type byte: " + inType);
+ }
+
+ currentIn = null;
+ return ret;
+ }
+
+ /**
+  * Reports the wire format version.
+  *
+  * @return always 0; this test format has no versioning
+  */
+ public int getVersion() {
+     final int fixedVersion = 0;
+     return fixedVersion;
+ }
+ /**
+  * Accepts a version request and ignores it.
+  *
+  * @param version ignored
+  */
+ public void setVersion(int version) {
+     // Intentionally empty: getVersion() always reports 0.
+ }
+
+ /**
+  * Reports whether a receive is in progress.
+  *
+  * @return always {@code false} in this implementation
+  */
+ public boolean inReceive() {
+     final boolean receiving = false;
+     return receiving;
+ }
+
+ /**
+  * Encodes {@code value} into a freshly allocated byte sequence by
+  * delegating to the {@link DataOutput}-based {@code marshal}.
+  *
+  * @param value the object to encode
+  * @return the encoded frame
+  * @throws IOException if the stream-based marshal fails
+  */
+ public ByteSequence marshal(Object value) throws IOException {
+     final DataByteArrayOutputStream sink = new DataByteArrayOutputStream();
+     this.marshal(value, sink);
+     final ByteSequence encoded = sink.toByteSequence();
+     return encoded;
+ }
+
+ /**
+  * Decodes one object from {@code data} by wrapping it in an input stream
+  * and delegating to the {@link DataInput}-based {@code unmarshal}.
+  *
+  * @param data the bytes of one encoded frame
+  * @return the decoded object
+  * @throws IOException if the stream-based unmarshal fails
+  */
+ public Object unmarshal(ByteSequence data) throws IOException {
+     final DataByteArrayInputStream frame = new DataByteArrayInputStream(data);
+     final Object decoded = this.unmarshal(frame);
+     return decoded;
+ }
+ }
+
+ /**
+  * Creates a new wire format instance.
+  *
+  * @return a fresh {@code TestWireFormat}
+  */
+ public WireFormat createWireFormat() {
+     final WireFormat format = new TestWireFormat();
+     return format;
+ }
+
+}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=745938&r1=745937&r2=745938&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Thu Feb 19 17:30:11 2009
@@ -11,19 +11,21 @@
import org.apache.activemq.flow.MockBroker.DeliveryTarget;
public class Router {
- final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new
HashMap<Destination, Collection<DeliveryTarget>>();
+ final HashMap<String, Collection<DeliveryTarget>> lookupTable = new
HashMap<String, Collection<DeliveryTarget>>();
final synchronized void bind(DeliveryTarget dt, Destination destination) {
- Collection<DeliveryTarget> targets = lookupTable.get(destination);
+ String key = destination.getName();
+ Collection<DeliveryTarget> targets = lookupTable.get(key);
if (targets == null) {
targets = new ArrayList<DeliveryTarget>();
- lookupTable.put(destination, targets);
+ lookupTable.put(key, targets);
}
targets.add(dt);
}
final void route(ISourceController<Message> source, Message msg) {
- Collection<DeliveryTarget> targets =
lookupTable.get(msg.getDestination());
+ String key = msg.getDestination().getName();
+ Collection<DeliveryTarget> targets = lookupTable.get(key);
if( targets == null )
return;
for (DeliveryTarget dt : targets) {
Added:
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=745938&view=auto
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
(added)
+++
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
Thu Feb 19 17:30:11 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.Proto2WireFormatFactory