Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helper.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import org.fusesource.hawtbuf._
+import codec._
+import java.io.DataOutput
+import org.fusesource.hawtdb.api.{SortedIndex, BTreeIndexFactory}
+
+object Helper {
+
+  def encode_long(a1:Long) = {
+    val out = new DataByteArrayOutputStream(
+      AbstractVarIntSupport.computeVarLongSize(a1)
+    )
+    out.writeVarLong(a1)
+    out.toBuffer
+  }
+
+  def decode_long(bytes:Buffer):Long = {
+    val in = new DataByteArrayInputStream(bytes)
+    in.readVarLong()
+  }
+
+  def encode(a1:Byte, a2:Long) = {
+    val out = new DataByteArrayOutputStream(9)
+    out.writeByte(a1.toInt)
+    out.writeLong(a2)
+    out.toBuffer
+  }
+
+  def encode(a1:Byte, a2:Buffer) = {
+    val out = new DataByteArrayOutputStream(1+a2.length)
+    out.writeByte(a1.toInt)
+    a2.writeTo(out.asInstanceOf[DataOutput])
+    out.toBuffer
+  }
+
+  def decode_long_key(bytes:Buffer):(Byte, Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong())
+  }
+
+  def encode(a1:Byte, a2:Long, a3:Long) = {
+    val out = new DataByteArrayOutputStream(17)
+    out.writeByte(a1)
+    out.writeLong(a2)
+    out.writeLong(a3)
+    out.toBuffer
+  }
+
+  def decode_long_long_key(bytes:Buffer):(Byte,Long,Long) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readLong(), in.readLong())
+  }
+
+  def encode(a1:Byte, a2:Int) = {
+    val out = new DataByteArrayOutputStream(5)
+    out.writeByte(a1)
+    out.writeInt(a2)
+    out.toBuffer
+  }
+
+  def decode_int_key(bytes:Buffer):(Byte,Int) = {
+    val in = new DataByteArrayInputStream(bytes)
+    (in.readByte(), in.readInt())
+  }
+
+  val INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer]();
+  INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE);
+  INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE);
+  INDEX_FACTORY.setDeferredEncoding(true);
+
+  final class RichBTreeIndex(val db: SortedIndex[Buffer,Buffer]) {
+
+    def get(key:Buffer):Option[Buffer] = Option(db.get(key))
+    def delete(key:Buffer) = db.remove(key)
+    def put(key:Buffer, value:Buffer) = Option(db.put(key, value))
+
+    def cursor_keys(func: Buffer => Boolean): Unit = {
+      val iterator = db.iterator()
+      while( iterator.hasNext && func(iterator.next().getKey) ) {
+      }
+    }
+
+    def cursor_range_keys(start_included:Buffer, end_excluded:Buffer)(func:Buffer => Boolean): Unit = {
+      import org.fusesource.hawtdb.api.Predicates._
+      val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+      while( iterator.hasNext && func(iterator.next().getKey) ) {
+      }
+    }
+
+    def cursor_range(start_included:Buffer, end_excluded:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+      def call(entry:java.util.Map.Entry[Buffer,Buffer]) = func(entry.getKey, entry.getValue)
+      import org.fusesource.hawtdb.api.Predicates._
+      val iterator = db.iterator(and(gte(start_included), lt(end_excluded)))
+      while( iterator.hasNext && call(iterator.next()) ) {
+      }
+    }
+
+    def last_key(prefix:Buffer): Option[Buffer] = {
+      var rc:Option[Buffer] = None
+      cursor_keys_prefixed(prefix) { key =>
+        rc = Some(key)
+        true
+      }
+      rc
+    }
+
+    def cursor_prefixed(prefix:Buffer)(func: (Buffer,Buffer) => Boolean): Unit = {
+      val iterator = db.iterator(prefix)
+      def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+        entry.getKey.startsWith(prefix) && func(entry.getKey, entry.getValue)
+      }
+      while( iterator.hasNext && check(iterator.next()) ) {
+      }
+    }
+
+    def cursor_keys_prefixed(prefix:Buffer)(func: Buffer => Boolean): Unit = {
+      val iterator = db.iterator(prefix)
+      def check(entry:java.util.Map.Entry[Buffer,Buffer]) = {
+        entry.getKey.startsWith(prefix) && func(entry.getKey)
+      }
+      while( iterator.hasNext && check(iterator.next()) ) {
+      }
+    }
+  }
+
+}
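
As a quick orientation to the key scheme above (not part of this commit), the following minimal sketch round-trips a composite index key through the Helper codecs. Only the encode/decode helpers shown in the diff are used; the QUEUE_ENTRY kind byte and the numeric values are illustrative assumptions, not names used by the store.

    import org.apache.activemq.apollo.broker.store.hawtdb.Helper._
    import org.fusesource.hawtbuf.Buffer

    object HelperUsageSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical "kind" byte tagging queue-entry keys.
        val QUEUE_ENTRY: Byte = 3

        // 17 byte composite key: 1 byte kind + 8 byte queue key + 8 byte sequence.
        val key: Buffer = encode(QUEUE_ENTRY, 42L, 7L)
        val (kind, queue_key, seq) = decode_long_long_key(key)
        assert(kind == QUEUE_ENTRY && queue_key == 42L && seq == 7L)

        // Variable-length long round trip.
        assert(decode_long(encode_long(123456789L)) == 123456789L)
      }
    }

Keys built this way are what the RichBTreeIndex cursor_range_keys/cursor_prefixed helpers iterate over when scanning a kind-prefixed range of the index.
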
Added: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala?rev=1162574&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/RecordLog.scala Sun Aug 28 19:13:04 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store.hawtdb
+
+import java.{lang=>jl}
+import java.{util=>ju}
+
+import org.apache.activemq.apollo.util._
+import java.io._
+import java.util.zip.CRC32
+import java.util.Map.Entry
+import java.util.Arrays
+import collection.mutable.{HashMap, HashSet}
+import collection.immutable.TreeMap
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch.BaseRetained
+import java.nio.ByteBuffer
+import org.fusesource.hawtbuf.{Buffer, DataByteArrayOutputStream, AbstractVarIntSupport}
+
+object RecordLog {
+
+  // The log files contain a sequence of variable length log records:
+  // record :=
+  //   '*L'     : int8*2      // 2 byte constant
+  //   checksum : uint32      // crc32c of the data[]
+  //   length   : uint32      // the length of the data
+  //   data     : int8*length
+  //
+  // The log records are used to aggregate multiple data records
+  // as a single write to the file system.
+
+  //
+  // The data is composed of multiple records too:
+  // data :=
+  //   kind   : int8
+  //   length : varInt
+  //   body   : int8*length
+  //
+  // The kind field is an aid to the app layer.  It cannot be set to
+  // '*'.
+
+  val LOG_HEADER_PREFIX = Array('*', 'L').map(_.toByte)
+  val LOG_HEADER_SIZE = 10 // LOG_HEADER_PREFIX (2) + checksum (4) + length (4)
+
+}
+
+case class RecordLog(directory: File, log_suffix:String) {
+  import FileSupport._
+  import RecordLog._
+
+  directory.mkdirs()
+
+  var write_buffer_size = 1024 * 1024 * 4
+  var log_size = 1024 * 1024 * 100
+  private var current_appender:LogAppender = _
+
+  case class LogInfo(file:File, position:Long, length:AtomicLong) {
+    def limit = position+length.get
+  }
+
+  var log_infos = TreeMap[Long, LogInfo]()
+  object log_mutex
+
+  def delete(id:Long) = {
+    log_mutex.synchronized {
+      // We can't delete the current appender.
+      if( current_appender.start != id ) {
+        log_infos.get(id).foreach { info =>
+          on_delete(info.file)
+          log_infos = log_infos.filterNot(_._1 == id)
+        }
+      }
+    }
+  }
+
+  protected def on_delete(file:File) = {
+    file.delete()
+  }
+
+  class LogAppender(val file:File, val start:Long) {
+
+    val fos = new FileOutputStream(file)
+    def channel = fos.getChannel
+    def os:OutputStream = fos
+
+    val outbound = new DataByteArrayOutputStream()
+
+    var batch_length = 0
+    val length = new AtomicLong(0)
+    var limit = start
+
+    // set the file size ahead of time so that we don't have to sync the file
+    // meta-data on every log sync.
+    channel.position(log_size)
+    channel.write(ByteBuffer.wrap(Array(0.toByte)))
+    channel.force(true)
+    channel.position(0)
+
+    def sync = {
+      // only need to update the file metadata if the file size changes..
+      channel.force(length.get() > log_size)
+    }
+
+    def flush {
+      if( batch_length!= 0 ) {
+
+        // Update the buffer with the log header info now that we
+        // can calc the length and checksum info
+        val buffer = outbound.toBuffer
+
+        assert(buffer.length()==LOG_HEADER_SIZE+batch_length)
+
+        outbound.reset()
+        outbound.write(LOG_HEADER_PREFIX)
+
+        val checksum = new CRC32
+        checksum.update(buffer.data, buffer.offset + LOG_HEADER_SIZE, buffer.length - LOG_HEADER_SIZE)
+        var actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+        outbound.writeInt( actual_checksum )
+        outbound.writeInt(batch_length)
+
+        // Actually write the record to the file..
+        buffer.writeTo(os);
+
+        length.addAndGet( buffer.length() )
+
+        batch_length = 0
+        outbound.reset()
+      }
+    }
+
+    /**
+     * returns the offset position of the data record.
+     */
+    def append(id:Byte, data: Buffer): Long = {
+      assert(id != LOG_HEADER_PREFIX(0))
+      if( batch_length!=0 && (batch_length + data.length > write_buffer_size) ) {
+        flush
+      }
+      if( batch_length==0 ) {
+        // first data pos record is offset by the log header.
+        outbound.skip(LOG_HEADER_SIZE);
+        limit += LOG_HEADER_SIZE
+      }
+      val rc = limit;
+
+      val start = outbound.position
+      outbound.writeByte(id);
+      outbound.writeVarInt(data.length)
+      outbound.write(data);
+      val count = outbound.position - start
+
+      limit += count
+      batch_length += count
+      rc
+    }
+
+    def close = {
+      flush
+      channel.truncate(length.get())
+      os.close()
+    }
+  }
+
+  case class LogReader(file:File, start:Long) {
+
+    val is = new RandomAccessFile(file, "r")
+
+    val var_support = new AbstractVarIntSupport {
+      def writeByte(p1: Int) = sys.error("Not supported")
+      def readByte(): Byte = is.readByte()
+    };
+
+    def read(pos:Long) = this.synchronized {
+      is.seek(pos-start)
+      val id = is.read()
+      if( id == LOG_HEADER_PREFIX(0) ) {
+        (id, null, pos+LOG_HEADER_SIZE)
+      } else {
+        val length = var_support.readVarInt()
+        val data = new Buffer(length)
+        is.readFully(data.data)
+        (id, data, is.getFilePointer)
+      }
+    }
+
+    def close = this.synchronized {
+      is.close()
+    }
+
+    def next_position(verify_checksums:Boolean=true):Long = this.synchronized {
+      var offset = 0;
+      val prefix = new Array[Byte](LOG_HEADER_PREFIX.length)
+      var done = false
+      while(!done) {
+        try {
+          is.seek(offset)
+          is.readFully(prefix)
+          if( !Arrays.equals(prefix, LOG_HEADER_PREFIX) ) {
+            throw new IOException("Missing header prefix");
+          }
+          val expected_checksum = is.readInt();
+
+          val length = is.readInt();
+          if (verify_checksums) {
+            val data = new Array[Byte](length)
+            is.readFully(data)
+
+            val checksum = new CRC32
+            checksum.update(data)
+            val actual_checksum = (checksum.getValue & 0xFFFFFFFF).toInt
+
+            if( expected_checksum != actual_checksum ) {
+              throw new IOException("Data checksum mismatch");
+            }
+          }
+          offset += LOG_HEADER_SIZE + length
+
+        } catch {
+          case e:IOException =>
+            done = true
+        }
+      }
+      start + offset
+    }
+  }
+
+  def create_log_appender(position: Long) = {
+    new LogAppender(next_log(position), position)
+  }
+
+  def create_appender(position: Long): Any = {
+    current_appender = create_log_appender(position)
+    log_mutex.synchronized {
+      log_infos += position -> new LogInfo(current_appender.file, position, current_appender.length)
+    }
+  }
+
+  def open = {
+    log_mutex.synchronized {
+      log_infos = HawtDBClient.find_sequence_files(directory, log_suffix).map { case (position,file) =>
+        position -> LogInfo(file, position, new AtomicLong(file.length()))
+      }
+
+      val append_pos = if( log_infos.isEmpty ) {
+        0L
+      } else {
+        val (_, file) = log_infos.last
+        val r = LogReader(file.file, file.position)
+        try {
+          val rc = r.next_position()
+          file.length.set(rc - file.position)
+          if( file.file.length != file.length.get() ) {
+            // we need to truncate.
+            using(new RandomAccessFile(file.file, "rw")) ( _.setLength(file.length.get()) )
+          }
+          rc
+        } finally {
+          r.close
+        }
+      }
+
+      create_appender(append_pos)
+    }
+  }
+
+  def close = {
+    log_mutex.synchronized {
+      current_appender.close
+    }
+  }
+
+  def appender_limit = current_appender.limit
+  def appender_start = current_appender.start
+
+  def next_log(position:Long) = HawtDBClient.create_sequence_file(directory, position, log_suffix)
+
+  def appender[T](func: (LogAppender)=>T):T= {
+    try {
+      func(current_appender)
+    } finally {
+      current_appender.flush
+      log_mutex.synchronized {
+        if ( current_appender.length.get >= log_size ) {
+          current_appender.close
+          on_log_rotate()
+          create_appender(current_appender.limit)
+        }
+      }
+    }
+  }
+
+  var on_log_rotate: ()=>Unit = ()=>{}
+
+  val next_reader_id = new LongCounter()
+  val reader_cache_files = new HashMap[File, HashSet[Long]];
+  val reader_cache_readers = new LRUCache[Long, LogReader](100) {
+    protected override def onCacheEviction(entry: Entry[Long, LogReader]) = {
+      var key = entry.getKey
+      var value = entry.getValue
+      value.close
+
+      val set = reader_cache_files.get(value.file).get
+      set.remove(key)
+      if( set.isEmpty ) {
+        reader_cache_files.remove(value.file)
+      }
+    }
+  }
+
+
+  private def get_reader[T](pos:Long)(func: (LogReader)=>T) = {
+    val infos = log_mutex.synchronized(log_infos)
+    val info = infos.range(0L, pos+1).lastOption.map(_._2)
+    info.map { info =>
+      // Checkout a reader from the cache...
+      val (set, reader_id, reader) = reader_cache_files.synchronized {
+        var set = reader_cache_files.getOrElseUpdate(info.file, new HashSet);
+        if( set.isEmpty ) {
+          val reader_id = next_reader_id.getAndIncrement()
+          val reader = new LogReader(info.file, info.position)
+          set.add(reader_id)
+          reader_cache_readers.put(reader_id, reader)
+          (set, reader_id, reader)
+        } else {
+          val reader_id = set.head
+          set.remove(reader_id)
+          (set, reader_id, reader_cache_readers.get(reader_id))
+        }
+      }
+
+      try {
+        func(reader)
+      } finally {
+        // check him back in..
+        reader_cache_files.synchronized {
+          set.add(reader_id)
+        }
+      }
+    }
+  }
+
+  def read(pos:Long) = {
+    get_reader(pos)(_.read(pos))
+  }
+
+}

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreDTO.java Sun Aug 28 19:13:04 2011
@@ -34,54 +34,55 @@ public class HawtDBStoreDTO extends Stor
     @XmlAttribute
     public File directory;
 
-    @XmlAttribute(name="archive_directory")
-    public File archive_directory;
+    @XmlAttribute(name="fail_if_locked")
+    public Boolean fail_if_locked;
 
-    @XmlAttribute(name="index_flush_interval")
-    public Long index_flush_interval;
+    @XmlAttribute(name="gc_interval")
+    public Integer gc_interval;
 
-    @XmlAttribute(name="cleanup_interval")
-    public Long cleanup_interval;
+    @XmlAttribute(name="read_threads")
+    public Integer read_threads;
 
-    @XmlAttribute(name="journal_log_size")
-    public Integer journal_log_size;
+    @XmlAttribute(name="verify_checksums")
+    public Boolean verify_checksums;
 
-    @XmlAttribute(name="journal_batch_size")
-    public Integer journal_batch_size;
+    @XmlAttribute(name="log_size")
+    public Integer log_size;
 
-    @XmlAttribute(name="index_cache_size")
-    public Integer index_cache_size;
+    @XmlAttribute(name="log_write_buffer_size")
+    public Integer log_write_buffer_size;
 
     @XmlAttribute(name="index_page_size")
-    public Short index_page_size;
+    public Integer index_page_size;
 
-    @XmlAttribute(name="fail_if_locked")
-    public Boolean fail_if_locked;
+    @XmlAttribute(name="index_cache_size")
+    public Long index_cache_size;
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof HawtDBStoreDTO)) return false;
        if (!super.equals(o)) return false;
 
         HawtDBStoreDTO that = (HawtDBStoreDTO) o;
 
-        if (archive_directory != null ? !archive_directory.equals(that.archive_directory) : that.archive_directory != null)
+        if (directory != null ? !directory.equals(that.directory) : that.directory != null)
             return false;
-        if (cleanup_interval != null ? !cleanup_interval.equals(that.cleanup_interval) : that.cleanup_interval != null)
-            return false;
-        if (directory != null ? !directory.equals(that.directory) : that.directory != null) return false;
         if (fail_if_locked != null ? !fail_if_locked.equals(that.fail_if_locked) : that.fail_if_locked != null)
             return false;
-        if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
+        if (gc_interval != null ? !gc_interval.equals(that.gc_interval) : that.gc_interval != null)
             return false;
-        if (index_flush_interval != null ? !index_flush_interval.equals(that.index_flush_interval) : that.index_flush_interval != null)
+        if (index_cache_size != null ? !index_cache_size.equals(that.index_cache_size) : that.index_cache_size != null)
             return false;
         if (index_page_size != null ? !index_page_size.equals(that.index_page_size) : that.index_page_size != null)
             return false;
-        if (journal_batch_size != null ? !journal_batch_size.equals(that.journal_batch_size) : that.journal_batch_size != null)
+        if (log_size != null ? !log_size.equals(that.log_size) : that.log_size != null)
+            return false;
+        if (log_write_buffer_size != null ? !log_write_buffer_size.equals(that.log_write_buffer_size) : that.log_write_buffer_size != null)
             return false;
-        if (journal_log_size != null ? !journal_log_size.equals(that.journal_log_size) : that.journal_log_size != null)
+        if (read_threads != null ? !read_threads.equals(that.read_threads) : that.read_threads != null)
+            return false;
+        if (verify_checksums != null ? !verify_checksums.equals(that.verify_checksums) : that.verify_checksums != null)
             return false;
 
         return true;
@@ -91,14 +92,14 @@ public class HawtDBStoreDTO extends Stor
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (directory != null ? directory.hashCode() : 0);
-        result = 31 * result + (archive_directory != null ? archive_directory.hashCode() : 0);
-        result = 31 * result + (index_flush_interval != null ? index_flush_interval.hashCode() : 0);
-        result = 31 * result + (cleanup_interval != null ? cleanup_interval.hashCode() : 0);
-        result = 31 * result + (journal_log_size != null ? journal_log_size.hashCode() : 0);
-        result = 31 * result + (journal_batch_size != null ? journal_batch_size.hashCode() : 0);
-        result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
-        result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
         result = 31 * result + (fail_if_locked != null ? fail_if_locked.hashCode() : 0);
+        result = 31 * result + (gc_interval != null ? gc_interval.hashCode() : 0);
+        result = 31 * result + (read_threads != null ? read_threads.hashCode() : 0);
+        result = 31 * result + (verify_checksums != null ? verify_checksums.hashCode() : 0);
+        result = 31 * result + (log_size != null ? log_size.hashCode() : 0);
+        result = 31 * result + (log_write_buffer_size != null ? log_write_buffer_size.hashCode() : 0);
+        result = 31 * result + (index_page_size != null ? index_page_size.hashCode() : 0);
+        result = 31 * result + (index_cache_size != null ? index_cache_size.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.java Sun Aug 28 19:13:04 2011
@@ -41,4 +41,22 @@ public class HawtDBStoreStatusDTO extend
     @XmlElement(name="message_load_batch_size")
     public IntMetricDTO message_load_batch_size;
 
+    @XmlElement(name="last_checkpoint_pos")
+    public long index_snapshot_pos;
+
+    @XmlElement(name="last_gc_ts")
+    public long last_gc_ts;
+
+    @XmlElement(name="in_gc")
+    public boolean in_gc;
+
+    @XmlElement(name="last_gc_duration")
+    public long last_gc_duration;
+
+    @XmlElement(name="last_append_pos")
+    public long log_append_pos;
+
+    @XmlElement(name="log_stats")
+    public String log_stats;
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/webapp/WEB-INF/org/apache/activemq/apollo/broker/store/hawtdb/dto/HawtDBStoreStatusDTO.jade Sun Aug 28 19:13:04 2011
@@ -4,9 +4,9 @@
 -# The ASF licenses this file to You under the Apache License, Version 2.0
 -# (the "License"); you may not use this file except in compliance with
 -# the License. You may obtain a copy of the License at
--# 
+-#
 -#      http://www.apache.org/licenses/LICENSE-2.0
--# 
+-#
 -# Unless required by applicable law or agreed to in writing, software
 -# distributed under the License is distributed on an "AS IS" BASIS,
 -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,8 @@ h1 Store: #{id}
 
 p state: #{state} for #{ uptime(state_since) }
 
+p pending stores: #{pending_stores}
+
 h2 Cancel Stats
 p canceled message stores: #{canceled_message_counter}
 p canceled message enqueues: #{canceled_enqueue_counter}
@@ -40,5 +42,16 @@ h2 Store Latency Stats
 - show("Message load latency", message_load_latency)
 - show("UOW flush latency", flush_latency)
-- show("Journal append latency", journal_append_latency)
-- show("Index update latency", index_update_latency)
+
+h2 Log Status
+p last log GC occurred #{uptime(last_gc_ts)}
+p last log GC duration: #{friendly_duration(last_gc_duration)}
+pre
+  !~~ log_stats
+p
+  Index recovery starts from log position:
+  code #{"%016x".format(index_snapshot_pos)}
+p
+  Append position:
+  code #{"%016x".format(log_append_pos)}
+

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1162574&r1=1162573&r2=1162574&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Sun Aug 28 19:13:04 2011
@@ -121,7 +121,7 @@
     <uniqueVersion>false</uniqueVersion>
 
     <cascal-version>1.3-SNAPSHOT</cascal-version>
-    <hawtdb-version>1.6-SNAPSHOT</hawtdb-version>
+    <hawtdb-version>1.6</hawtdb-version>
     <josql-version>1.5</josql-version>
 
     <!-- osgi stuff -->
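
For readers new to the RecordLog appender and the '*L' + checksum + length record framing introduced in RecordLog.scala above, the following minimal usage sketch is not part of this commit: the directory path, log suffix, and kind byte are illustrative assumptions, and it relies on the rest of this module (e.g. HawtDBClient.create_sequence_file/find_sequence_files) being on the classpath.

    import java.io.File
    import org.fusesource.hawtbuf.Buffer
    import org.apache.activemq.apollo.broker.store.hawtdb.RecordLog

    object RecordLogUsageSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative location and suffix; the store normally supplies these.
        val log = RecordLog(new File("/tmp/recordlog-demo"), ".log")
        log.open

        // Append one data record; the batch is framed with the '*L' header,
        // crc32 checksum and length when the appender block flushes.
        val pos = log.appender { appender =>
          appender.append(1.toByte, new Buffer("hello".getBytes("UTF-8")))
        }

        // Read the record back by the position the append returned.
        log.read(pos).foreach { case (kind, data, next_pos) =>
          println("kind=" + kind + " data=" + data.ascii + " next=" + next_pos)
        }

        log.close
      }
    }

The append position returned here is what the store persists in its index (see the index_snapshot_pos / log_append_pos fields added to HawtDBStoreStatusDTO above), so recovery can replay the log from a known offset.
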
