Author: gtully
Date: Fri Feb 27 10:22:50 2009
New Revision: 748475
URL: http://svn.apache.org/viewvc?rev=748475&view=rev
Log:
Fix org.apache.kahadb.journal.JournalTest.testBatchWriteCompleteAfterClose().
After the changes for AMQ-2143, the file and offset in a location are set only
after the write has been enqueued to a write batch, so doing the inflight add
just overwrote the same entry, -1:-1. That fixed the leak but broke inflights.
The proper fix is to do the inflight addition in enqueue, just after the
location details are known and while holding the appender's enqueue lock.
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=748475&r1=748474&r2=748475&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Fri Feb 27 10:22:50 2009
@@ -175,9 +175,6 @@
// assigned
// by the data manager (which is basically just appending)
- if (!sync) {
- inflightWrites.put(new WriteKey(location), write);
- }
synchronized (this) {
batch = enqueue(write);
}
@@ -188,8 +185,7 @@
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
- }
-
+ }
return location;
}
@@ -208,14 +204,13 @@
synchronized (this) {
batch = enqueue(write);
}
- inflightWrites.put(new WriteKey(location), write);
+
location.setLatch(batch.latch);
return location;
}
private WriteBatch enqueue(WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
- WriteBatch rc = null;
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
@@ -244,14 +239,13 @@
}
nextWriteBatch = new WriteBatch(file, file.getLength(),
write);
- rc = nextWriteBatch;
enqueueMutex.notify();
- return rc;
+ break;
} else {
// Append to current batch if possible..
if (nextWriteBatch.canAppend(write)) {
nextWriteBatch.append(write);
- return nextWriteBatch;
+ break;
} else {
// Otherwise wait for the queuedCommand to be null
try {
@@ -267,6 +261,10 @@
}
}
}
+ if (!write.sync) {
+ inflightWrites.put(new WriteKey(write.location), write);
+ }
+ return nextWriteBatch;
}
}