Author: chirino
Date: Thu Feb 19 17:30:11 2009
New Revision: 745938

URL: http://svn.apache.org/viewvc?rev=745938&view=rev
Log:
Added a proto2 wire format which uses the new externalizable-style 
encoding/decoding, so that we can benchmark protobuf encoding against the 
more standard Java-style externalizable encoding.

Added:
    
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
    
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
Modified:
    
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java

Added: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java?rev=745938&view=auto
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
 (added)
+++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Proto2WireFormatFactory.java
 Thu Feb 19 17:30:11 2009
@@ -0,0 +1,272 @@
+package org.apache.activemq.flow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.flow.Commands.Destination.DestinationBean;
+import org.apache.activemq.flow.Commands.Destination.DestinationBuffer;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBean;
+import org.apache.activemq.flow.Commands.FlowControl.FlowControlBuffer;
+import org.apache.activemq.flow.Commands.Message.MessageBean;
+import org.apache.activemq.flow.Commands.Message.MessageBuffer;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.StatefulWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+public class Proto2WireFormatFactory implements WireFormatFactory {
+
+    public class TestWireFormat implements StatefulWireFormat {
+        private ByteBuffer currentOut;
+        private byte outType;
+        
+        private ByteBuffer currentIn;
+        private byte inType;
+        
+        /**
+         * Writes {@code value} to {@code out} as a framed packet: a one-byte
+         * type tag, a four-byte payload length, then the payload bytes.
+         * <p>
+         * Type tags: 0 = {@code Message}, 1 = {@code String} (UTF-8 bytes),
+         * 2 = {@code DestinationBuffer}, 3 = {@code FlowControlBuffer}. The
+         * match is on the exact runtime class, so subclasses are rejected.
+         *
+         * @param value the object to encode
+         * @param out   the sink that receives the framed packet
+         * @throws IOException if the type is not one of the four supported
+         *                     classes, or if writing fails
+         */
+        public void marshal(Object value, DataOutput out) throws IOException {
+            final Class<?> kind = value.getClass();
+
+            // Strings carry their own byte array, so they are framed directly.
+            if (kind == String.class) {
+                out.writeByte(1);
+                byte[] utf8 = ((String) value).getBytes("UTF-8");
+                out.writeInt(utf8.length);
+                out.write(utf8);
+                return;
+            }
+
+            // Every other supported type is externalized into a scratch buffer
+            // first so that its length is known before it is framed.
+            DataByteArrayOutputStream payload = new DataByteArrayOutputStream();
+            if (kind == Message.class) {
+                out.writeByte(0);
+                MessageBean bean = ((Message) value).getProto().copy();
+                bean.writeExternal(payload);
+            } else if (kind == DestinationBuffer.class) {
+                out.writeByte(2);
+                DestinationBean bean = ((DestinationBuffer) value).copy();
+                bean.writeExternal(payload);
+            } else if (kind == FlowControlBuffer.class) {
+                out.writeByte(3);
+                FlowControlBean bean = ((FlowControlBuffer) value).copy();
+                bean.writeExternal(payload);
+            } else {
+                throw new IOException("Unsupported type: " + kind);
+            }
+
+            out.writeInt(payload.size());
+            out.write(payload.getData(), 0, payload.size());
+        }
+
+        /**
+         * Reads one framed packet from {@code in}: a one-byte type tag, a
+         * four-byte length, then the payload.
+         * <p>
+         * Type tags: 0 = {@code Message}, 1 = {@code String} (UTF-8 bytes),
+         * 2 = destination, 3 = flow control. For the bean-based types the
+         * length field is read but not otherwise used; the bean's own
+         * {@code readExternal} is presumably expected to consume exactly the
+         * payload -- NOTE(review): confirm against the generated beans.
+         *
+         * @param in the source positioned at the start of a packet
+         * @return the decoded object
+         * @throws IOException if the type tag is unknown, the string length is
+         *                     negative, or reading fails
+         */
+        public Object unmarshal(DataInput in) throws IOException {
+            byte type = in.readByte();
+            int size = in.readInt();
+            switch (type) {
+                case 0: {
+                    MessageBean proto = new MessageBean();
+                    proto.readExternal(in);
+                    return new Message(proto.freeze());
+                }
+                case 1: {
+                    // A corrupt length must surface as an IOException rather
+                    // than a NegativeArraySizeException from the allocation.
+                    if (size < 0) {
+                        throw new IOException("Invalid string length: " + size);
+                    }
+                    byte data[] = new byte[size];
+                    in.readFully(data);
+                    return new String(data, "UTF-8");
+                }
+                case 2: {
+                    DestinationBean proto = new DestinationBean();
+                    proto.readExternal(in);
+                    return proto.freeze();
+                }
+                case 3: {
+                    FlowControlBean proto = new FlowControlBean();
+                    proto.readExternal(in);
+                    return proto.freeze();
+                }
+                default:
+                    throw new IOException("Unknown type byte: " + type);
+            }
+        }
+
+        public boolean marshal(Object value, ByteBuffer target) throws 
IOException
+        {
+            if(currentOut == null)
+            {
+                //Ensure room for type byte and length byte:
+                if(target.remaining() < 5)
+                {
+                    return false;
+                }
+                
+                if( value.getClass() == Message.class ) {
+                    DataByteArrayOutputStream baos = new 
DataByteArrayOutputStream();
+                    MessageBean proto = ((Message)value).getProto().copy();
+                    proto.writeExternal(baos);
+                       currentOut = ByteBuffer.wrap(baos.getData(), 0, 
baos.size());
+                       outType = 0;
+                } else if( value.getClass() == String.class ) {
+                       outType = 1;
+                    try {
+                        currentOut = 
ByteBuffer.wrap(((String)value).getBytes("utf-8"));
+                    } catch (UnsupportedEncodingException e) {
+                        //Shouldn't happen.
+                        throw IOExceptionSupport.create(e);
+                    }
+                } else if( value.getClass() == DestinationBuffer.class ) {
+                       outType = 2;
+                    
+                       DataByteArrayOutputStream baos = new 
DataByteArrayOutputStream();
+                       DestinationBean proto = 
((DestinationBuffer)value).copy();
+                    proto.writeExternal(baos);
+                    currentOut = ByteBuffer.wrap(baos.getData(), 0, 
baos.size());
+                }else if( value.getClass() == FlowControlBuffer.class ) {
+                       outType = 3;
+                    DataByteArrayOutputStream baos = new 
DataByteArrayOutputStream();
+                    FlowControlBean proto = ((FlowControlBuffer)value).copy();
+                    proto.writeExternal(baos);
+                }else {
+                    throw new IOException("Unsupported type: 
"+value.getClass());
+                }
+                
+                //Write type:
+                target.put(outType);
+                //Write length:
+                target.putInt(currentOut.remaining());
+                if(currentOut.remaining() > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+            }
+            
+            //Avoid overflow:
+            if(currentOut.remaining() > target.remaining())
+            {
+                int limit = currentOut.limit();
+                currentOut.limit(currentOut.position() + target.remaining());
+                target.put(currentOut);
+                currentOut.limit(limit);
+            }
+            else
+            {
+                target.put(currentOut);
+            }
+            
+            if(!currentOut.hasRemaining())
+            {
+                currentOut = null;
+                return true;
+            }
+            return false;
+        }
+  
+        /**
+         * Unmarshals an object. When the object is read it is returned.
+         * @param source
+         * @return The object when unmarshalled, null otherwise
+         */
+        public Object unMarshal(ByteBuffer source) throws IOException
+        {
+            if(currentIn == null)
+            {
+                if(source.remaining() < 5)
+                {
+                    return null;
+                }
+                
+                inType = source.get();
+                int length = source.getInt();
+                if(length > 1024*1024)
+                {
+                    throw new IOException("Packet exceeded max memory size!");
+                }
+                currentIn = ByteBuffer.wrap(new byte[length]);
+                
+            }
+            
+            if(!source.hasRemaining())
+            {
+               return null;
+            }
+            
+            if(source.remaining() > currentIn.remaining())
+            {
+               int limit = source.limit();
+               source.limit(source.position() + currentIn.remaining());
+               currentIn.put(source);
+               source.limit(limit);
+            }
+            else
+            {
+               currentIn.put(source);
+            }
+            
+            //If we haven't finished the packet return to get more data:
+            if(currentIn.hasRemaining())
+            {
+               return null;
+            }
+            
+            Object ret = null;
+            switch(inType) {
+            case 0: {
+                DataByteArrayInputStream in = new 
DataByteArrayInputStream(currentIn.array());
+               MessageBean proto = new MessageBean();
+               proto.readExternal(in);
+               ret = new Message(proto.freeze());
+               break;
+            }
+            case 1: {
+               ret = new String(currentIn.array(), "utf-8");
+               break;
+            }
+               case 2: {
+                DataByteArrayInputStream in = new 
DataByteArrayInputStream(currentIn.array());
+                DestinationBean proto = new DestinationBean();
+                proto.readExternal(in);
+                       ret = proto.freeze();
+                       break;
+               }
+               case 3: {
+                DataByteArrayInputStream in = new 
DataByteArrayInputStream(currentIn.array());
+                FlowControlBean proto = new FlowControlBean();
+                proto.readExternal(in);
+                ret = proto.freeze();
+                       break;
+               }
+               default:
+                       throw new IOException("Unknown type byte: " + inType);
+            }
+            
+            currentIn = null;
+            return ret;
+        }
+        
+        /**
+         * Reports the wire format version.
+         *
+         * @return always {@code 0}
+         */
+        public int getVersion() {
+            final int fixedVersion = 0;
+            return fixedVersion;
+        }
+        /**
+         * No-op: the supplied version is discarded.
+         *
+         * @param version ignored
+         */
+        public void setVersion(int version) {
+            // Nothing to do; getVersion() returns a constant.
+        }
+
+        /**
+         * Reports whether a receive is in progress.
+         * <p>
+         * NOTE(review): this is a constant and does not look at
+         * {@code currentIn}, so it stays {@code false} even while a partial
+         * packet is buffered -- confirm that is what callers expect.
+         *
+         * @return always {@code false}
+         */
+        public boolean inReceive() {
+            final boolean receiving = false;
+            return receiving;
+        }
+
+        /**
+         * Encodes {@code value} into a freshly allocated byte sequence by
+         * delegating to {@code marshal(Object, DataOutput)}.
+         *
+         * @param value the object to encode
+         * @return the framed packet bytes
+         * @throws IOException if the delegate rejects the value or fails
+         */
+        public ByteSequence marshal(Object value) throws IOException {
+            final DataByteArrayOutputStream sink = new DataByteArrayOutputStream();
+            this.marshal(value, sink);
+            final ByteSequence encoded = sink.toByteSequence();
+            return encoded;
+        }
+        
+        /**
+         * Decodes one packet from {@code data} by wrapping it in a
+         * {@code DataByteArrayInputStream} and delegating to
+         * {@code unmarshal(DataInput)}.
+         *
+         * @param data the bytes holding a framed packet
+         * @return the decoded object
+         * @throws IOException if the delegate fails to decode the packet
+         */
+        public Object unmarshal(ByteSequence data) throws IOException {
+            return unmarshal(new DataByteArrayInputStream(data));
+        }
+    }
+
+       /**
+        * Creates a new, independent wire format instance.
+        *
+        * @return a fresh {@code TestWireFormat}
+        */
+       public WireFormat createWireFormat() {
+               final WireFormat format = new TestWireFormat();
+               return format;
+       }
+
+}

Modified: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=745938&r1=745937&r2=745938&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
 Thu Feb 19 17:30:11 2009
@@ -11,19 +11,21 @@
 import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 
 public class Router {
-    final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new 
HashMap<Destination, Collection<DeliveryTarget>>();
+    final HashMap<String, Collection<DeliveryTarget>> lookupTable = new 
HashMap<String, Collection<DeliveryTarget>>();
 
     final synchronized void bind(DeliveryTarget dt, Destination destination) {
-        Collection<DeliveryTarget> targets = lookupTable.get(destination);
+        String key = destination.getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
         if (targets == null) {
             targets = new ArrayList<DeliveryTarget>();
-            lookupTable.put(destination, targets);
+            lookupTable.put(key, targets);
         }
         targets.add(dt);
     }
 
     final void route(ISourceController<Message> source, Message msg) {
-        Collection<DeliveryTarget> targets = 
lookupTable.get(msg.getDestination());
+        String key = msg.getDestination().getName();
+        Collection<DeliveryTarget> targets = lookupTable.get(key);
         if( targets == null ) 
             return;
         for (DeliveryTarget dt : targets) {

Added: 
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2?rev=745938&view=auto
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
 (added)
+++ 
activemq/sandbox/activemq-flow/src/test/resources/META-INF/services/org/apache/activemq/wireformat/proto2
 Thu Feb 19 17:30:11 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.flow.Proto2WireFormatFactory


Reply via email to