Author: chirino
Date: Thu Jan 6 20:56:17 2011
New Revision: 1056071
URL: http://svn.apache.org/viewvc?rev=1056071&view=rev
Log:
Restructuring how direct buffers work.
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
- copied, changed from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
- copied, changed from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
- copied, changed from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
- copied, changed from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueStatus.java
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pool-factory.index
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPool.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPoolFactory.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPool.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPoolFactory.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Thu Jan 6 20:56:17 2011
@@ -22,7 +22,6 @@ import _root_.scala.collection.JavaConve
import org.fusesource.hawtdispatch._
import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.broker.store.{Store, StoreFactory}
import org.apache.activemq.apollo.util._
import path.PathFilter
import ReporterLevel._
@@ -33,6 +32,7 @@ import org.apache.activemq.apollo.util.O
import org.apache.activemq.apollo.util.path.{Path, PathParser}
import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO,
VirtualHostDTO}
import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
+import org.apache.activemq.apollo.broker.store.{DirectBufferAllocator, Store,
StoreFactory}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -81,7 +81,6 @@ class VirtualHost(val broker: Broker, va
var names:List[String] = Nil;
var store:Store = null
- var direct_buffer_pool:DirectBufferPool = null
val queue_id_counter = new LongCounter
val session_counter = new AtomicLong(0)
@@ -106,7 +105,6 @@ class VirtualHost(val broker: Broker, va
}
} |>>: dispatch_queue
-
override protected def _start(on_completed:Runnable):Unit = {
val tracker = new LoggingTracker("virtual host startup", dispatch_queue)
@@ -129,19 +127,6 @@ class VirtualHost(val broker: Broker, va
store = StoreFactory.create(config.store)
- // val memory_pool_config: String = null
- var direct_buffer_pool_config: String = "hawtdb:activemq.tmp"
-
- if( direct_buffer_pool_config!=null && (store!=null &&
!store.supports_direct_buffers) ) {
- warn("The direct buffer pool will not be used because the configured
store does not support them.")
- direct_buffer_pool_config = null
- }
-
- if( direct_buffer_pool_config!=null ) {
- direct_buffer_pool =
DirectBufferPoolFactory.create(direct_buffer_pool_config)
- direct_buffer_pool.start
- }
-
if( store!=null ) {
store.configure(config.store, LoggingReporter(VirtualHost))
val store_startup_done = tracker.task("store startup")
@@ -204,11 +189,6 @@ class VirtualHost(val broker: Broker, va
router.queues.valuesIterator.foreach { queue=>
tracker.stop(queue)
}
- if( direct_buffer_pool!=null ) {
- direct_buffer_pool.stop
- direct_buffer_pool = null
- }
-
if( store!=null ) {
tracker.stop(store);
}
Added:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?rev=1056071&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
Thu Jan 6 20:56:17 2011
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.fusesource.hawtdispatch._
+import java.nio.channels.{FileChannel, WritableByteChannel,
ReadableByteChannel}
+import java.nio.ByteBuffer
+import java.io._
+import org.apache.activemq.apollo.util._
+/**
+ * <p>
+ * A source of DirectBuffer instances: <code>alloc</code> is given the
+ * requested size and returns a DirectBuffer for it.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DirectBufferAllocator {
+ def alloc(size:Int):DirectBuffer
+}
+
+/**
+ * <p>
+ * A DirectBuffer is a reference counted buffer on
+ * temp storage designed to be accessed with direct io.
+ * </p>
+ * <p>
+ * The notes below describe the operations as FileDirectBufferTrait
+ * implements them; other implementations should be checked against this:
+ * </p>
+ * <ul>
+ * <li><code>size</code>: the total size of the buffer.</li>
+ * <li><code>remaining(from_position)</code>: how much of the buffer lies
+ * at or after <code>from_position</code> (<code>size - from_position</code>).</li>
+ * <li><code>read(target: OutputStream)</code>: copies the buffer content
+ * to the stream.</li>
+ * <li><code>read(src, target)</code>: transfers content starting at buffer
+ * position <code>src</code> to the channel and returns the amount
+ * transferred.</li>
+ * <li><code>write(src: ReadableByteChannel, target)</code>: transfers data
+ * from the channel into the buffer starting at position
+ * <code>target</code> and returns the amount transferred.</li>
+ * <li><code>write(src: ByteBuffer, target)</code>: writes at most
+ * <code>remaining(target)</code> bytes of <code>src</code> at position
+ * <code>target</code> and returns the amount written.</li>
+ * <li><code>link(target)</code>: makes the buffer content available in the
+ * given file (currently done with a copy).</li>
+ * </ul>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DirectBuffer extends Retained {
+
+ def size:Int
+
+ def remaining(from_position: Int): Int
+
+ def read(target: OutputStream):Unit
+
+ def read(src: Int, target: WritableByteChannel): Int
+
+ def write(src:ReadableByteChannel, target:Int): Int
+
+ def write(src:ByteBuffer, target:Int):Int
+
+ def link(target:File):Long
+}
+
+/**
+ * Implements the DirectBuffer operations on top of a region of a
+ * FileChannel. The region starts at <code>offset</code> in the channel
+ * and is <code>size</code> bytes long; every operation asserts that the
+ * buffer is still retained before touching the channel.
+ */
+trait FileDirectBufferTrait extends DirectBuffer {
+
+  // absolute position in the channel where this buffer's region starts
+  def offset:Long
+  // channel of the file which holds the region
+  def channel:FileChannel
+
+  def remaining(pos: Int): Int = size-pos
+
+  /**
+   * Transfers the bytes from buffer position <code>src</code> up to the end
+   * of the region to the target channel. Returns the number of bytes
+   * actually transferred, which may be less than what remains.
+   */
+  def read(src: Int, target: WritableByteChannel): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(src)
+    assert(count>=0)
+    channel.transferTo(offset+src, count, target).toInt
+  }
+
+  /**
+   * Copies the whole region to the target stream in chunks of up to 4k.
+   * Throws an EOFException if the channel ends before the region does.
+   */
+  def read(target: OutputStream): Unit = {
+    assert(retained > 0)
+    val b = ByteBuffer.allocate(size.min(1024*4))
+    var pos = 0
+    while( remaining(pos)> 0 ) {
+      // cap the chunk so that we never read (and emit) bytes which
+      // lie past the end of this buffer's region.
+      b.limit(b.capacity.min(remaining(pos)))
+      val count = channel.read(b, offset+pos)
+      if( count == -1 ) {
+        throw new EOFException()
+      }
+      target.write(b.array, 0, count)
+      pos += count
+      b.clear
+    }
+  }
+
+  /**
+   * Transfers bytes from the source channel into the region starting at
+   * buffer position <code>target</code>; at most the remaining space is
+   * filled. Returns the number of bytes transferred.
+   */
+  def write(src: ReadableByteChannel, target:Int): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(target)
+    assert(count>=0)
+    channel.transferFrom(src, offset+target, count).toInt
+  }
+
+  /**
+   * Writes the source buffer into the region starting at buffer position
+   * <code>target</code>. If the source holds more than the remaining space,
+   * its limit is lowered for the duration of the write and restored after.
+   */
+  def write(src: ByteBuffer, target: Int): Int = {
+    assert(retained > 0)
+    val diff = src.remaining - remaining(target)
+    if( diff > 0 ) {
+      src.limit(src.limit-diff)
+    }
+    try {
+      channel.write(src, offset+target).toInt
+    } finally {
+      if( diff > 0 ) {
+        src.limit(src.limit+diff)
+      }
+    }
+  }
+
+  /**
+   * Copies the region into the target file. Always returns 0.
+   * Throws an EOFException if the channel stops yielding data before the
+   * whole region has been copied.
+   */
+  def link(target: File): Long = {
+    assert(retained > 0)
+    // TODO: implement with a real file system hard link
+    // to get copy on write goodness.
+    import FileSupport._
+    using(new FileOutputStream(target).getChannel) { out=>
+      // transferTo is allowed to move fewer bytes than requested,
+      // so keep going until the whole region has been copied.
+      var pos = 0L
+      while( pos < size ) {
+        val count = channel.transferTo(offset+pos, size-pos, out)
+        if( count <= 0 ) {
+          throw new EOFException()
+        }
+        pos += count
+      }
+    }
+    0
+  }
+}
+
+/**
+ * Describes a region which is <code>size</code> long and starts at
+ * <code>offset</code>. Allocations are ordered by size first and then by
+ * offset. Note the constructor argument order: size comes before offset.
+ */
+case class Allocation(size:Long, offset:Long) extends Ordered[Allocation] {
+
+  // callback which free() invokes; it is assigned by the allocator
+  // which hands out this allocation.
+  var _free_func: (Allocation)=>Unit = _
+
+  def free() = {
+    _free_func(this)
+  }
+
+  def compare(that: Allocation): Int = {
+    val rc = longWrapper(size).compareTo(that.size)
+    if( rc!=0 ) {
+      rc
+    } else {
+      longWrapper(offset).compareTo(that.offset)
+    }
+  }
+
+  // split the allocation: the first part is `request` long and starts
+  // where this allocation starts, the second part covers the rest.
+  // Named arguments are used so size and offset cannot be swapped.
+  def split(request:Long):(Allocation, Allocation) = {
+    assert(request < size)
+    val first = Allocation(size = request, offset = offset)
+    val second = Allocation(size = size-request, offset = offset+request)
+    (first, second)
+  }
+
+  // join the allocation with the one which starts right where this one ends.
+  def join(that:Allocation):Allocation = {
+    assert( that.offset == offset+size)
+    Allocation(size = size+that.size, offset = offset)
+  }
+
+}
+
+trait Allocator {
+  // hands out an allocation for the request; the allocators chained
+  // below treat a null result as "could not allocate".
+  def alloc(request:Long):Allocation
+
+  // builds an allocator which asks this allocator first and only falls
+  // back to `that` when this one comes back with null.
+  def chain(that:Allocator):Allocator = {
+    val primary = this
+    new Allocator() {
+      def alloc(request: Long): Allocation = primary.alloc(request) match {
+        case null => that.alloc(request)
+        case found => found
+      }
+    }
+  }
+}
+
+/**
+ * An allocator which keeps track of its free areas in two sorted maps:
+ * one ordered by size (to find an area which fits a request) and one
+ * ordered by offset (to find the neighbours of an area being freed so
+ * that adjacent free areas can be merged). Initially the whole
+ * <code>range</code> is free.
+ */
+class TreeAllocator(range:Allocation) extends Allocator {
+
+  // list of the free allocation areas. Sorted by size then offset
+  val free_by_size = new TreeMap[Allocation, Zilch]()
+  // list of the free allocation areas sorted by offset.
+  val free_by_offset = new TreeMap[Long, Allocation]()
+
+  {
+    val allocation = range.copy()
+    free_by_offset.put(allocation.offset, allocation)
+    free_by_size.put(allocation, null)
+  }
+
+  /**
+   * Takes the smallest free area which can hold the request. Returns null
+   * when no free area is big enough.
+   */
+  def alloc(request:Long):Allocation = {
+    val spot_entry = free_by_size.ceilingEntry(Allocation(request, 0))
+    if( spot_entry == null ) {
+      null
+    } else {
+      val allocation = spot_entry.getKey
+
+      // the area stops being free in its current shape, so it has to
+      // leave BOTH maps. (Leaving it in free_by_offset would let a later
+      // free() merge a neighbour with an area which is still in use.)
+      free_by_size.remove(allocation)
+      free_by_offset.remove(allocation.offset)
+
+      // might be the perfect size
+      if( allocation.size == request ) {
+        allocation._free_func = free
+        allocation
+      } else {
+        // split the allocation..
+        val (first, second) = allocation.split(request)
+
+        // put the free part back in the free maps.
+        free_by_offset.put(second.offset, second)
+        free_by_size.put(second, null)
+
+        first._free_func = free
+        first
+      }
+    }
+  }
+
+  /**
+   * Returns the allocation to the free maps, merging it with the free
+   * areas directly before and/or after it when they touch it.
+   */
+  def free(allocation:Allocation):Unit = {
+
+    val prev_e = free_by_offset.floorEntry(allocation.offset)
+    val next_e = if( prev_e!=null ) {
+      prev_e.next
+    } else {
+      free_by_offset.ceilingEntry(allocation.offset)
+    }
+
+    // a neighbour only counts if it touches the area being freed.
+    val prev = Option(prev_e).map(_.getValue).filter( a=> a.offset+a.size == allocation.offset )
+    val next = Option(next_e).map(_.getValue).filter( a=> allocation.offset+allocation.size == a.offset )
+
+    (prev, next) match {
+      case (None, None)=>
+        allocation._free_func = null
+        free_by_size.put(allocation, null)
+        free_by_offset.put(allocation.offset, allocation)
+
+      case (Some(before), None)=>
+        val joined = before.join(allocation)
+        free_by_size.remove(before)
+        free_by_size.put(joined, null)
+        // joined starts where `before` started, so this replaces its entry
+        free_by_offset.put(joined.offset, joined)
+
+      case (None, Some(after))=>
+        val joined = allocation.join(after)
+        free_by_size.remove(after)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(after.offset)
+        free_by_offset.put(joined.offset, joined)
+
+      case (Some(before), Some(after))=>
+        val joined = before.join(allocation.join(after))
+        free_by_size.remove(before)
+        free_by_size.remove(after)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(after.offset)
+        free_by_offset.put(joined.offset, joined)
+    }
+  }
+}
+
+/**
+ * Helps minimize the active page set.
+ */
+class ActiveAllocator(val range:Allocation) extends Allocator {
+
+  // the inactive (cold) tree starts out owning all the free space..
+  val inactive = new TreeAllocator(range)
+
+  // the active (hot) tree starts out without any free space.
+  val active = new TreeAllocator(range)
+
+  active.free_by_offset.clear
+  active.free_by_size.clear
+
+  // requests are served out of the hot tree first and only then out of
+  // the cold one, since that should result in less vm swapping
+  val chain = active.chain(inactive)
+
+  def alloc(request:Long):Allocation = {
+    val granted = chain.alloc(request)
+    if( granted==null ) {
+      null
+    } else {
+      // make sure the space comes back to this allocator when released
+      granted._free_func = free
+      granted
+    }
+  }
+
+  def free(allocation:Allocation):Unit = {
+    // released space always goes back into the hot tree.
+    active.free(allocation)
+  }
+
+}
+
+/**
+ * <p>
+ * A DirectBufferAllocator whose buffers are regions of a file. Every
+ * thread which allocates gets its own AllocatorContext (kept in a
+ * ThreadLocal) holding the file channel and the region allocator.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+
+  // we use thread local allocator contexts; each one is bound to the
+  // dispatch queue of the thread which created it.
+  class AllocatorContext(val queue:DispatchQueue) {
+
+    // the whole range starting at offset 0 is free initially. Allocation
+    // takes (size, offset), so the arguments are named to keep them
+    // from being swapped.
+    val allocator = new TreeAllocator(Allocation(size = Long.MaxValue, offset = 0))
+    // NOTE(review): the file name is only the queue label, so the file is
+    // not opened underneath `directory` -- confirm whether that is intended.
+    var channel:FileChannel = new RandomAccessFile(queue.getLabel, "rw").getChannel
+
+    // a buffer backed by one allocation within this context's file.
+    class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
+      def channel: FileChannel = AllocatorContext.this.channel
+      def offset: Long = allocation.offset
+      def size: Int = allocation.size.toInt
+
+      override def dispose: Unit = {
+        super.dispose
+        // since we might not get disposed from the same thread
+        // that did the allocation..
+        queue <<| ^{
+          allocation.free()
+        }
+      }
+    }
+
+    def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+      val allocation = allocator.alloc(size)
+      assert(allocation!=null)
+      new AllocationBuffer(allocation)
+    }
+  }
+
+  val _current_allocator_context = new ThreadLocal[AllocatorContext]()
+
+  protected def start() = {
+    directory.mkdirs
+  }
+
+  def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+    ctx.alloc(size)
+  }
+
+  // runs func against the calling thread's context; when the calling
+  // thread has no current dispatch queue the call is handed to the
+  // global queue and its result is awaited.
+  def with_allocator_context[T](func: (AllocatorContext)=>T):T = {
+    if( getCurrentThreadQueue == null ) {
+      getGlobalQueue().future(func(current_allocator_context))()
+    } else {
+      func(current_allocator_context)
+    }
+  }
+
+  // looks up the calling thread's context, creating it on first use.
+  def current_allocator_context:AllocatorContext = {
+    val thread_queue = getCurrentThreadQueue
+    assert(thread_queue != null)
+    var rc = _current_allocator_context.get
+    if( rc==null ) {
+      rc = new AllocatorContext(thread_queue)
+      _current_allocator_context.set(rc)
+    }
+    rc
+  }
+}
\ No newline at end of file
Copied:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
(from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Thu Jan 6 20:56:17 2011
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -14,22 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
-import org.apache.activemq.apollo.util.DirectBuffer;
-import org.fusesource.hawtbuf.AsciiBuffer;
-import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.hawtbuf.Buffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class MessageRecord {
+class MessageRecord {
- public long key = -1;
- public AsciiBuffer protocol;
- public int size;
- public Buffer buffer;
- public DirectBuffer direct_buffer = null;
- public long expiration = 0;
+ var key = -1L
+ var protocol: AsciiBuffer = _
+ var size = 0
+ var buffer: Buffer = _
+ var direct_buffer: DirectBuffer = _
+ var expiration = 0L
}
Copied:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
(from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
Thu Jan 6 20:56:17 2011
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueEntryRange {
- public long first_entry_seq;
- public long last_entry_seq;
- public int count;
- public int size;
+class QueueEntryRange {
+ var first_entry_seq = 0L
+ var last_entry_seq = 0L
+ var count = 0
+ var size = 0
}
\ No newline at end of file
Copied:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
(from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
Thu Jan 6 20:56:17 2011
@@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -14,20 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
-import org.fusesource.hawtbuf.Buffer;
+
+
+import org.fusesource.hawtbuf.Buffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueEntryRecord {
+class QueueEntryRecord {
- public long queue_key;
- public long entry_seq;
- public long message_key;
- public Buffer attachment;
- public int size;
- public short redeliveries;
+ var queue_key = 0L
+ var entry_seq = 0L
+ var message_key = 0L
+ var attachment:Buffer = _
+ var size = 0
+ var redeliveries:Short = 0
}
Copied:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
(from r1056021,
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
Thu Jan 6 20:56:17 2011
@@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
+
+;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
@@ -23,8 +25,8 @@ import org.fusesource.hawtbuf.Buffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class QueueRecord {
- public long key = -1;
- public AsciiBuffer binding_kind;
- public Buffer binding_data;
+class QueueRecord {
+ var key = -1L
+ var binding_kind: AsciiBuffer = _
+ var binding_data: Buffer = _
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Thu Jan 6 20:56:17 2011
@@ -43,7 +43,7 @@ trait Store extends ServiceTrait {
* @returns true if the store implementation can handle accepting
* MessageRecords with DirectBuffers in them.
*/
- def supports_direct_buffers() = false
+ def direct_buffer_allocator():DirectBufferAllocator = null
/**
* Creates a store uow which is used to perform persistent
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Thu Jan 6 20:56:17 2011
@@ -27,10 +27,10 @@ import _root_.scala.collection.JavaConve
import java.io.{EOFException, DataOutput, DataInput, IOException}
import java.nio.channels.{SocketChannel, WritableByteChannel,
ReadableByteChannel}
import org.apache.activemq.apollo.transport._
-import org.apache.activemq.apollo.broker.store.MessageRecord
import _root_.org.fusesource.hawtbuf._
import Buffer._
import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker.store.{DirectBuffer,
DirectBufferAllocator, MessageRecord}
object StompCodec extends Log {
val READ_BUFFFER_SIZE = 1024*64;
@@ -157,7 +157,7 @@ class StompCodec extends ProtocolCodec w
import StompCodec._
override protected def log: Log = StompCodec
- var memory_pool:DirectBufferPool = null
+ var direct_buffer_allocator:DirectBufferAllocator = null
implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
implicit def wrap(x: Byte) = {
@@ -178,12 +178,11 @@ class StompCodec extends ProtocolCodec w
var write_channel:WritableByteChannel = null
var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
- var next_write_direct:ByteBuffer = null
- var next_write_direct_frame:StompFrame = null
+ var next_write_direct:DirectBuffer = null
var write_buffer = ByteBuffer.allocate(0)
- var write_direct:ByteBuffer = null
- var write_direct_frame:StompFrame = null
+ var write_direct:DirectBuffer = null
+ var write_direct_pos = 0
def is_full = next_write_direct!=null || next_write_buffer.size() >=
(write_buffer_size >> 2)
def is_empty = write_buffer.remaining() == 0 && write_direct==null
@@ -252,9 +251,8 @@ class StompCodec extends ProtocolCodec w
frame.content match {
case x:DirectContent=>
- next_write_direct = x.direct_buffer.buffer.duplicate
- next_write_direct.clear
- next_write_direct_frame = frame
+ assert(next_write_direct==null)
+ next_write_direct = x.direct_buffer
case x:BufferContent=>
x.content.writeTo(os)
END_OF_FRAME_BUFFER.writeTo(os)
@@ -272,25 +270,28 @@ class StompCodec extends ProtocolCodec w
write_counter += write_channel.write(write_buffer)
}
if ( write_buffer.remaining() == 0 && write_direct!=null ) {
- write_counter += write_channel.write(write_direct)
- if( write_direct.remaining() == 0 ) {
+ val count = write_direct.read(write_direct_pos, write_channel)
+ write_direct_pos += count
+ write_counter += count
+
+ if( write_direct.remaining(write_direct_pos) == 0 ) {
+ write_direct.release
write_direct = null
- write_direct_frame.release
- write_direct_frame = null
+ write_direct_pos = 0
+
+ write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
}
}
// if it is now empty try to refill...
- if ( is_empty && next_write_buffer.size()!=0 ) {
+ if ( is_empty && write_direct==null ) {
// size of next buffer is based on how much was used in the previous
buffer.
val prev_size =
(write_buffer.position()+512).max(512).min(write_buffer_size)
write_buffer = next_write_buffer.toBuffer().toByteBuffer()
write_direct = next_write_direct
- write_direct_frame = next_write_direct_frame
next_write_buffer = new DataByteArrayOutputStream(prev_size)
next_write_direct = null
- next_write_direct_frame = null
}
if ( is_empty ) {
@@ -317,6 +318,10 @@ class StompCodec extends ProtocolCodec w
var read_buffer = ByteBuffer.allocate(read_buffer_size)
var read_end = 0
var read_start = 0
+
+ var read_direct:DirectBuffer = null
+ var read_direct_pos = 0
+
var next_action:FrameReader = read_action
def setReadableByteChannel(channel: ReadableByteChannel) = {
@@ -339,7 +344,13 @@ class StompCodec extends ProtocolCodec w
var command:Object = null
while( command==null ) {
// do we need to read in more data???
- if (read_end == read_buffer.position()) {
+ if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) {
+ val count = read_direct.write(read_channel, read_direct_pos)
+ if (count == -1) {
+ throw new EOFException("Peer disconnected")
+ }
+ read_direct_pos += count
+ } else if (read_end == read_buffer.position() ) {
// do we need a new data buffer to read data into??
if (read_buffer.remaining() == 0) {
@@ -469,40 +480,23 @@ class StompCodec extends ProtocolCodec w
// lets try to keep the content of big message outside of the JVM's
garbage collection
// to keep the number of GCs down when moving big messages.
def is_message = action == SEND || action == MESSAGE
- if( length > 1024 && memory_pool!=null && is_message) {
-
- val ma = memory_pool.alloc(length+1)
+ if( length > 1024 && direct_buffer_allocator!=null && is_message) {
- val read_limit = buffer.position
- if( (read_limit-read_start) < length+1 ) {
- // buffer did not contain the fully stomp body
+ read_direct = direct_buffer_allocator.alloc(length)
- ma.buffer.put( buffer.array, read_start, read_limit-read_start )
+ val dup = buffer.duplicate
+ dup.position(read_start)
+ dup.limit(buffer.position)
- read_buffer = ma.buffer
- read_end = read_limit-read_start
- read_start = 0
-
- next_action = read_binary_body_direct(action, headers, ma)
-
- } else {
- // The current buffer already read in all the data...
-
- if( buffer.array()(read_start+length)!= 0 ) {
- throw new IOException("Expected null termintor after
"+length+" content bytes")
- }
+ // copy in the body the was read so far...
+ read_direct_pos = read_direct.write(dup, 0)
- // copy the body out to the direct buffer
- ma.buffer.put( buffer.array, read_start, read_limit-read_start )
-
- // and reposition to reuse non-direct space.
- buffer.position(read_start)
- read_end = read_start
-
- next_action = read_action
- rc = new StompFrame(ascii(action), headers.toList,
DirectContent(ma))
- }
+ // since it was copied.. reposition to re-use the copied area..
+ dup.compact
+ buffer.position(buffer.position - read_direct_pos)
+ read_end = read_start
+ next_action = read_binary_body_direct(action, headers, length)
} else {
next_action = read_binary_body(action, headers, length)
}
@@ -526,9 +520,17 @@ class StompCodec extends ProtocolCodec w
None
}
+ def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+ if( read_direct.remaining(read_direct_pos)==0 ) {
+ next_action = read_direct_terminator(action, headers, contentLength, read_direct)
+ read_direct = null
+ read_direct_pos = 0
+ }
+ null
+ }
- def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> {
- if( read_content_direct(ma) ) {
+ def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
+ if( read_frame_terminator(buffer, contentLength) ) {
next_action = read_action
new StompFrame(ascii(action), headers.toList, DirectContent(ma))
} else {
@@ -536,22 +538,17 @@ class StompCodec extends ProtocolCodec w
}
}
- def read_content_direct(ma:DirectBuffer) = {
- val read_limit = ma.buffer.position
- if( read_limit < ma.size ) {
+ def read_frame_terminator(buffer:ByteBuffer, contentLength:Int):Boolean = {
+ val read_limit = buffer.position
+ if( (read_limit-read_start) < 1 ) {
read_end = read_limit
false
} else {
- ma.buffer.position(ma.size-1)
- if( ma.buffer.get != 0 ) {
- throw new IOException("Expected null termintor after "+(ma.size-1)+" content bytes")
+ if( buffer.array()(read_start)!= 0 ) {
+ throw new IOException("Expected null termintor after "+contentLength+" content bytes")
}
- ma.buffer.rewind
- ma.buffer.limit(ma.size-1)
-
- read_buffer = ByteBuffer.allocate(read_buffer_size)
- read_end = 0
- read_start = 0
+ read_end = read_start+1
+ read_start = read_end
true
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Thu Jan 6 20:56:17 2011
@@ -25,6 +25,7 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import org.fusesource.hawtdispatch.BaseRetained
import java.io.{OutputStream, DataOutput}
+import org.apache.activemq.apollo.broker.store.DirectBuffer
/**
*
@@ -207,11 +208,10 @@ case class DirectContent(direct_buffer:D
def writeTo(os:OutputStream) = {
val buff = new Array[Byte](1024*4)
- val source = direct_buffer.buffer.duplicate
var remaining = direct_buffer.size-1
while( remaining> 0 ) {
val c = remaining.min(buff.length)
- source.get(buff, 0, c)
+ direct_buffer.read(os)
os.write(buff, 0, c)
remaining -= c
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jan 6 20:56:17 2011
@@ -571,9 +571,9 @@ class StompProtocolHandler extends Proto
connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
- if( this.host.direct_buffer_pool!=null ) {
+ if( this.host.store!=null && this.host.store.direct_buffer_allocator!=null ) {
val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
- wf.memory_pool = this.host.direct_buffer_pool
+ wf.direct_buffer_allocator = this.host.store.direct_buffer_allocator
}
}