Author: chirino
Date: Sat Feb 18 14:49:24 2012
New Revision: 1245934
URL: http://svn.apache.org/viewvc?rev=1245934&view=rev
Log:
Add option which attempts to Snappy compress logged messages to reduce disk IO.
Added:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Modified: activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/pom.xml Sat Feb 18 14:49:24
2012
@@ -68,6 +68,19 @@
<version>${leveldbjni-version}</version>
</dependency>
+ <!-- For Optional Snappy Compression -->
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.3</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.iq80.snappy</groupId>
+ <artifactId>snappy</artifactId>
+ <version>0.2</version>
+ <optional>true</optional>
+ </dependency>
<!-- Since we implement a jade template to display the LevelDB status -->
<dependency>
Added:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala?rev=1245934&view=auto
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
(added)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/SnappyTrait.scala
Sat Feb 18 14:49:24 2012
@@ -0,0 +1,139 @@
+package org.apache.activemq.apollo.broker.store
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.nio.ByteBuffer
+import org.xerial.snappy.{Snappy => Xerial}
+import org.iq80.snappy.{Snappy => Iq80}
+import org.fusesource.hawtbuf.Buffer
+
+/**
+ * <p>
+ * A Snappy abstraction which attempts to use the iq80 implementation and falls
back
+ * to the xerial Snappy implementation if it cannot be loaded. You can change the
+ * load order by setting the 'leveldb.snappy' system property. Example:
+ *
+ * <code>
+ * -Dleveldb.snappy=xerial,iq80
+ * </code>
+ *
+ * The system property can also be configured with the name of a class which
+ * implements the SnappyTrait interface.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+package object leveldb {
+
+ final val Snappy = {
+ var attempt:SnappyTrait = null
+ System.getProperty("leveldb.snappy", "iq80,xerial").split(",").foreach { x
=>
+ if( attempt==null ) {
+ try {
+ var name = x.trim();
+ name = name.toLowerCase match {
+ case "xerial" =>
"org.apache.activemq.apollo.broker.store.leveldb.XerialSnappy"
+ case "iq80" =>
"org.apache.activemq.apollo.broker.store.leveldb.IQ80Snappy"
+ case _ => name
+ }
+ attempt =
Thread.currentThread().getContextClassLoader().loadClass(name).newInstance().asInstanceOf[SnappyTrait];
+ } catch {
+ case _ =>
+ }
+ }
+ }
+ attempt
+ }
+
+
+ trait SnappyTrait {
+
+ def uncompressed_length(input: Buffer):Int
+ def uncompress(input: Buffer, output:Buffer): Int
+
+ def max_compressed_length(length: Int): Int
+ def compress(input: Buffer, output: Buffer): Int
+
+ def compress(input: Buffer):Buffer = {
+ val compressed = new Buffer(max_compressed_length(input.length))
+ compressed.length = compress(input, compressed)
+ compressed
+ }
+
+ def compress(text: String): Buffer = {
+ val uncompressed = new Buffer(text.getBytes("UTF-8"))
+ val compressed = new Buffer(max_compressed_length(uncompressed.length))
+ compressed.length = compress(uncompressed, compressed)
+ return compressed
+ }
+
+ def uncompress(input: Buffer):Buffer = {
+ val uncompressed = new Buffer(uncompressed_length(input))
+ uncompressed.length = uncompress(input, uncompressed)
+ uncompressed
+ }
+
+ def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer): Int = {
+ val input = if (compressed.hasArray) {
+ new Buffer(compressed.array, compressed.arrayOffset +
compressed.position, compressed.remaining)
+ } else {
+ val t = new Buffer(compressed.remaining)
+ compressed.mark
+ compressed.get(t.data)
+ compressed.reset
+ t
+ }
+
+ val output = if (uncompressed.hasArray) {
+ new Buffer(uncompressed.array, uncompressed.arrayOffset +
uncompressed.position, uncompressed.capacity()-uncompressed.position)
+ } else {
+ new Buffer(uncompressed_length(input))
+ }
+
+ output.length = uncompress(input, output)
+
+ if (uncompressed.hasArray) {
+ uncompressed.limit(uncompressed.position + output.length)
+ } else {
+ val p = uncompressed.position
+ uncompressed.limit(uncompressed.capacity)
+ uncompressed.put(output.data, output.offset, output.length)
+ uncompressed.flip.position(p)
+ }
+ return output.length
+ }
+ }
+
+}
+package leveldb {
+
+ class XerialSnappy extends SnappyTrait {
+ override def uncompress(compressed: ByteBuffer, uncompressed: ByteBuffer)
= Xerial.uncompress(compressed, uncompressed)
+ def uncompressed_length(input: Buffer) =
Xerial.uncompressedLength(input.data, input.offset, input.length)
+ def uncompress(input: Buffer, output: Buffer) =
Xerial.uncompress(input.data, input.offset, input.length, output.data,
output.offset)
+ def max_compressed_length(length: Int) = Xerial.maxCompressedLength(length)
+ def compress(input: Buffer, output: Buffer) = Xerial.compress(input.data,
input.offset, input.length, output.data, output.offset)
+ override def compress(text: String) = new Buffer(Xerial.compress(text))
+ }
+
+ class IQ80Snappy extends SnappyTrait {
+ def uncompressed_length(input: Buffer) =
Iq80.getUncompressedLength(input.data, input.offset)
+ def uncompress(input: Buffer, output: Buffer): Int =
Iq80.uncompress(input.data, input.offset, input.length, output.data,
output.offset)
+ def compress(input: Buffer, output: Buffer): Int =
Iq80.compress(input.data, input.offset, input.length, output.data,
output.offset)
+ def max_compressed_length(length: Int) = Iq80.maxCompressedLength(length)
+ }
+}
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
Sat Feb 18 14:49:24 2012
@@ -70,6 +70,9 @@ public class LevelDBStoreDTO extends Sto
@XmlAttribute(name="index_compression")
public String index_compression;
+ @XmlAttribute(name="log_compression")
+ public String log_compression;
+
@XmlAttribute(name="index_factory")
public String index_factory;
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Sat Feb 18 14:49:24 2012
@@ -66,11 +66,13 @@ object LevelDBClient extends Log {
final val LOG_ADD_QUEUE = 1.toByte
final val LOG_REMOVE_QUEUE = 2.toByte
final val LOG_ADD_MESSAGE = 3.toByte
- final val LOG_REMOVE_MESSAGE = 4.toByte
final val LOG_ADD_QUEUE_ENTRY = 5.toByte
final val LOG_REMOVE_QUEUE_ENTRY = 6.toByte
final val LOG_MAP_ENTRY = 7.toByte
+ final val LOG_ADD_MESSAGE_SNAPPY = (LOG_ADD_MESSAGE+100).toByte
+ final val LOG_MAP_ENTRY_SNAPPY = (LOG_MAP_ENTRY+100).toByte
+
final val LOG_SUFFIX = ".log"
final val INDEX_SUFFIX = ".index"
@@ -198,6 +200,7 @@ class LevelDBClient(store: LevelDBStore)
var log:RecordLog = _
+ var snappy_compress_logs = false
var index:RichDB = _
var index_options:Options = _
@@ -256,6 +259,10 @@ class LevelDBClient(store: LevelDBStore)
case _ => CompressionType.SNAPPY
}) )
+ if( Option(config.log_compression).map(_.toLowerCase).getOrElse("snappy")
== "snappy" && Snappy!=null ) {
+ snappy_compress_logs = true
+ }
+
index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
index_options.logger(new Logger() {
def log(msg: String) = trace(msg.stripSuffix("\n"))
@@ -412,7 +419,7 @@ class LevelDBClient(store: LevelDBStore)
val record = QueuePB.FACTORY.parseUnframed(data)
index.put(encode_key(queue_prefix, record.getKey), data)
- case LOG_REMOVE_QUEUE =>
+ case LOG_REMOVE_QUEUE=>
replay_operations+=1
val ro = new ReadOptions
ro.fillCache(false)
@@ -430,7 +437,7 @@ class LevelDBClient(store: LevelDBStore)
true
}
- case LOG_MAP_ENTRY =>
+ case LOG_MAP_ENTRY | LOG_MAP_ENTRY_SNAPPY =>
replay_operations+=1
val entry = MapEntryPB.FACTORY.parseUnframed(data)
if (entry.getValue == null) {
@@ -724,7 +731,7 @@ class LevelDBClient(store: LevelDBStore)
def add_queue(record: QueueRecord, callback:Runnable) = {
retry_using_index {
log.appender { appender =>
- val value:Buffer = PBSupport.encode_queue_record(record)
+ val value:Buffer = PBSupport.to_pb(record).freeze().toUnframedBuffer
appender.append(LOG_ADD_QUEUE, value)
index.put(encode_key(queue_prefix, record.key), value)
}
@@ -795,7 +802,9 @@ class LevelDBClient(store: LevelDBStore)
entry.setValue(value)
batch.put(encode_key(map_prefix, key), value.toByteArray)
}
- appender.append(LOG_MAP_ENTRY,
entry.freeze().toUnframedByteArray)
+ var log_data = entry.freeze().toUnframedBuffer
+
+ appender.append(LOG_MAP_ENTRY, log_data)
}
uow.actions.foreach { case (msg, action) =>
@@ -804,10 +813,25 @@ class LevelDBClient(store: LevelDBStore)
var log_info:LogInfo = null
if (message_record != null) {
- val message_data =
PBSupport.encode_message_record(message_record)
- val len = message_data.length
- val p = appender.append(LOG_ADD_MESSAGE, message_data)
- locator = (p._1, len)
+
+ val pb = new MessagePB.Bean
+ pb.setProtocol(message_record.protocol)
+ pb.setSize(message_record.size)
+ pb.setValue(message_record.buffer)
+ var message_data = pb.freeze().toUnframedBuffer
+
+ val p = if( snappy_compress_logs ) {
+ val compressed = Snappy.compress(message_data)
+ if( compressed.length < message_data.length ) {
+ message_data = compressed
+ appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ locator = (p._1, message_data.length)
log_info = p._2
message_record.locator.set(locator);
}
@@ -819,6 +843,7 @@ class LevelDBClient(store: LevelDBStore)
assert(locator!=null)
val (pos, len) = locator
val key = encode_key(queue_entry_prefix, entry.queue_key,
entry.entry_seq)
+
appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
batch.delete(key)
log_ref_decrement(pos, log_info)
@@ -890,8 +915,13 @@ class LevelDBClient(store: LevelDBStore)
val (_, locator, callback) = x
val record = metric_load_from_index_counter.time {
val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
- log.read(pos, len).map { data =>
- val rc = PBSupport.decode_message_record(data)
+ log.read(pos, len).map { case (kind, data) =>
+
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val rc =
PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
rc.locator = locator
assert( rc.protocol!=null )
rc
@@ -919,9 +949,14 @@ class LevelDBClient(store: LevelDBStore)
val (_, locator, callback) = x
val record:Option[MessageRecord] =
metric_load_from_index_counter.time {
val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
- log.read(pos, len).map { x =>
- val rc:MessageRecord = PBSupport.decode_message_record(x)
+ log.read(pos, len).map { case (kind, data) =>
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val rc =
PBSupport.from_pb(MessagePB.FACTORY.parseUnframed(msg_data))
rc.locator = locator
+ assert( rc.protocol!=null )
rc
}
}
@@ -951,7 +986,7 @@ class LevelDBClient(store: LevelDBStore)
ro.fillCache(false)
ro.verifyChecksums(verify_checksums)
index.get(encode_key(queue_prefix, queue_key), ro).map{ x=>
- PBSupport.decode_queue_record(x)
+ PBSupport.from_pb(QueuePB.FACTORY.parseUnframed(x))
}
}
}
@@ -1182,9 +1217,12 @@ class LevelDBClient(store: LevelDBStore)
index.cursor_prefixed(Array(tmp_prefix)) { (key, value) =>
val (_, pos) = decode_long_key(key)
val len = decode_vlong(value).toInt
- log.read(pos, len).foreach { value =>
- // Set the message key to be the position in the log.
- val record = MessagePB.FACTORY.parseUnframed(value).copy
+ log.read(pos, len).foreach { case (kind, data) =>
+ val msg_data = kind match {
+ case LOG_ADD_MESSAGE => data
+ case LOG_ADD_MESSAGE_SNAPPY => Snappy.uncompress(data)
+ }
+ val record = MessagePB.FACTORY.parseUnframed(msg_data).copy()
record.setMessageKey(pos)
manager.store_message(record)
}
@@ -1253,8 +1291,18 @@ class LevelDBClient(store: LevelDBStore)
while(manager.getNext match {
case record:MessagePB.Buffer =>
- val message_data = record.toUnframedBuffer
- val (pos, _) = appender.append(LOG_ADD_MESSAGE, message_data)
+ var message_data = record.toUnframedBuffer
+ val (pos, _) = if( snappy_compress_logs ) {
+ val compressed = Snappy.compress(message_data)
+ if( compressed.length < message_data.length ) {
+ message_data = compressed
+ appender.append(LOG_ADD_MESSAGE_SNAPPY, message_data)
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
+ } else {
+ appender.append(LOG_ADD_MESSAGE, message_data)
+ }
index.put(encode_key(tmp_prefix, record.getMessageKey),
encode_locator(pos, message_data.length))
true
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala?rev=1245934&r1=1245933&r2=1245934&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/RecordLog.scala
Sat Feb 18 14:49:24 2012
@@ -227,9 +227,12 @@ case class RecordLog(directory: File, lo
check_read_flush(offset+LOG_HEADER_SIZE+length)
- if(verify_checksums) {
+ val record = new Buffer(LOG_HEADER_SIZE+length)
+ if( channel.read(record.toByteBuffer, offset) != record.length ) {
+ throw new IOException("short record at position: "+record_position+"
in file: "+file+", offset: "+offset)
+ }
- val record = new Buffer(LOG_HEADER_SIZE+length)
+ if(verify_checksums) {
def record_is_not_changing = {
using(open) { fd =>
@@ -242,10 +245,6 @@ case class RecordLog(directory: File, lo
}
}
- if( channel.read(record.toByteBuffer, offset) != record.length ) {
- assert( record_is_not_changing )
- throw new IOException("short record at position: "+record_position+"
in file: "+file+", offset: "+offset)
- }
val is = new DataByteArrayInputStream(record)
val prefix = is.readByte()
@@ -254,7 +253,7 @@ case class RecordLog(directory: File, lo
throw new IOException("invalid record at position:
"+record_position+" in file: "+file+", offset: "+offset)
}
- val id = is.readByte()
+ val kind = is.readByte()
val expectedChecksum = is.readInt()
val expectedLength = is.readInt()
val data = is.readBuffer(length)
@@ -267,13 +266,11 @@ case class RecordLog(directory: File, lo
}
}
- data
+ (kind, data)
} else {
- val data = new Buffer(length)
- if( channel.read(data.toByteBuffer, offset+LOG_HEADER_SIZE) !=
data.length ) {
- throw new IOException("short record at position: "+record_position+"
in file: "+file+", offset: "+offset)
- }
- data
+ val kind = record.get(1)
+ record.moveHead(LOG_HEADER_SIZE)
+ (kind, record)
}
}