Author: gtully
Date: Fri Dec 23 10:48:38 2011
New Revision: 1222635
URL: http://svn.apache.org/viewvc?rev=1222635&view=rev
Log:
CallerBufferingDataFileAppender, fix rollover of cached buffers
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222635&r1=1222634&r2=1222635&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java Fri Dec 23 10:48:38 2011
@@ -50,11 +50,13 @@ public class KahaDBFastEnqueueTest {
private Destination destination = new ActiveMQQueue("Test");
private String payloadString = new String(new byte[6*1024]);
private boolean useBytesMessage= true;
- private final int parallelProducer = 2;
+ private final int parallelProducer = 20;
private Vector<Exception> exceptions = new Vector<Exception>();
- final long toSend = 1000;//500000;
+ final long toSend = 500000;
- @Ignore("not ready yet, exploring getting broker disk bound")
+ @Ignore("too slow, exploring getting broker disk bound")
+ // use with:
+ // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
public void testPublishNoConsumer() throws Exception {
startBroker(true);
@@ -116,8 +118,10 @@ public class KahaDBFastEnqueueTest {
@After
public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
}
final double sampleRate = 100000;
@@ -174,4 +178,12 @@ public class KahaDBFastEnqueueTest {
String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
}
+
+ public void testRollover() throws Exception {
+ byte flip = 0x1;
+ for (long i=0; i<Short.MAX_VALUE; i++) {
+ assertEquals("0 @:" + i, 0, flip ^= 1);
+ assertEquals("1 @:" + i, 1, flip ^= 1);
+ }
+ }
}
\ No newline at end of file
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java?rev=1222635&r1=1222634&r2=1222635&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java Fri Dec 23 10:48:38 2011
@@ -57,10 +57,10 @@ class CallerBufferingDataFileAppender im
new DataByteArrayOutputStream(maxWriteBatchSize),
new DataByteArrayOutputStream(maxWriteBatchSize)
};
- AtomicInteger writeBatchInstanceCount = new AtomicInteger();
+ volatile byte flip = 0x1;
public class WriteBatch {
- DataByteArrayOutputStream buff = cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
+ DataByteArrayOutputStream buff = cachedBuffers[flip ^= 1];
public final DataFile dataFile;
public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();