Author: chirino
Date: Fri Aug 19 20:37:08 2011
New Revision: 1159777
URL: http://svn.apache.org/viewvc?rev=1159777&view=rev
Log:
Extend the store interface so that the broker can store arbitrary key/values.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Aug 19 20:37:08 2011
@@ -31,6 +31,7 @@ import scala.Some
import java.sql.ClientInfoStatus
import com.sleepycat.je._
import javax.management.remote.rmi._RMIConnection_Stub
+import org.fusesource.hawtbuf.Buffer
object BDBClient extends Log
/**
@@ -160,6 +161,14 @@ class BDBClient(store: BDBStore) {
_queues_db
}
+ private var _map_db:Database = _
+ def map_db:Database = {
+ if( _map_db==null ) {
+ _map_db = environment.openDatabase(tx, "map", buffer_key_conf)
+ }
+ _map_db
+ }
+
def close(ok:Boolean) = {
if( _messages_db!=null ) {
_messages_db.close
@@ -173,6 +182,9 @@ class BDBClient(store: BDBStore) {
if( _entries_db!=null ) {
_entries_db.close
}
+ if( _map_db!=null ) {
+ _map_db.close
+ }
if(ok){
tx.commit
@@ -324,6 +336,15 @@ class BDBClient(store: BDBStore) {
import ctx._
var zcp_files_to_sync = Set[Int]()
uows.foreach { uow =>
+
+ for((key,value) <- uow.map_actions) {
+ if( value==null ) {
+ map_db.delete(tx, key)
+ } else {
+ map_db.put(tx, key, value)
+ }
+ }
+
uow.actions.foreach {
case (msg, action) =>
@@ -518,6 +539,14 @@ class BDBClient(store: BDBStore) {
}
}
+
+ def get(key: Buffer):Option[Buffer] = {
+ with_ctx() { ctx=>
+ import ctx._
+ map_db.get(tx, to_database_entry(key)).map(x=> to_buffer(x))
+ }
+ }
+
def getLastQueueKey:Long = {
with_ctx() { ctx=>
import ctx._
@@ -532,6 +561,16 @@ class BDBClient(store: BDBStore) {
import ctx._
import PBSupport._
+ streams.using_map_stream { stream =>
+ map_db.cursor(tx) { (key,value) =>
+ val record = new MapEntryPB.Bean
+ record.setKey(key)
+ record.setValue(value)
+ record.freeze().writeFramed(stream)
+ true
+ }
+ }
+
streams.using_queue_stream { queue_stream =>
queues_db.cursor(tx) { (_, value) =>
val record:QueueRecord = value
@@ -605,6 +644,12 @@ class BDBClient(store: BDBStore) {
var zcp_counter = 0
val max_ctx = zero_copy_buffer_allocator.contexts.size
+ streams.using_map_stream { stream=>
+ foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+ map_db.put(tx, pb.getKey, pb.getValue)
+ }
+ }
+
streams.using_message_stream { message_stream=>
foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.{S
import org.apache.activemq.apollo.util.OptionSupport._
import java.io.{InputStream, OutputStream, File}
import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -127,6 +128,12 @@ class BDBStore(var config:BDBStoreDTO) e
}
+ def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+ read_executor {
+ callback(client.get(key))
+ }
+ }
+
/**
* Ges the last queue key identifier stored.
*/
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Fri Aug 19 20:37:08 2011
@@ -35,6 +35,9 @@ object HelperTrait {
implicit def to_queue_record(entry: DatabaseEntry): QueueRecord =
entry.getData
implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new
DatabaseEntry(v)
+ implicit def to_buffer(entry: DatabaseEntry): Buffer = new
Buffer(entry.getData)
+ implicit def to_database_entry(v: Buffer): DatabaseEntry = new
DatabaseEntry(v.toByteArray)
+
implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
val in = new DataByteArrayInputStream(entry.getData)
(in.readVarInt(), in.readVarLong(), in.readVarInt())
@@ -124,6 +127,11 @@ object HelperTrait {
}
+ val buffer_key_conf = new DatabaseConfig();
+ buffer_key_conf.setAllowCreate(true)
+ buffer_key_conf.setTransactional(true);
+ buffer_key_conf.setSortedDuplicates(false);
+
val long_key_conf = new DatabaseConfig();
long_key_conf.setAllowCreate(true)
long_key_conf.setTransactional(true);
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri
Aug 19 20:37:08 2011
@@ -51,3 +51,8 @@ message QueueEntryPB {
optional sint64 expiration=7;
optional bytes messageLocator=8;
}
+
+message MapEntryPB {
+ required bytes key = 1;
+ optional bytes value = 2;
+}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Fri Aug 19 20:37:08 2011
@@ -25,6 +25,7 @@ import atomic.{AtomicReference, AtomicIn
import org.apache.activemq.apollo.util._
import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO,
IntMetricDTO}
+import org.fusesource.hawtbuf.Buffer
/**
* <p>
@@ -87,6 +88,12 @@ trait DelayingStoreSupport extends Store
var flush_listeners = ListBuffer[() => Unit]()
var disable_delay = false
+ var map_actions = Map[Buffer, Buffer]()
+
+ def put(key: Buffer, value: Buffer) = {
+ map_actions += (key -> value)
+ }
+
def on_flush(callback: =>Unit) = {
if( this.synchronized {
if( flushed ) {
@@ -121,7 +128,7 @@ trait DelayingStoreSupport extends Store
def rm(msg:Long) = {
actions -= msg
- if( actions.isEmpty ) {
+ if( actions.isEmpty && map_actions.isEmpty ) {
cancel
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Aug 19 20:37:08 2011
@@ -21,8 +21,10 @@ import org.apache.activemq.apollo.util._
import java.io.{InputStream, OutputStream}
import scala.util.continuations._
import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import org.fusesource.hawtbuf.Buffer
trait StreamManager[A] {
+ def using_map_stream(func: (A)=>Unit)
def using_queue_stream(func: (A)=>Unit)
def using_message_stream(func: (A)=>Unit)
def using_queue_entry_stream(func: (A)=>Unit)
@@ -76,6 +78,11 @@ trait Store extends ServiceTrait {
def remove_queue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
/**
+ * Gets a value of a previously stored map entry.
+ */
+ def get(key:Buffer)(callback:(Option[Buffer])=>Unit )
+
+ /**
* Loads the queue information for a given queue key.
*/
def get_queue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit )
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Fri Aug 19 20:37:08 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.broke
import org.fusesource.hawtdispatch.{Retained}
import org.apache.activemq.apollo.broker.store._
+import org.fusesource.hawtbuf.Buffer
/**
* A store uow is used to perform persistent
@@ -56,6 +57,12 @@ trait StoreUOW extends Retained {
def dequeue(entry:QueueEntryRecord)
/**
+ * Creates or updates a map entry. Set value to null to
+ * remove the entry.
+ */
+ def put(key:Buffer, value:Buffer)
+
+ /**
* Marks this uow as needing to be completed
* as soon as possible. If not called, the Store
* implementation may delay completing the uow in
Modified:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Aug 19 20:37:08 2011
@@ -91,6 +91,7 @@ class StoreExport extends Action {
def using_queue_stream(func: (OutputStream) => Unit) =
entry("queues.dat", func)
def using_queue_entry_stream(func: (OutputStream) => Unit) =
entry("queue_entries.dat", func)
def using_message_stream(func: (OutputStream) => Unit) =
entry("messages.dat", func)
+ def using_map_stream(func: (OutputStream) => Unit) =
entry("map.dat", func)
}
reset {
val rc = store.export_pb(manager)
Modified:
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
Fri Aug 19 20:37:08 2011
@@ -96,6 +96,7 @@ class StoreImport extends Action {
def using_queue_stream(func: (InputStream) => Unit) =
entry("queues.dat", func)
def using_queue_entry_stream(func: (InputStream) => Unit) =
entry("queue_entries.dat", func)
def using_message_stream(func: (InputStream) => Unit) =
entry("messages.dat", func)
+ def using_map_stream(func: (InputStream) => Unit) = entry("map.dat",
func)
}
reset {
val rc = store.import_pb(manager)
Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto Fri
Aug 19 20:37:08 2011
@@ -29,10 +29,7 @@ enum Type {
ADD_QUEUE = 10;
REMOVE_QUEUE = 11;
- ADD_MAP = 30;
- REMOVE_MAP = 31;
- PUT_MAP_ENTRY = 32;
- REMOVE_MAP_ENTRY = 33;
+ MAP_ENTRY = 32;
ADD_SUBSCRIPTION = 50;
REMOVE_SUBSCRIPTION = 51;
@@ -109,24 +106,9 @@ message RemoveSubscription {
required bytes name = 1 [java_override_type = "AsciiBuffer"];
}
-
-///////////////////////////////////////////////////////////////
-// Map related operations.
-///////////////////////////////////////////////////////////////
-message AddMap {
- optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message RemoveMap {
- optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message PutMapEntry {
- optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
- optional bytes id = 2 [java_override_type = "AsciiBuffer"];
- optional bytes value = 3;
-}
-message RemoveMapEntry {
- optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
- optional bytes id = 2 [java_override_type = "AsciiBuffer"];
+message MapEntry {
+ required bytes key=1;
+ optional bytes value = 2;
}
///////////////////////////////////////////////////////////////
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
Fri Aug 19 20:37:08 2011
@@ -228,11 +228,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
_store(update, callback)
}
- def store(txs: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
+ def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
var batch = ListBuffer[TypeCreatable]()
- txs.foreach {
- tx =>
- tx.actions.foreach {
+ uows.foreach { uow =>
+
+ for((key,value) <- uow.map_actions) {
+ val entry = new MapEntry.Bean
+ entry.setKey(key)
+ entry.setValue(value)
+ batch += entry
+ }
+
+ uow.actions.foreach {
case (msg, action) =>
if (action.message_record != null) {
val update: AddMessage.Bean = action.message_record
@@ -273,6 +280,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
rc
}
+ def get(key: Buffer):Option[Buffer] = {
+ withTx { tx =>
+ val helper = new TxHelper(tx)
+ import helper._
+ Option(mapIndex.get(key))
+ }
+ }
+
def getQueue(queueKey: Long): Option[QueueRecord] = {
withTx { tx =>
val helper = new TxHelper(tx)
@@ -897,13 +912,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
cleanup(_tx);
info("Store purged.");
- case x: AddSubscription.Getter =>
- case x: RemoveSubscription.Getter =>
-
- case x: AddMap.Getter =>
- case x: RemoveMap.Getter =>
- case x: PutMapEntry.Getter =>
- case x: RemoveMapEntry.Getter =>
+ case x: MapEntry.Getter =>
+ val value = x.getValue
+ if( value==null ) {
+ mapIndex.remove(x.getKey)
+ } else {
+ mapIndex.put(x.getKey, value)
+ }
}
}
@@ -996,6 +1011,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
private case class Update(update: TypeCreatable, location: Location)
private class TxHelper(val _tx: Transaction) {
+ lazy val mapIndex = MAP_INDEX_FACTORY.open(_tx, rootBuffer.getMapIndexPage)
lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx,
rootBuffer.getQueueIndexPage)
lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx,
rootBuffer.getDataFileRefIndexPage)
lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx,
rootBuffer.getMessageKeyIndexPage)
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.fusesource.hawtdispatch.ListE
import org.apache.activemq.apollo.util.OptionSupport._
import java.io.{InputStream, OutputStream}
import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
object HawtDBStore extends Log {
val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -144,6 +145,11 @@ class HawtDBStore(var config:HawtDBStore
}
}
+ def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+ executor_pool {
+ callback(client.get(key))
+ }
+ }
/**
* Ges the last queue key identifier stored.
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
Fri Aug 19 20:37:08 2011
@@ -140,6 +140,12 @@ object Helpers {
QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC);
QUEUE_INDEX_FACTORY.setDeferredEncoding(true);
+ // maps queue key -> QueueRootRecord
+ val MAP_INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer]();
+ MAP_INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE);
+ MAP_INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE);
+ MAP_INDEX_FACTORY.setDeferredEncoding(true);
+
// maps queue seq -> AddQueueEntry
val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long,
AddQueueEntry.Buffer]();
QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Aug 19 20:37:08 2011
@@ -31,6 +31,8 @@ import jdbm.helper._
import PBSupport._
import org.fusesource.hawtbuf.proto.PBMessageFactory
import java.io._
+import org.fusesource.hawtbuf.Buffer
+
object JDBM2Client extends Log {
object MessageRecordSerializer extends Serializer[MessagePB.Buffer] {
@@ -60,6 +62,19 @@ object JDBM2Client extends Log {
}
}
+ object BufferSerializer extends Serializer[Buffer] {
+ def serialize(out: SerializerOutput, v: Buffer) = {
+ out.writePackedInt(v.length())
+ out.write(v.data, v.offset, v.length)
+ }
+
+ def deserialize(in: SerializerInput) = {
+ val rc = new Buffer(in.readPackedInt());
+ in.readFully(rc.data)
+ rc
+ }
+ }
+
object QueueEntryKeySerializer extends Serializer[(Long,Long)] {
def serialize(out: SerializerOutput, v: (Long,Long)) = {
out.writePackedLong(v._1)
@@ -150,6 +165,7 @@ class JDBM2Client(store: JDBM2Store) {
var messages_db:HTree[Long, MessagePB.Buffer] = _
var zerocp_db:HTree[Long, (Int, Long, Int)] = _
var message_refs_db:HTree[Long, java.lang.Integer] = _
+ var map_db:HTree[Buffer, Buffer] = _
var last_message_key = 0L
var last_queue_key = 0L
@@ -203,6 +219,7 @@ class JDBM2Client(store: JDBM2Store) {
transaction {
messages_db = init_htree("messages", value_serializer =
MessageRecordSerializer)
+ map_db = init_htree("map", value_serializer = BufferSerializer,
key_serializer = BufferSerializer)
zerocp_db = init_htree("lobs", value_serializer =
ZeroCopyValueSerializer)
message_refs_db = init_htree("message_refs")
queues_db = init_htree("queues", value_serializer =
QueueRecordSerializer)
@@ -353,6 +370,15 @@ class JDBM2Client(store: JDBM2Store) {
transaction {
var zcp_files_to_sync = Set[Int]()
uows.foreach { uow =>
+
+ for((key,value) <- uow.map_actions) {
+ if( value==null ) {
+ map_db.remove(key)
+ } else {
+ map_db.put(key, value)
+ }
+ }
+
uow.actions.foreach { case (msg, action) =>
val message_record = action.message_record
@@ -478,11 +504,24 @@ class JDBM2Client(store: JDBM2Store) {
def getLastQueueKey:Long = last_queue_key
+ def get(key: Buffer):Option[Buffer] = {
+ Option(map_db.find(key))
+ }
def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
try {
import PBSupport._
+ streams.using_map_stream { stream=>
+ map_db.cursor { (key, value) =>
+ val record = new MapEntryPB.Bean
+ record.setKey(key)
+ record.setValue(value)
+ record.freeze().writeFramed(stream)
+ true
+ }
+ }
+
streams.using_queue_stream { queue_stream=>
queues_db.cursor { (_, value) =>
val record:QueueRecord = value
@@ -552,6 +591,13 @@ class JDBM2Client(store: JDBM2Store) {
import PBSupport._
+ streams.using_map_stream { stream=>
+ foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+ map_db.put(pb.getKey, pb.getValue)
+ check_flush(1, 10000)
+ }
+ }
+
streams.using_queue_stream { queue_stream=>
foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb =>
val record:QueueRecord = pb
Modified:
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.St
import org.apache.activemq.apollo.util.OptionSupport._
import java.io.{InputStream, OutputStream}
import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -115,6 +116,12 @@ class JDBM2Store(var config:JDBM2StoreDT
}
+ def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+ executor {
+ callback(client.get(key))
+ }
+ }
+
/**
* Ges the last queue key identifier stored.
*/