Author: chirino
Date: Mon Feb 16 16:02:25 2009
New Revision: 744953

URL: http://svn.apache.org/viewvc?rev=744953&view=rev
Log:
- The wire protocol used in the staeful interface of the ProtoWireFormatFactory 
did not match exactly what the non-statefull version did. Matched them up.
- Since to wireformat is doing the framing, swithced to using the 
toUnframedByteArray() methods to save a few bytes.


Modified:
    
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java

Modified: 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java?rev=744953&r1=744952&r2=744953&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
 (original)
+++ 
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/ProtoWireFormatFactory.java
 Mon Feb 16 16:02:25 2009
@@ -3,13 +3,12 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
 import org.apache.activemq.flow.Commands.Destination;
 import org.apache.activemq.flow.Commands.FlowControl;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.StatefulWireFormat;
@@ -28,16 +27,28 @@
         public void marshal(Object value, DataOutput out) throws IOException {
             if( value.getClass() == Message.class ) {
                 out.writeByte(0);
-                ((Message)value).getProto().writeFramed((OutputStream)out);
+                Commands.Message proto = ((Message)value).getProto();
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), 
buffer.getLength());
             } else if( value.getClass() == String.class ) {
                 out.writeByte(1);
-                out.writeUTF((String) value);
+                String value2 = (String) value;
+                byte[] bytes = value2.getBytes("UTF-8");
+                out.writeInt(bytes.length);
+                out.write(bytes);
             } else if( value.getClass() == Destination.class ) {
                 out.writeByte(2);
-                ((Destination)value).writeFramed((OutputStream)out);
+                Destination proto = (Destination)value;
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), 
buffer.getLength());
             }else if( value.getClass() == FlowControl.class ) {
                 out.writeByte(3);
-                ((FlowControl)value).writeFramed((OutputStream)out);
+                FlowControl proto = (FlowControl)value;
+                Buffer buffer = proto.toUnframedBuffer();
+                out.writeInt(buffer.getLength());
+                out.write(buffer.getData(), buffer.getOffset(), 
buffer.getLength());
             } else {
                 throw new IOException("Unsupported type: "+value.getClass());
             }
@@ -45,20 +56,23 @@
 
         public Object unmarshal(DataInput in) throws IOException {
             byte type = in.readByte();
+            int size = in.readInt();
+            byte data[] = new byte[size];
+            in.readFully(data);
             switch(type) {
                 case 0:
                     Commands.Message m = new Commands.Message();
-                    m.mergeFramed((InputStream)in);
+                    m.mergeUnframed(data);
                     return new Message(m);
                 case 1:
-                    return in.readUTF();
+                    return new String(data, "UTF-8");
                 case 2:
                     Destination d = new Destination();
-                    d.mergeFramed((InputStream)in);
+                    d.mergeUnframed(data);
                     return d;
                 case 3:
                     FlowControl fc = new FlowControl();
-                    fc.mergeFramed((InputStream)in);
+                    fc.mergeUnframed(data);
                     return fc;
                 default:
                     throw new IOException("Unknonw type byte: ");
@@ -77,7 +91,7 @@
                 
                 if( value.getClass() == Message.class ) {
                        
-                       currentOut = 
ByteBuffer.wrap(((Message)value).getProto().toFramedByteArray());
+                       currentOut = 
ByteBuffer.wrap(((Message)value).getProto().toUnframedByteArray());
                        outType = 0;
                 } else if( value.getClass() == String.class ) {
                        outType = 1;
@@ -89,10 +103,10 @@
                     }
                 } else if( value.getClass() == Destination.class ) {
                        outType = 2;
-                    currentOut = 
ByteBuffer.wrap(((Destination)value).toFramedByteArray());
+                    currentOut = 
ByteBuffer.wrap(((Destination)value).toUnframedByteArray());
                 }else if( value.getClass() == FlowControl.class ) {
                        outType = 3;
-                    currentOut = 
ByteBuffer.wrap(((FlowControl)value).toFramedByteArray());
+                    currentOut = 
ByteBuffer.wrap(((FlowControl)value).toUnframedByteArray());
                 }else {
                     throw new IOException("Unsupported type: 
"+value.getClass());
                 }
@@ -181,7 +195,7 @@
                Commands.Message m = new Commands.Message();
                try
                {
-                       m.mergeFramed(currentIn.array());
+                       m.mergeUnframed(currentIn.array());
                }
                catch(Exception e)
                {
@@ -194,12 +208,12 @@
                break;
                case 2:
                        Destination d = new Destination();
-                       d.mergeFramed(currentIn.array());
+                       d.mergeUnframed(currentIn.array());
                        ret = d;
                        break;
                case 3:
                        FlowControl c = new FlowControl();
-                       c.mergeFramed(currentIn.array());
+                       c.mergeUnframed(currentIn.array());
                        ret = c;
                        break;
                default:


Reply via email to