Author: chirino
Date: Mon Feb 20 21:59:41 2012
New Revision: 1291476
URL: http://svn.apache.org/viewvc?rev=1291476&view=rev
Log:
Simplify the OpenWire codec by extending from the AbstractProtocolCodec.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/pom.xml
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireCodec.scala Mon Feb 20 21:59:41 2012
@@ -18,16 +18,13 @@
package org.apache.activemq.apollo.openwire
import org.apache.activemq.apollo.broker.store.MessageRecord
-import org.fusesource.hawtdispatch.transport.ProtocolCodec
import OpenwireConstants._
-import java.nio.ByteBuffer
-import java.nio.channels.{SocketChannel, WritableByteChannel,
ReadableByteChannel}
-import java.io.EOFException
import org.apache.activemq.apollo.broker.{Sizer, Message}
import org.apache.activemq.apollo.openwire.codec.OpenWireFormat
import org.apache.activemq.apollo.openwire.command._
import org.apache.activemq.apollo.broker.BufferConversions._
-import org.fusesource.hawtbuf.{DataByteArrayInputStream,
DataByteArrayOutputStream, AbstractVarIntSupport, Buffer}
+import org.fusesource.hawtdispatch.transport.AbstractProtocolCodec
+import org.fusesource.hawtbuf._
case class CachedEncoding(tight:Boolean, version:Int, buffer:Buffer) extends
CachedEncodingTrait
@@ -40,7 +37,7 @@ case class CachedEncoding(tight:Boolean,
object OpenwireCodec extends Sizer[Command] {
final val DB_VERSION = OpenWireFormat.DEFAULT_VERSION
- final val DB_TIGHT_ENCODING = true
+ final val DB_TIGHT_ENCODING = false
def encode(message: Message):MessageRecord = {
val rc = new MessageRecord
@@ -105,187 +102,47 @@ object OpenwireCodec extends Sizer[Comma
}
}
-class OpenwireCodec extends ProtocolCodec {
-
- implicit def toBuffer(value:Array[Byte]):Buffer = new Buffer(value)
-
- def protocol = PROTOCOL
-
- var write_buffer_size = 1024*64;
- var write_counter = 0L
- var write_channel:WritableByteChannel = null
-
- var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
- var write_buffer = ByteBuffer.allocate(0)
+class OpenwireCodec extends AbstractProtocolCodec {
val format = new OpenWireFormat(1);
- def full = next_write_buffer.size() >= (write_buffer_size >> 1)
- def is_empty = write_buffer.remaining() == 0
-
- def setWritableByteChannel(channel: WritableByteChannel) = {
- this.write_channel = channel
- if( this.write_channel.isInstanceOf[SocketChannel] ) {
-
this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
- }
+ protected def encode(command: AnyRef) = {
+ format.marshal(command, nextWriteBuffer)
}
- def getWriteCounter = write_counter
-
- def write(command: Any):ProtocolCodec.BufferState = {
- if ( full) {
- ProtocolCodec.BufferState.FULL
- } else {
- val was_empty = is_empty
- command match {
- case command:ActiveMQMessage=>
- command.getCachedEncoding match {
- case CachedEncoding(tight, version, buffer) =>
- // We might be able to re-use the origin format of the message.
- if( !format.isCacheEnabled &&
format.isTightEncodingEnabled==tight && format.getVersion==version ) {
- next_write_buffer.write(buffer)
- } else {
- format.marshal(command, next_write_buffer)
+ private final val readHeader:AbstractProtocolCodec.Action = new
AbstractProtocolCodec.Action {
+ def apply = {
+ val header = peekBytes(4)
+ if( header==null ) {
+ null
+ } else {
+ val length = header.bigEndianEditor().readInt()
+ nextDecodeAction = new AbstractProtocolCodec.Action {
+ def apply() = {
+ val frame = readBytes(4+length)
+ if( frame==null ) {
+ null
+ } else {
+ val command = format.unmarshal(frame)
+ nextDecodeAction = readHeader
+ // If value caching is NOT enabled, then we potentially re-use the encode
+ // value of the message.
+ command match {
+ case message:ActiveMQMessage =>
+ message.setEncodedSize(length)
+ if( !format.isCacheEnabled ) {
+
message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled,
format.getVersion, frame))
+ }
+ case _ =>
}
- case _ =>
- format.marshal(command, next_write_buffer)
+ command
+ }
}
-
- case command:Command=>
- format.marshal(command, next_write_buffer)
- }
- if( was_empty ) {
- ProtocolCodec.BufferState.WAS_EMPTY
- } else {
- ProtocolCodec.BufferState.NOT_EMPTY
- }
- }
- }
-
- def flush():ProtocolCodec.BufferState = {
- // if we have a pending write that is being sent over the socket...
- if ( write_buffer.remaining() != 0 ) {
- write_counter += write_channel.write(write_buffer)
- }
-
- // if it is now empty try to refill...
- if ( is_empty && next_write_buffer.size()!=0 ) {
- // size of next buffer is based on how much was used in the previous buffer.
- val prev_size =
(write_buffer.position()+512).max(512).min(write_buffer_size)
- write_buffer = next_write_buffer.toBuffer().toByteBuffer()
- next_write_buffer = new DataByteArrayOutputStream(prev_size)
- }
-
- if ( is_empty ) {
- ProtocolCodec.BufferState.EMPTY
- } else {
- ProtocolCodec.BufferState.NOT_EMPTY
- }
- }
-
- var read_counter = 0L
- var read_buffer_size = 1024*64
- var read_channel:ReadableByteChannel = null
-
- var read_buffer:ByteBuffer = ByteBuffer.allocate(4)
- var read_waiting_on = 4
-
- var next_action:()=>Command = read_header
-
- def setReadableByteChannel(channel: ReadableByteChannel) = {
- this.read_channel = channel
- if( this.read_channel.isInstanceOf[SocketChannel] ) {
-
this.read_channel.asInstanceOf[SocketChannel].socket().setReceiveBufferSize(read_buffer_size);
- }
- }
-
- def unread(buffer: Array[Byte]) = {
- assert(read_counter == 0)
- read_buffer = buffer.toByteBuffer
- read_buffer.position(read_buffer.limit)
- read_counter += buffer.length
- read_waiting_on -= buffer.length
- if ( read_waiting_on <= 0 ) {
- read_buffer.flip
- }
- }
-
- def getReadCounter = read_counter
-
- override def read():Object = {
-
- var command:Object = null
- while( command==null ) {
- // do we need to read in more data???
- if ( read_waiting_on > 0 ) {
-
- // Try to fill the buffer with data from the socket..
- var p = read_buffer.position()
- var count = read_channel.read(read_buffer)
- if (count == -1) {
- throw new EOFException("Peer disconnected")
- } else if (count == 0) {
- return null
- }
- read_counter += count
- read_waiting_on -= count
-
- if ( read_waiting_on <= 0 ) {
- read_buffer.flip
- }
-
- } else {
- command = next_action()
- if ( read_waiting_on > 0 ) {
- val next_buffer =
ByteBuffer.allocate(read_buffer.remaining+read_waiting_on)
- next_buffer.put(read_buffer)
- read_buffer = next_buffer
}
+ nextDecodeAction.apply()
}
}
- return command
}
-
- def read_header:()=>Command = ()=> {
-
- read_buffer.mark
- val size = read_buffer.getInt
- read_buffer.reset
-
- read_waiting_on += (size)
-
- next_action = read_command(size+4)
- null
- }
-
- def read_command(size:Int) = ()=> {
-
- val buf = new Buffer(read_buffer.array, read_buffer.position, size)
- val rc = format.unmarshal(buf)
- read_buffer.position(read_buffer.position+size)
-
- read_waiting_on += 4
- next_action = read_header
- var command: Command = rc.asInstanceOf[Command]
-
- // If value caching is NOT enabled, then we potentially re-use the encode
- // value of the message.
- command match {
- case message:ActiveMQMessage =>
- message.setEncodedSize(size)
- if( !format.isCacheEnabled ) {
-
message.setCachedEncoding(CachedEncoding(format.isTightEncodingEnabled,
format.getVersion, buf))
- }
- case _ =>
- }
- command
- }
-
- def getLastWriteSize = 0
-
- def getLastReadSize = 0
-
- def getWriteBufferSize = write_buffer_size
-
- def getReadBufferSize = read_buffer_size
+
+ protected def initialDecodeAction = readHeader
}
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Feb 20 21:59:41 2012
@@ -615,7 +615,7 @@ class OpenwireProtocolHandler extends Pr
case class OpenwireDeliveryProducerRoute(addresses:Array[SimpleAddress])
extends DeliveryProducerRoute(host.router) {
- override def send_buffer_size = codec.write_buffer_size
+ override def send_buffer_size = codec.getReadBufferSize
override def connection = Some(OpenwireProtocolHandler.this.connection)
override def dispatch_queue = queue
@@ -917,7 +917,7 @@ class OpenwireProtocolHandler extends Pr
override def connection = Some(OpenwireProtocolHandler.this.connection)
def is_persistent = false
- override def receive_buffer_size = codec.write_buffer_size
+ override def receive_buffer_size = codec.getWriteBufferSize*4
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq OpenwireProtocol ) {
Modified: activemq/activemq-apollo/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1291476&r1=1291475&r2=1291476&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Mon Feb 20 21:59:41 2012
@@ -97,7 +97,7 @@
<felix-version>1.0.0</felix-version>
<hawtdispatch-version>1.9-SNAPSHOT</hawtdispatch-version>
- <hawtbuf-version>1.8</hawtbuf-version>
+ <hawtbuf-version>1.9-SNAPSHOT</hawtbuf-version>
<stompjms-version>1.8</stompjms-version>
<bdb-version>5.0.34</bdb-version>