Author: chirino
Date: Thu Feb 5 16:33:23 2009
New Revision: 741169
URL: http://svn.apache.org/viewvc?rev=741169&view=rev
Log:
Better property names.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Thu Feb 5 16:33:23 2009
@@ -140,7 +140,7 @@
org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(message);
command.setMessage(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
- store(command, isSyncWrites() && message.isResponseRequired());
+ store(command, isEnableJournalDiskSyncs() &&
message.isResponseRequired());
}
@@ -149,7 +149,7 @@
command.setDestination(dest);
command.setMessageId(ack.getLastMessageId().toString());
command.setTransactionInfo(createTransactionInfo(ack.getTransactionId()) );
- store(command, isSyncWrites() && ack.isResponseRequired());
+ store(command, isEnableJournalDiskSyncs() &&
ack.isResponseRequired());
}
public void removeAllMessages(ConnectionContext context) throws
IOException {
@@ -282,14 +282,14 @@
command.setRetroactive(retroactive);
org.apache.activemq.util.ByteSequence packet =
wireFormat.marshal(subscriptionInfo);
command.setSubscriptionInfo(new Buffer(packet.getData(),
packet.getOffset(), packet.getLength()));
- store(command, isSyncWrites() && true);
+ store(command, isEnableJournalDiskSyncs() && true);
}
public void deleteSubscription(String clientId, String
subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName));
- store(command, isSyncWrites() && true);
+ store(command, isEnableJournalDiskSyncs() && true);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Feb 5 16:33:23 2009
@@ -146,9 +146,9 @@
protected boolean deleteAllMessages;
protected File directory;
protected Thread checkpointThread;
- protected boolean syncWrites=true;
- int checkpointInterval = 5*1000;
- int cleanupInterval = 30*1000;
+ protected boolean enableJournalDiskSyncs=true;
+ long checkpointInterval = 5*1000;
+ long cleanupInterval = 30*1000;
protected AtomicBoolean started = new AtomicBoolean();
protected AtomicBoolean opened = new AtomicBoolean();
@@ -1182,9 +1182,7 @@
// /////////////////////////////////////////////////////////////////
private PageFile createPageFile() {
- PageFile pf = new PageFile(directory, "db");
- pf.setEnableAsyncWrites(!isSyncWrites());
- return pf;
+ return new PageFile(directory, "db");
}
private Journal createJournal() {
@@ -1211,27 +1209,27 @@
this.deleteAllMessages = deleteAllMessages;
}
- public boolean isSyncWrites() {
- return syncWrites;
+ public boolean isEnableJournalDiskSyncs() {
+ return enableJournalDiskSyncs;
}
- public void setSyncWrites(boolean syncWrites) {
- this.syncWrites = syncWrites;
+ public void setEnableJournalDiskSyncs(boolean syncWrites) {
+ this.enableJournalDiskSyncs = syncWrites;
}
- public int getCheckpointInterval() {
+ public long getCheckpointInterval() {
return checkpointInterval;
}
- public void setCheckpointInterval(int checkpointInterval) {
+ public void setCheckpointInterval(long checkpointInterval) {
this.checkpointInterval = checkpointInterval;
}
- public int getCleanupInterval() {
+ public long getCleanupInterval() {
return cleanupInterval;
}
- public void setCleanupInterval(int cleanupInterval) {
+ public void setCleanupInterval(long cleanupInterval) {
this.cleanupInterval = cleanupInterval;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
Thu Feb 5 16:33:23 2009
@@ -65,6 +65,8 @@
private void doTestEnqueue(final boolean transacted) throws Exception {
final long min = 100;
+ final AtomicLong total = new AtomicLong(0);
+ final AtomicLong slaViolations = new AtomicLong(0);
final AtomicLong max = new AtomicLong(0);
long reportTime = 0;
@@ -81,16 +83,20 @@
long endT = System.currentTimeMillis();
long duration = endT - startT;
+ total.incrementAndGet();
+
if (duration > max.get()) {
max.set(duration);
}
if (duration > min) {
- System.err.println(Thread.currentThread().getName()
+ slaViolations.incrementAndGet();
+ System.err.println("SLA violation @
"+Thread.currentThread().getName()
+ " "
+ DateFormat.getTimeInstance().format(
new Date(startT)) + " at message "
- + i + " send time=" + duration);
+ + i + " send time=" + duration
+ + " - Total SLA violations:
"+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f",
100.0*slaViolations.get()/total.get())+"%)");
}
}
@@ -145,7 +151,13 @@
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
kaha.deleteAllMessages();
- kaha.getPageFile().setWriteBatchSize(10);
+ kaha.setCleanupInterval(1000 * 60 * 60 * 60);
+ // The setEnableJournalDiskSyncs(false) setting is a little
dangerous right now, as I have not verified
+ // what happens if the index is updated but a journal update is
lost.
+ // Index is going to be in consistent, but can it be repaired?
+ kaha.setEnableJournalDiskSyncs(false);
+ kaha.getPageFile().setWriteBatchSize(100);
+ kaha.getPageFile().setEnableWriteThread(true);
broker.setPersistenceAdapter(kaha);
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741169&r1=741168&r2=741169&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Thu Feb 5 16:33:23 2009
@@ -26,12 +26,9 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -39,7 +36,6 @@
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
@@ -48,7 +44,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
@@ -119,9 +114,9 @@
// page write failures..
private boolean enableRecoveryFile=true;
// Will we sync writes to disk. Ensures that data will not be lost after a
checkpoint()
- private boolean enableSyncedWrites=true;
+ private boolean enableDiskSyncs=true;
// Will writes be done in an async thread?
- private boolean enableAsyncWrites=false;
+ private boolean enabledWriteThread=false;
// These are used if enableAsyncWrites==true
private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -427,7 +422,7 @@
*/
public void flush() throws IOException {
- if( enableAsyncWrites && stopWriter.get() ) {
+ if( enabledWriteThread && stopWriter.get() ) {
throw new IOException("Page file already stopped: checkpointing is
not allowed");
}
@@ -437,7 +432,7 @@
if( writes.isEmpty()) {
return;
}
- if( enableAsyncWrites ) {
+ if( enabledWriteThread ) {
if( this.checkpointLatch == null ) {
this.checkpointLatch = new CountDownLatch(1);
}
@@ -591,17 +586,17 @@
/**
* @return Are page writes synced to disk?
*/
- public boolean isEnableSyncedWrites() {
- return enableSyncedWrites;
+ public boolean isEnableDiskSyncs() {
+ return enableDiskSyncs;
}
/**
* Allows you enable syncing writes to disk.
* @param syncWrites
*/
- public void setEnableSyncedWrites(boolean syncWrites) {
+ public void setEnableDiskSyncs(boolean syncWrites) {
assertNotLoaded();
- this.enableSyncedWrites = syncWrites;
+ this.enableDiskSyncs = syncWrites;
}
/**
@@ -662,13 +657,13 @@
this.pageCacheSize = pageCacheSize;
}
- public boolean isEnableAsyncWrites() {
- return enableAsyncWrites;
+ public boolean isEnabledWriteThread() {
+ return enabledWriteThread;
}
- public void setEnableAsyncWrites(boolean enableAsyncWrites) {
+ public void setEnableWriteThread(boolean enableAsyncWrites) {
assertNotLoaded();
- this.enableAsyncWrites = enableAsyncWrites;
+ this.enabledWriteThread = enableAsyncWrites;
}
public long getDiskSize() throws IOException {
@@ -700,7 +695,16 @@
this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
}
- ///////////////////////////////////////////////////////////////////
+ public int getWriteBatchSize() {
+ return writeBatchSize;
+ }
+
+ public void setWriteBatchSize(int writeBatchSize) {
+ assertNotLoaded();
+ this.writeBatchSize = writeBatchSize;
+ }
+
+ ///////////////////////////////////////////////////////////////////
// Package Protected Methods exposed to Transaction
///////////////////////////////////////////////////////////////////
@@ -817,7 +821,7 @@
// Once we start approaching capacity, notify the writer to start
writing
if( canStartWriteBatch() ) {
- if( enableAsyncWrites ) {
+ if( enabledWriteThread ) {
writes.notify();
} else {
writeBatch();
@@ -828,7 +832,7 @@
private boolean canStartWriteBatch() {
int capacityUsed = ((writes.size() * 100)/writeBatchSize);
- if( enableAsyncWrites ) {
+ if( enabledWriteThread ) {
// The constant 10 here controls how soon write batches start
going to disk..
// would be nice to figure out how to auto tune that value. Make
to small and
// we reduce through put because we are locking the write mutex
too often doing writes
@@ -963,7 +967,7 @@
recoveryFile.write(w.diskBound, 0, pageSize);
}
- if (enableSyncedWrites) {
+ if (enableDiskSyncs) {
// Sync to make sure recovery buffer writes land on disk..
recoveryFile.getFD().sync();
}
@@ -978,7 +982,7 @@
}
// Sync again
- if( enableSyncedWrites ) {
+ if( enableDiskSyncs ) {
writeFile.getFD().sync();
}
@@ -1077,7 +1081,7 @@
private void startWriter() {
synchronized( writes ) {
- if( enableAsyncWrites ) {
+ if( enabledWriteThread ) {
stopWriter.set(false);
writerThread = new Thread("KahaDB Page Writer") {
@Override
@@ -1092,7 +1096,7 @@
}
private void stopWriter() throws InterruptedException {
- if( enableAsyncWrites ) {
+ if( enabledWriteThread ) {
stopWriter.set(true);
writerThread.join();
}
@@ -1102,12 +1106,4 @@
return getMainPageFile();
}
- public int getWriteBatchSize() {
- return writeBatchSize;
- }
-
- public void setWriteBatchSize(int writeBatchSize) {
- this.writeBatchSize = writeBatchSize;
- }
-
}