Author: chirino Date: Fri Nov 24 16:03:29 2006 New Revision: 479053 URL: http://svn.apache.org/viewvc?view=rev&rev=479053 Log: Added JournalRWPerfTool that tests write and read performance.
Added: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalRWPerfToolSupport.java incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalRWPerfTool.java Removed: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/net/ Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalPerfToolSupport.java incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalStatsFilter.java incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalImplTest.java incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalPerfTool.java incubator/activemq/activeio/trunk/activeio-core/src/test/resources/log4j.properties Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalPerfToolSupport.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalPerfToolSupport.java?view=diff&rev=479053&r1=479052&r2=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalPerfToolSupport.java (original) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalPerfToolSupport.java Fri Nov 24 16:03:29 2006 @@ -49,9 +49,10 @@ protected File statCSVFile = new File("stats.csv");; // Controls how often we start a new batch of workers. - protected int workerIncrement=20; + protected int workerIncrement=5; protected long incrementDelay=1000*20; protected boolean verbose=true; + protected int initialWorkers=10; // Worker configuration. protected int recordSize=1024; @@ -106,20 +107,26 @@ try { + System.out.println("Starting "+initialWorkers+" Workers..."); + for(int i=0;i <initialWorkers;i++) { + new Thread(new Worker()).start(); + workerCount++; + } + // Wait a little to see the worker affect the stats. // Increment the number of workers every few seconds. while(true) { - System.out.println("Starting "+workerIncrement+" Workers..."); - for(int i=0;i <workerIncrement;i++) { - new Thread(new Worker()).start(); - workerCount++; - } - // Wait a little to see the worker affect the stats. System.out.println("Waiting "+(incrementDelay/1000)+" seconds before next Stat sample."); Thread.sleep(incrementDelay); displayStats(); journal.reset(); + + System.out.println("Starting "+workerIncrement+" Workers..."); + for(int i=0;i <workerIncrement;i++) { + new Thread(new Worker()).start(); + workerCount++; + } } @@ -132,7 +139,7 @@ System.out.println("Stats at "+workerCount+" workers."); System.out.println(journal); if( statWriter!= null ) { - statWriter.println(""+workerCount+","+journal.getThroughputKps()+","+journal.getAvgSyncedLatencyMs()+","+journal.getThroughputRps()); + statWriter.println(""+workerCount+","+journal.getWriteThroughputKps()+","+journal.getAvgSyncedLatencyMs()+","+journal.getWriteThroughputRps()); statWriter.flush(); } } Added: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalRWPerfToolSupport.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalRWPerfToolSupport.java?view=auto&rev=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalRWPerfToolSupport.java (added) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalRWPerfToolSupport.java Fri Nov 24 16:03:29 2006 @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeio.journal; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.activeio.journal.InvalidRecordLocationException; +import org.apache.activeio.journal.Journal; +import org.apache.activeio.journal.JournalEventListener; +import org.apache.activeio.journal.RecordLocation; +import org.apache.activeio.packet.ByteArrayPacket; +import org.apache.activeio.packet.Packet; + +/** + * Provides the base class uses to run performance tests against a Journal. + * Should be subclassed to customize for specific journal implementation. + * + * @version $Revision: 1.1 $ + */ +abstract public class JournalRWPerfToolSupport implements JournalEventListener { + + private JournalStatsFilter journal; + private Random random = new Random(); + private byte data[]; + private int writeWorkerCount=0; + private int readWorkerCount=0; + private PrintWriter statWriter; + // Performance test Options + + // The output goes here: + protected File journalDirectory = new File("journal-logs"); + protected File statCSVFile = new File("stats.csv");; + + // Controls how often we start a new batch of workers. + protected int writeWorkerIncrement=5; + protected int initialWriteWorkers=5; + protected int readWorkerThinkTime=0; + + protected int readWorkerIncrement=5; + protected int initialReadWorkers=5; + protected int writeWorkerThinkTime=0; + + protected long incrementDelay=1000*20; + protected boolean verbose=true; + + // Worker configuration. + protected int recordSize=1024; + protected int syncFrequency=15; + + private final class WriteWorker implements Runnable { + public void run() { + int i=random.nextInt()%syncFrequency; + while(true) { + boolean sync=false; + + if( syncFrequency>=0 && (i%syncFrequency)==0 ) { + sync=true; + } + try { + journal.write(new ByteArrayPacket(data), sync); + Thread.sleep(writeWorkerThinkTime); + } catch (Exception e) { + e.printStackTrace(); + return; + } + i++; + } + } + } + + + private final class ReadWorker implements Runnable { + + AtomicLong counter=new AtomicLong(); + public void run() { + while(true) { + try { + RecordLocation pos = null; + while( (pos=journal.getNextRecordLocation(pos))!=null ) { + Packet packet = journal.read(pos); + counter.addAndGet(packet.limit()); + Thread.sleep(readWorkerThinkTime); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + } + + /** + * @throws IOException + * + */ + protected void exec() throws Exception { + + System.out.println("Client threads write records using: Record Size: "+recordSize+", Sync Frequency: "+syncFrequency+", Worker Think Time: "+readWorkerThinkTime); + + // Create the record and fill it with some values. + data = new byte[recordSize]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte)i; + } + + if( statCSVFile!=null ) { + statWriter = new PrintWriter(new FileOutputStream(statCSVFile)); + statWriter.println("Threads,Write Throughput (k/s),Forced Write latency (ms), Write Throughput (records/s),Read Throughput (k/s),Read latency (ms),Read Throughput (records/s)"); + } + + if( journalDirectory.exists() ) { + deleteDir(journalDirectory); + } + journal = new JournalStatsFilter(createJournal()).enableDetailedStats(verbose); + journal.setJournalEventListener(this); + + try { + + System.out.println("Starting "+initialWriteWorkers+" Write Workers..."); + for(int i=0;i <initialWriteWorkers;i++) { + new Thread(new WriteWorker()).start(); + writeWorkerCount++; + } + + System.out.println("Starting "+initialReadWorkers+" Read Workers..."); + for(int i=0;i <initialReadWorkers;i++) { + new Thread(new ReadWorker()).start(); + readWorkerCount++; + } + + // Wait a little to see the worker affect the stats. + // Increment the number of workers every few seconds. + while(true) { + // Wait a little to see the worker affect the stats. + System.out.println("Waiting "+(incrementDelay/1000)+" seconds before next Stat sample."); + Thread.sleep(incrementDelay); + displayStats(); + journal.reset(); + + System.out.println("Starting "+writeWorkerIncrement+" Workers..."); + for(int i=0;i <writeWorkerIncrement;i++) { + new Thread(new WriteWorker()).start(); + writeWorkerCount++; + } + } + + + } finally { + journal.close(); + } + } + + private void displayStats() { + System.out.println("Stats at "+writeWorkerCount+" write workers and "+readWorkerCount+" read workers."); + System.out.println(journal); + if( statWriter!= null ) { + statWriter.println( + ""+writeWorkerCount+","+ + journal.getWriteThroughputKps()+","+ + journal.getAvgSyncedLatencyMs()+","+ + journal.getWriteThroughputRps()+","+ + journal.getReadThroughputKps()+","+ + journal.getAvgReadLatencyMs()+","+ + journal.getReadThroughputRps() + ); + statWriter.flush(); + } + } + + /** + * @return + */ + abstract public Journal createJournal() throws Exception; + + static private void deleteDir(File f) { + File[] files = f.listFiles(); + for (int i = 0; i < files.length; i++) { + File file = files[i]; + file.delete(); + } + f.delete(); + } + + + public void overflowNotification(RecordLocation safeLocation) { + try { + // System.out.println("Mark set: "+safeLocation); + journal.setMark(safeLocation, false); + } catch (InvalidRecordLocationException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalStatsFilter.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalStatsFilter.java?view=diff&rev=479053&r1=479052&r2=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalStatsFilter.java (original) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/JournalStatsFilter.java Fri Nov 24 16:03:29 2006 @@ -40,6 +40,9 @@ private final TimeStatisticImpl unsynchedWriteLatency = new TimeStatisticImpl(writeLatency, "unsynchedWriteLatency", "The amount of time that is spent waiting for a non synch record to be written to the Journal"); private final TimeStatisticImpl readLatency = new TimeStatisticImpl("readLatency", "The amount of time that is spent waiting for a record to be read from the Journal"); private final CountStatisticImpl readBytesCounter = new CountStatisticImpl("readBytesCounter","The number of bytes that have been read by the Journal"); + private final CountStatisticImpl readRecordsCounter = new CountStatisticImpl("readRecordsCounter","The number of records that have been read by the Journal"); + private final TimeStatisticImpl cursorLatency = new TimeStatisticImpl("nextRecordLocationLatency", "The amount of time that is spent waiting to locate the next record location from the Journal"); + private final CountStatisticImpl cursorCounter = new CountStatisticImpl("nextRecordLocationCounter","The number of times the the next record was located in the Journal"); private final Journal next; private boolean detailedStats; @@ -81,6 +84,7 @@ Packet answer = next.read(location); long end = System.currentTimeMillis(); + readRecordsCounter.increment(); readBytesCounter.add(answer.remaining()); readLatency.addTime(end-start); return answer; @@ -119,8 +123,15 @@ * @see org.codehaus.activemq.journal.Journal#getNextRecordLocation(org.codehaus.activemq.journal.RecordLocation) */ public RecordLocation getNextRecordLocation(RecordLocation lastLocation) - throws IOException, InvalidRecordLocationException { - return next.getNextRecordLocation(lastLocation); + throws IOException, InvalidRecordLocationException { + + long start = System.currentTimeMillis(); + RecordLocation rc = next.getNextRecordLocation(lastLocation); + long end = System.currentTimeMillis(); + + cursorCounter.increment(); + cursorLatency.addTime(end-start); + return rc; } /** @@ -133,11 +144,17 @@ out.println("Journal Stats {"); out.incrementIndent(); out.printIndent(); - out.println("Throughput : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s" ); + out.println("Write Throughput : "+ getWriteThroughputKps() +" k/s and " + getWriteThroughputRps() +" records/s" ); + out.printIndent(); + out.println("Write Latency with force : "+ getAvgSyncedLatencyMs() +" ms" ); out.printIndent(); - out.println("Latency with force : "+ getAvgSyncedLatencyMs() +" ms" ); + out.println("Write Latency without force: "+ getAvgUnSyncedLatencyMs() +" ms" ); + out.printIndent(); + out.println("Read Throughput : "+ getReadThroughputKps() +" k/s and " + getReadThroughputRps() +" records/s" ); out.printIndent(); - out.println("Latency without force: "+ getAvgUnSyncedLatencyMs() +" ms" ); + out.println("Read Latency : "+getAvgReadLatencyMs()+" ms" ); + out.printIndent(); + out.println("Cursor Latency : "+ getAvgCursorLatencyMs() +" ms" ); out.printIndent(); out.println("Raw Stats {"); @@ -186,11 +203,18 @@ StringWriter w = new StringWriter(); PrintWriter pw = new PrintWriter(w); IndentPrinter out = new IndentPrinter(pw, " "); - out.println("Throughput : "+ getThroughputKps() +" k/s and " + getThroughputRps() +" records/s"); out.printIndent(); - out.println("Latency with force : "+getAvgSyncedLatencyMs()+" ms" ); + out.println("Write Throughput : "+ getWriteThroughputKps() +" k/s and " + getWriteThroughputRps() +" records/s"); + out.printIndent(); + out.println("Write Latency with force : "+getAvgSyncedLatencyMs()+" ms" ); + out.printIndent(); + out.println("Write Latency without force: "+getAvgUnSyncedLatencyMs()+" ms" ); + out.printIndent(); + out.println("Read Throughput : "+ getReadThroughputKps() +" k/s and " + getReadThroughputRps() +" records/s"); + out.printIndent(); + out.println("Read Latency : "+getAvgReadLatencyMs()+" ms" ); out.printIndent(); - out.println("Latency without force: "+getAvgUnSyncedLatencyMs()+" ms" ); + out.println("Cursor Latency : "+ getAvgCursorLatencyMs() +" ms" ); return w.getBuffer().toString(); } } @@ -209,7 +233,7 @@ * * @return the average throughput in k/s. */ - public double getThroughputKps() { + public double getWriteThroughputKps() { long totalTime = writeBytesCounter.getLastSampleTime()-writeBytesCounter.getStartTime(); return (((double)writeBytesCounter.getCount()/(double)totalTime)/(double)1024)*1000; } @@ -219,12 +243,32 @@ * * @return the average throughput in records/s. */ - public double getThroughputRps() { + public double getWriteThroughputRps() { long totalTime = writeRecordsCounter.getLastSampleTime()-writeRecordsCounter.getStartTime(); return (((double)writeRecordsCounter.getCount()/(double)totalTime))*1000; } /** + * Gets the average throughput in k/s. + * + * @return the average throughput in k/s. + */ + public double getReadThroughputKps() { + long totalTime = readBytesCounter.getLastSampleTime()-readBytesCounter.getStartTime(); + return (((double)readBytesCounter.getCount()/(double)totalTime)/(double)1024)*1000; + } + + /** + * Gets the average throughput in records/s. + * + * @return the average throughput in records/s. + */ + public double getReadThroughputRps() { + long totalTime = readRecordsCounter.getLastSampleTime()-readRecordsCounter.getStartTime(); + return (((double)readRecordsCounter.getCount()/(double)totalTime))*1000; + } + + /** * Gets the average number of writes done per second * * @return the average number of writes in w/s. @@ -238,8 +282,26 @@ * * @return the average sync write latency in ms. */ + public double getAvgCursorLatencyMs() { + return cursorLatency.getAverageTime(); + } + + /** + * Gets the average sync write latency in ms. + * + * @return the average sync write latency in ms. + */ public double getAvgSyncedLatencyMs() { return synchedWriteLatency.getAverageTime(); + } + + /** + * Gets the average read latency in ms. + * + * @return the average sync write latency in ms. + */ + public double getAvgReadLatencyMs() { + return readLatency.getAverageTime(); } /** Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalImplTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalImplTest.java?view=diff&rev=479053&r1=479052&r2=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalImplTest.java (original) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalImplTest.java Fri Nov 24 16:03:29 2006 @@ -44,7 +44,7 @@ int size = 1024*10; int logFileCount=2; File logDirectory = new File("test-logfile"); - private Journal journal; + private JournalImpl journal; /** * @see junit.framework.TestCase#setUp() @@ -133,6 +133,9 @@ journal.setMark(pos, false); } while( pos.getLogFileId() < 5 ); + + Packet p = createPacket("<<<data>>>"); + pos = (Location) journal.write( p, true); // Now see if we can read that first packet. Packet data; Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalPerfTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalPerfTool.java?view=diff&rev=479053&r1=479052&r2=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalPerfTool.java (original) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalPerfTool.java Fri Nov 24 16:03:29 2006 @@ -36,10 +36,12 @@ public static void main(String[] args) throws Exception { JournalPerfTool tool = new JournalPerfTool(); + tool.initialWorkers=10; tool.syncFrequency=15; - tool.workerIncrement=50; + tool.workerIncrement=0; tool.workerThinkTime=0; tool.verbose=false; + tool.incrementDelay=5*1000; if( args.length > 0 ) { tool.journalDirectory = new File(args[0]); Added: incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalRWPerfTool.java URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalRWPerfTool.java?view=auto&rev=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalRWPerfTool.java (added) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/java/org/apache/activeio/journal/active/JournalRWPerfTool.java Fri Nov 24 16:03:29 2006 @@ -0,0 +1,83 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeio.journal.active; + +import java.io.File; +import java.io.IOException; + +import org.apache.activeio.journal.Journal; +import org.apache.activeio.journal.JournalRWPerfToolSupport; + +/** + * A Performance statistics gathering tool for the JournalImpl based Journal. + * + * @version $Revision: 1.1 $ + */ +public class JournalRWPerfTool extends JournalRWPerfToolSupport { + + private int logFileSize = 1024*1024*50; + private int logFileCount = 4; + + public static void main(String[] args) throws Exception { + JournalRWPerfTool tool = new JournalRWPerfTool(); + tool.initialWriteWorkers=10; + tool.syncFrequency=15; + tool.writeWorkerIncrement=0; + tool.writeWorkerThinkTime=0; + tool.verbose=false; + tool.incrementDelay=5*1000; + + if( args.length > 0 ) { + tool.journalDirectory = new File(args[0]); + } + if( args.length > 1 ) { + tool.writeWorkerIncrement = Integer.parseInt(args[1]); + } + if( args.length > 2 ) { + tool.incrementDelay = Long.parseLong(args[2]); + } + if( args.length > 3 ) { + tool.verbose = Boolean.getBoolean(args[3]); + } + if( args.length > 4 ) { + tool.recordSize = Integer.parseInt(args[4]); + } + if( args.length > 5 ) { + tool.syncFrequency = Integer.parseInt(args[5]); + } + if( args.length > 6 ) { + tool.writeWorkerThinkTime = Integer.parseInt(args[6]); + } + if( args.length > 7 ) { + tool.logFileCount = Integer.parseInt(args[7]); + } + if( args.length > 8 ) { + tool.logFileSize = Integer.parseInt(args[8]); + } + tool.exec(); + } + + /** + * @throws IOException + * @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal() + */ + public Journal createJournal() throws IOException { + return new JournalImpl( this.journalDirectory, logFileCount, logFileSize); + } + +} Modified: incubator/activemq/activeio/trunk/activeio-core/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/incubator/activemq/activeio/trunk/activeio-core/src/test/resources/log4j.properties?view=diff&rev=479053&r1=479052&r2=479053 ============================================================================== --- incubator/activemq/activeio/trunk/activeio-core/src/test/resources/log4j.properties (original) +++ incubator/activemq/activeio/trunk/activeio-core/src/test/resources/log4j.properties Fri Nov 24 16:03:29 2006 @@ -18,7 +18,7 @@ # # The logging properties used during tests.. # -log4j.rootLogger=DEBUG, out +log4j.rootLogger=INFO, stdout # CONSOLE appender not used by default log4j.appender.stdout=org.apache.log4j.ConsoleAppender