Author: chirino
Date: Thu Feb 5 22:41:01 2009
New Revision: 741327
URL: http://svn.apache.org/viewvc?rev=741327&view=rev
Log:
- Implemented the setBatch API call in KahaDBStore.
- fixed bug: When a async thread was used for writing in KahaDBStore you could
run in a Memory leak due to an unbound enqueue buffer type problem.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Feb 5 22:41:01 2009
@@ -166,7 +166,7 @@
size++;
}
- protected void setBatch(MessageId messageId) {
+ protected void setBatch(MessageId messageId) throws Exception {
}
public final synchronized void addMessageFirst(MessageReference node)
throws Exception {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Thu Feb 5 22:41:01 2009
@@ -74,7 +74,7 @@
this.store.resetBatching();
}
- protected void setBatch(MessageId messageId) {
+ protected void setBatch(MessageId messageId) throws Exception {
store.setBatch(messageId);
batchResetNeeded = false;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Thu Feb 5 22:41:01 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.store;
+import java.io.IOException;
+
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.broker.ConnectionContext;
@@ -44,6 +46,6 @@
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
- public void setBatch(MessageId messageId) {
+ public void setBatch(MessageId messageId) throws IOException {
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Thu Feb 5 22:41:01 2009
@@ -115,6 +115,6 @@
* allow caching cursors to set the current batch offset when cache is
exhausted
* @param messageId
*/
- void setBatch(MessageId messageId);
+ void setBatch(MessageId messageId) throws Exception;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Thu Feb 5 22:41:01 2009
@@ -93,7 +93,7 @@
}
- public void setBatch(MessageId messageId) {
+ public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Thu Feb 5 22:41:01 2009
@@ -135,7 +135,7 @@
}
- public void setBatch(MessageId messageId) {
+ public void setBatch(MessageId messageId) throws Exception {
delegate.setBatch(messageId);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Thu Feb 5 22:41:01 2009
@@ -245,7 +245,24 @@
@Override
- public void setBatch(MessageId messageId) {
+ public void setBatch(MessageId identity) throws IOException {
+ final String key = identity.toString();
+
+ // Hopefully one day the page file supports concurrent read
operations... but for now we must
+ // externally synchronize...
+ Long location;
+ synchronized(indexMutex) {
+ location = pageFile.tx().execute(new
Transaction.CallableClosure<Long, IOException>(){
+ public Long execute(Transaction tx) throws IOException {
+ StoredDestination sd = getStoredDestination(dest, tx);
+ return sd.messageIdIndex.get(tx, key);
+ }
+ });
+ }
+ if( location!=null ) {
+ cursorPos=location;
+ }
+
}
public void setMemoryUsage(MemoryUsage memoeyUSage) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
Thu Feb 5 22:41:01 2009
@@ -104,7 +104,7 @@
// TODO Auto-generated catch block
e.printStackTrace();
}
- System.out.println("max = " + max);
+ System.out.println("Max Violation = " + max + " - Total SLA
violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f",
100.0*slaViolations.get()/total.get())+"%)");
}
};
ExecutorService executor = Executors.newCachedThreadPool();
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741327&r1=741326&r2=741327&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Thu Feb 5 22:41:01 2009
@@ -813,7 +813,17 @@
void write(Collection<Map.Entry<Long, PageWrite>> updates) throws
IOException {
synchronized( writes ) {
-
+ if( enabledWriteThread ) {
+ while( writes.size() >= writeBatchSize && !stopWriter.get() ) {
+ try {
+ writes.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
for (Map.Entry<Long, PageWrite> entry : updates) {
Long key = entry.getKey();
PageWrite value = entry.getValue();
@@ -889,9 +899,11 @@
try {
while( !stopWriter.get() ) {
// Wait for a notification...
- synchronized( writes ) {
+ synchronized( writes ) {
+ writes.notifyAll();
+
// If there is not enough to write, wait for a
notification...
- while( !canStartWriteBatch() && !stopWriter.get() ) {
+ while( writes.isEmpty() && checkpointLatch==null &&
!stopWriter.get() ) {
writes.wait(100);
}