Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.index; + +import org.apache.kahadb.LongMarshaller; +import org.apache.kahadb.StringMarshaller; +import org.apache.kahadb.page.Transaction; + +public class HashIndexBenchMark extends IndexBenchmark { + + @Override + protected Index<String, Long> createIndex() throws Exception { + + Transaction tx = pf.tx(); + long id = tx.allocate().getPageId(); + tx.commit(); + + HashIndex<String, Long> index = new HashIndex<String, Long>(pf, id); + index.setKeyMarshaller(StringMarshaller.INSTANCE); + index.setValueMarshaller(LongMarshaller.INSTANCE); + + return index; + } + +}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.index; + +import org.apache.kahadb.LongMarshaller; +import org.apache.kahadb.StringMarshaller; +import org.apache.kahadb.index.HashIndex; +import org.apache.kahadb.index.Index; + +public class HashIndexTest extends IndexTestSupport { + + @Override + protected Index<String, Long> createIndex() throws Exception { + + long id = tx.allocate().getPageId(); + tx.commit(); + + HashIndex<String, Long> index = new HashIndex<String,Long>(pf, id); + index.setBinCapacity(12); + index.setKeyMarshaller(StringMarshaller.INSTANCE); + index.setValueMarshaller(LongMarshaller.INSTANCE); + + return index; + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.index; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import junit.framework.TestCase; +import org.apache.kahadb.page.PageFile; +import org.apache.kahadb.page.Transaction; +import org.apache.kahadb.util.IOHelper; + +/** + * @author chirino + */ +public abstract class IndexBenchmark extends TestCase { + + // Slower machines might need to make this bigger. + private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5)); + // How many times do we sample? + private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "" + 60 * 1000 / SAMPLE_DURATION)); + // How many indexes will we be benchmarking concurrently? + private static final int INDEX_COUNT = Integer.parseInt(System.getProperty("INDEX_COUNT", "" + 1)); + // Indexes tend to perform worse when they get big.. so how many items + // should we put into the index before we start sampling. + private static final int INDEX_PRE_LOAD_COUNT = Integer.parseInt(System.getProperty("INDEX_PRE_LOAD_COUNT", "" + 10000 / INDEX_COUNT)); + + protected File ROOT_DIR; + protected final HashMap<String, Index<String, Long>> indexes = new HashMap<String, Index<String, Long>>(); + protected PageFile pf; + + public void setUp() throws Exception { + ROOT_DIR = new File(IOHelper.getDefaultDataDirectory()); + IOHelper.mkdirs(ROOT_DIR); + IOHelper.deleteChildren(ROOT_DIR); + + pf = new PageFile(ROOT_DIR, getClass().getName()); + pf.load(); + } + + protected void tearDown() throws Exception { + for (Index i : indexes.values()) { + try { + i.unload(); + } catch (Throwable ignore) { + } + } + } + + abstract protected Index<String, Long> createIndex() throws Exception; + + synchronized private Index<String, Long> openIndex(String name) throws Exception { + Index<String, Long> index = indexes.get(name); + if (index == null) { + index = createIndex(); + index.load(); + indexes.put(name, index); + } + return index; + } + + class Producer extends Thread { + private final String name; + AtomicBoolean shutdown = new AtomicBoolean(); + + public Producer(String name) { + super("Producer: " + name); + this.name = name; + } + + public void shutdown() { + shutdown.set(true); + } + + @Override + public void run() { + try { + + Transaction tx = pf.tx(); + + Index<String,Long> index = openIndex(name); + long counter = 0; + while (!shutdown.get()) { + long c = counter; + + String key = key(c); + index.put(tx, key, c); + tx.commit(); + Thread.yield(); // This avoids consumer starvation.. + + onProduced(counter++); + } + + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public void onProduced(long counter) { + } + } + + protected String key(long c) { + return "a-long-message-id-like-key-" + c; + } + + + class Consumer extends Thread { + private final String name; + AtomicBoolean shutdown = new AtomicBoolean(); + + public Consumer(String name) { + super("Consumer: " + name); + this.name = name; + } + + public void shutdown() { + shutdown.set(true); + } + + @Override + public void run() { + try { + Transaction tx = pf.tx(); + + Index<String,Long> index = openIndex(name); + long counter = 0; + while (!shutdown.get()) { + long c = counter; + String key = key(c); + + Long record = index.get(tx, key); + if (record != null) { + if( index.remove(tx, key) == null ) { + System.out.print("Remove failed..."); + } + tx.commit(); + onConsumed(counter++); + } + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public void onConsumed(long counter) { + } + } + + protected void dumpIndex(Index<String, Long> index) throws IOException { + } + + public void testLoad() throws Exception { + + final Producer producers[] = new Producer[INDEX_COUNT]; + final Consumer consumers[] = new Consumer[INDEX_COUNT]; + final CountDownLatch preloadCountDown = new CountDownLatch(INDEX_COUNT); + final AtomicLong producedRecords = new AtomicLong(); + final AtomicLong consumedRecords = new AtomicLong(); + + System.out.println("Starting: " + INDEX_COUNT + " producers"); + for (int i = 0; i < INDEX_COUNT; i++) { + producers[i] = new Producer("test-" + i) { + private boolean prelaodDone; + + public void onProduced(long counter) { + if (!prelaodDone && counter >= INDEX_PRE_LOAD_COUNT) { + prelaodDone = true; + preloadCountDown.countDown(); + } + producedRecords.incrementAndGet(); + } + }; + producers[i].start(); + } + + long start = System.currentTimeMillis(); + System.out.println("Waiting for each producer create " + INDEX_PRE_LOAD_COUNT + " records before starting the consumers."); + preloadCountDown.await(); + long end = System.currentTimeMillis(); + System.out.println("Preloaded " + INDEX_PRE_LOAD_COUNT * INDEX_COUNT + " records at " + (INDEX_PRE_LOAD_COUNT * INDEX_COUNT * 1000f / (end - start)) + " records/sec"); + + System.out.println("Starting: " + INDEX_COUNT + " consumers"); + for (int i = 0; i < INDEX_COUNT; i++) { + consumers[i] = new Consumer("test-" + i) { + public void onConsumed(long counter) { + consumedRecords.incrementAndGet(); + } + }; + consumers[i].start(); + } + + long sample_start = System.currentTimeMillis(); + System.out.println("Taking " + SAMPLES + " performance samples every " + SAMPLE_DURATION + " ms"); + System.out.println("time (s), produced, produce rate (r/s), consumed, consume rate (r/s), used memory (k)"); + producedRecords.set(0); + consumedRecords.set(0); + for (int i = 0; i < SAMPLES; i++) { + start = System.currentTimeMillis(); + Thread.sleep(SAMPLE_DURATION); + end = System.currentTimeMillis(); + long p = producedRecords.getAndSet(0); + long c = consumedRecords.getAndSet(0); + + long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + System.out.println(((end-sample_start)/1000f)+", "+p+", "+(p * 1000f / (end - start)) + ", "+ c+", " + (c * 1000f / (end - start))+", "+(usedMemory/(1024)) ); + } + System.out.println("Samples done... Shutting down the producers and consumers..."); + for (int i = 0; i < INDEX_COUNT; i++) { + producers[i].shutdown(); + consumers[i].shutdown(); + } + for (int i = 0; i < INDEX_COUNT; i++) { + producers[i].join(1000 * 5); + consumers[i].join(1000 * 5); + } + System.out.println("Shutdown."); + } + +} Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.index; + +import java.io.File; +import java.io.IOException; + +import junit.framework.TestCase; +import org.apache.kahadb.page.PageFile; +import org.apache.kahadb.page.Transaction; +import org.apache.kahadb.util.IOHelper; + +/** + * Test a HashIndex + */ +public abstract class IndexTestSupport extends TestCase { + + private static final int COUNT = 10000; + + protected Index<String,Long> index; + protected File directory; + protected PageFile pf; + protected Transaction tx; + + /** + * @throws java.lang.Exception + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + super.setUp(); + directory = new File(IOHelper.getDefaultDataDirectory()); + IOHelper.mkdirs(directory); + IOHelper.deleteChildren(directory); + + } + + protected void tearDown() throws Exception { + if( pf!=null ) { + pf.unload(); + pf.delete(); + } + } + + protected void createPageFileAndIndex(int pageSize) throws Exception { + pf = new PageFile(directory, getClass().getName()); + pf.setPageSize(pageSize); + pf.load(); + tx = pf.tx(); + this.index = createIndex(); + } + + abstract protected Index<String, Long> createIndex() throws Exception; + + public void testIndex() throws Exception { + createPageFileAndIndex(500); + + this.index.load(); + doInsert(COUNT); + this.index.unload(); + this.index.load(); + checkRetrieve(COUNT); + doRemove(COUNT); + this.index.unload(); + this.index.load(); + doInsert(COUNT); + doRemoveHalf(COUNT); + doInsertHalf(COUNT); + this.index.unload(); + this.index.load(); + checkRetrieve(COUNT); + this.index.unload(); + } + + void doInsert(int count) throws Exception { + for (int i = 0; i < count; i++) { + index.put(tx, key(i), (long)i); + tx.commit(); + } + } + + protected String key(int i) { + return "key:"+i; + } + + void checkRetrieve(int count) throws IOException { + for (int i = 0; i < count; i++) { + Long item = index.get(tx, key(i)); + assertNotNull("Key missing: "+key(i), item); + } + } + + void doRemoveHalf(int count) throws Exception { + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i))); + tx.commit(); + } + + } + } + + void doInsertHalf(int count) throws Exception { + for (int i = 0; i < count; i++) { + if (i % 2 == 0) { + index.put(tx, key(i), (long)i); + tx.commit(); + } + } + } + + void doRemove(int count) throws Exception { + for (int i = 0; i < count; i++) { + assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i))); + tx.commit(); + } + for (int i = 0; i < count; i++) { + Long item = index.get(tx, key(i)); + assertNull(item); + } + } + + void doRemoveBackwards(int count) throws Exception { + for (int i = count - 1; i >= 0; i--) { + index.remove(tx, key(i)); + tx.commit(); + } + for (int i = 0; i < count; i++) { + Long item = index.get(tx, key(i)); + assertNull(item); + } + } + +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.journal; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; +import org.apache.kahadb.journal.Journal; +import org.apache.kahadb.util.ByteSequence; + +public class JournalTest extends TestCase { + protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4; + + Journal dataManager; + File dir; + + @Override + public void setUp() throws Exception { + dir = new File("target/tests/DataFileAppenderTest"); + dir.mkdirs(); + dataManager = new Journal(); + dataManager.setDirectory(dir); + configure(dataManager); + dataManager.start(); + } + + protected void configure(Journal dataManager) { + dataManager.setUseNio(false); + } + + @Override + public void tearDown() throws Exception { + dataManager.close(); + deleteFilesInDirectory(dir); + dir.delete(); + } + + private void deleteFilesInDirectory(File directory) { + File[] files = directory.listFiles(); + for (int i=0; i<files.length; i++) { + File f = files[i]; + if (f.isDirectory()) { + deleteFilesInDirectory(f); + } + f.delete(); + } + } + + public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i=0; i < iterations; i++) { + dataManager.write(data, new Runnable() { + public void run() { + latch.countDown(); + } + }); + } + // at this point most probably dataManager.getInflightWrites().size() >= 0 + // as the Thread created in DataFileAppender.enqueue() may not have caught up. + assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS)); + } + + public void testBatchWriteCallbackCompleteAfterClose() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i=0; i<iterations; i++) { + dataManager.write(data, new Runnable() { + public void run() { + latch.countDown(); + } + }); + } + dataManager.close(); + assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty()); + assertEquals("none written", 0, latch.getCount()); + } + + public void testBatchWriteCompleteAfterClose() throws Exception { + ByteSequence data = new ByteSequence("DATA".getBytes()); + final int iterations = 10; + for (int i=0; i<iterations; i++) { + dataManager.write(data, false); + } + dataManager.close(); + assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty()); + } + + public void testBatchWriteToMaxMessageSize() throws Exception { + final int iterations = 4; + final CountDownLatch latch = new CountDownLatch(iterations); + Runnable done = new Runnable() { + public void run() { + latch.countDown(); + } + }; + int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations; + byte[] message = new byte[messageSize]; + ByteSequence data = new ByteSequence(message); + + for (int i=0; i< iterations; i++) { + dataManager.write(data, done); + } + + // write may take some time + assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS)); + } + + public void testNoBatchWriteWithSync() throws Exception { + ByteSequence data = new ByteSequence("DATA".getBytes()); + final int iterations = 10; + for (int i=0; i<iterations; i++) { + dataManager.write(data, true); + assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty()); + } + } +} Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java?rev=692288&view=auto ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java (added) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java Thu Sep 4 15:46:42 2008 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kahadb.journal; + +import org.apache.kahadb.journal.Journal; + +public class NioJournalTest extends JournalTest { + + @Override + protected void configure(Journal dataManager) { + dataManager.setUseNio(true); + } +}
