Author: chirino
Date: Fri Aug 19 20:36:42 2011
New Revision: 1159776
URL: http://svn.apache.org/viewvc?rev=1159776&view=rev
Log:
Fixing leaky abstraction: Changed signature of the locator field from
AtomicLong to AtomicReference[Array[Byte]] so that stores can be as creative as
they want when constructing locators.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Aug 19 20:36:42 2011
@@ -165,7 +165,7 @@ class BDBStore(var config:BDBStoreDTO) e
load_source.resume
- def load_message(messageKey: Long, locator:AtomicLong)(callback:
(Option[MessageRecord]) => Unit) = {
+ def load_message(messageKey: Long,
locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) =>
Unit) = {
message_load_latency_counter.start { end=>
load_source.merge((messageKey, { (result)=>
end()
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri
Aug 19 20:36:42 2011
@@ -49,5 +49,5 @@ message QueueEntryPB {
optional bytes attachment=5;
optional int32 redeliveries = 6;
optional sint64 expiration=7;
- optional int64 messageLocator=8;
+ optional bytes messageLocator=8;
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Aug 19 20:36:42 2011
@@ -22,7 +22,7 @@ import protocol.Protocol
import org.apache.activemq.apollo.filter.Filterable
import org.apache.activemq.apollo.broker.store.StoreUOW
import org.apache.activemq.apollo.util.Log
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
object DeliveryProducer extends Log
@@ -175,7 +175,7 @@ class Delivery {
* After the store persists the message he may be able to supply us with
locator handle
* which will load the message faster than looking it up via the store key.
*/
- var storeLocator:AtomicLong = null
+ var storeLocator:AtomicReference[Array[Byte]] = null
/**
* The transaction the delivery is participating in.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 19 20:36:42 2011
@@ -28,7 +28,8 @@ import org.fusesource.hawtdispatch.{List
import OptionSupport._
import security.SecurityContext
import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
+import org.fusesource.hawtbuf.Buffer
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -829,7 +830,8 @@ class QueueEntry(val queue:Queue, val se
}
def init(qer:QueueEntryRecord):QueueEntry = {
- state = new Swapped(qer.message_key, new AtomicLong(qer.message_locator),
qer.size, qer.expiration)
+ val locator = new
AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
+ state = new Swapped(qer.message_key, locator, qer.size, qer.expiration)
this
}
@@ -888,7 +890,7 @@ class QueueEntry(val queue:Queue, val se
qer.queue_key = queue.store_id
qer.entry_seq = seq
qer.message_key = state.message_key
- qer.message_locator = Option(state.message_locator).map(_.get).getOrElse(0)
+ qer.message_locator = Option(state.message_locator).flatMap(x=>
Option(x.get)).map(new Buffer(_)).getOrElse(null)
qer.size = state.size
qer.expiration = expiration
qer
@@ -979,7 +981,7 @@ class QueueEntry(val queue:Queue, val se
*/
def message_key = -1L
- def message_locator: AtomicLong = null
+ def message_locator: AtomicReference[Array[Byte]] = null
/**
* Attempts to dispatch the current entry to the subscriptions position at
the entry.
@@ -1148,7 +1150,7 @@ class QueueEntry(val queue:Queue, val se
delivery.uow = queue.virtual_host.store.create_uow
val uow = delivery.uow
- delivery.storeLocator = new AtomicLong()
+ delivery.storeLocator = new AtomicReference[Array[Byte]]()
delivery.storeKey = uow.store(delivery.createMessageRecord )
store
if( asap ) {
@@ -1330,7 +1332,7 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds
onto the
* the massage key so that it can reload the message from the store quickly
when needed.
*/
- class Swapped(override val message_key:Long, override val
message_locator:AtomicLong, override val size:Int, override val
expiration:Long) extends EntryState {
+ class Swapped(override val message_key:Long, override val
message_locator:AtomicReference[Array[Byte]], override val size:Int, override
val expiration:Long) extends EntryState {
queue.individual_swapped_items += 1
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Fri Aug 19 20:36:42 2011
@@ -23,7 +23,7 @@ import org.apache.activemq.apollo.dto._
import security.SecurityContext
import store.StoreUOW
import util.continuations._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -162,7 +162,7 @@ abstract class DeliveryProducerRoute(rou
} else {
copy.uow.retain
}
- copy.storeLocator = new AtomicLong()
+ copy.storeLocator = new AtomicReference[Array[Byte]]()
copy.storeKey = copy.uow.store(copy.createMessageRecord)
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Fri Aug 19 20:36:42 2011
@@ -19,7 +19,7 @@ package org.apache.activemq.apollo.broke
import org.fusesource.hawtbuf.AsciiBuffer
import org.fusesource.hawtbuf.Buffer
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -32,6 +32,6 @@ class MessageRecord {
var buffer: Buffer = _
var zero_copy_buffer: ZeroCopyBuffer = _
var expiration = 0L
- var locator:AtomicLong = _
+ var locator:AtomicReference[Array[Byte]] = _
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
Fri Aug 19 20:36:42 2011
@@ -29,7 +29,7 @@ class QueueEntryRecord {
var queue_key = 0L
var entry_seq = 0L
var message_key = 0L
- var message_locator = 0L
+ var message_locator:Buffer = _
var attachment:Buffer = _
var size = 0
var expiration = 0L
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Aug 19 20:36:42 2011
@@ -20,7 +20,7 @@ import org.apache.activemq.apollo.dto.St
import org.apache.activemq.apollo.util._
import java.io.{InputStream, OutputStream}
import scala.util.continuations._
-import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
trait StreamManager[A] {
def using_queue_stream(func: (A)=>Unit)
@@ -109,7 +109,7 @@ trait Store extends ServiceTrait {
/**
* Loads a delivery with the associated id from persistent storage.
*/
- def load_message(messageKey:Long,
locator:AtomicLong)(callback:(Option[MessageRecord])=>Unit )
+ def load_message(messageKey:Long,
locator:AtomicReference[Array[Byte]])(callback:(Option[MessageRecord])=>Unit )
/**
* Exports the contents of the store to the provided streams. Each stream
should contain
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
Fri Aug 19 20:36:42 2011
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfterEach
import collection.mutable.ListBuffer
import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport,
LongCounter}
import java.util.concurrent.atomic._
+import org.fusesource.hawtbuf.Buffer
/**
* <p>Implements generic testing of Store implementations.</p>
@@ -99,7 +100,7 @@ abstract class StoreBenchmarkSupport ext
message.protocol = ascii("test-protocol")
message.buffer = ascii(content).buffer
message.size = message.buffer.length
- message.locator = new AtomicLong()
+ message.locator = new AtomicReference[Array[Byte]]()
batch.store(message)
}
@@ -191,7 +192,7 @@ abstract class StoreBenchmarkSupport ext
var keys = message_keys.toList
val metric = benchmarkCount(keys.size) {
val latch = new CountDownLatch(1)
- store.load_message(keys.head, new AtomicLong(0)) { msg=>
+ store.load_message(keys.head, null) { msg=>
assert(msg.isDefined, "message key not found: "+keys.head)
latch.countDown
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Fri Aug 19 20:36:42 2011
@@ -139,7 +139,7 @@ abstract class StoreFunSuiteSupport exte
val A = add_queue("A")
val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
- val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head,
new AtomicLong())(cb) )
+ val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head,
null)(cb) )
expect(ascii("message 1").buffer) {
rc.get.buffer
}
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Aug 19 20:36:42 2011
@@ -178,7 +178,7 @@ class HawtDBStore(var config:HawtDBStore
}
}
- def load_message(messageKey: Long, locator:AtomicLong)(callback:
(Option[MessageRecord]) => Unit) = {
+ def load_message(messageKey: Long,
locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) =>
Unit) = {
message_load_latency_counter.start { end=>
load_source.merge((messageKey, { (result)=>
end()
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1159776&r1=1159775&r2=1159776&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Aug 19 20:36:42 2011
@@ -153,7 +153,7 @@ class JDBM2Store(var config:JDBM2StoreDT
load_source.resume
- def load_message(messageKey: Long, locator:AtomicLong)(callback:
(Option[MessageRecord]) => Unit) = {
+ def load_message(messageKey: Long,
locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) =>
Unit) = {
message_load_latency_counter.start { end=>
load_source.merge((messageKey, { (result)=>
end()