Updated tests to also contain a larger body, for measuring performance with a more accurate entity size
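For context, the load test previously wrote entities carrying only two integer fields; this change gives each entity a fuller body so that throughput numbers reflect a more realistic document size. The fragment below is a condensed sketch of that body, using field names and values taken from the diff further down (Entity and the *Field classes are Usergrid's persistence model types; workerIndex, ordinal and uniqueIdentifier are the values the test later queries on to validate each write):

    Entity entity = new Entity( "test" );

    // identity fields used later to find the entity again via a query
    entity.setField( new IntegerField( "workerIndex", workerIndex ) );
    entity.setField( new IntegerField( "ordinal", ordinal ) );
    entity.setField( new StringField( "uniqueIdentifier", uniqueIdentifier ) );

    // filler fields that pad the document to a more realistic size
    entity.setField( new StringField( "dept", "VALUE" ) );
    entity.setField( new StringField( "description", "I'm a longer description" ) );
    entity.setField( new BooleanField( "booleanField1", true ) );
    entity.setField( new DoubleField( "doubleField1", 750.0 ) );

    ArrayField<Long> longs = new ArrayField<>( "longs" );
    longs.add( 9315321008910L );
    entity.setField( longs );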
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bab5ba7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bab5ba7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bab5ba7f Branch: refs/heads/USERGRID-347 Commit: bab5ba7f018c18aab2ee09b98eeb74c6668a893a Parents: c864ab6 Author: Todd Nine <[email protected]> Authored: Fri Mar 20 15:23:29 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Fri Mar 20 15:23:29 2015 -0600 ---------------------------------------------------------------------- stack/corepersistence/queryindex/pom.xml | 57 ++-- .../persistence/index/EntityIndexBatch.java | 5 + .../index/impl/EsEntityIndexBatchImpl.java | 32 +- .../usergrid/persistence/index/query/Query.java | 6 +- .../persistence/index/guice/IndexTestFig.java | 7 +- .../index/impl/IndexLoadTestsIT.java | 317 ++++++++++++++++--- 6 files changed, 329 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml index 2dc40ce..04b3b1a 100644 --- a/stack/corepersistence/queryindex/pom.xml +++ b/stack/corepersistence/queryindex/pom.xml @@ -45,45 +45,24 @@ </executions> </plugin> -<!-- <plugin> - <groupId>org.safehaus.chop</groupId> - <artifactId>chop-maven-plugin</artifactId> - <version>${chop.version}</version> - - - NOTE: you should be putting most of these variables into your settings.xml - as an automatically activated profile. - - - <configuration> - <accessKey>${aws.s3.key}</accessKey> - <secretKey>${aws.s3.secret}</secretKey> - <availabilityZone>${availabilityZone}</availabilityZone> - <bucketName>${aws.s3.bucket}</bucketName> - <managerAppUsername>admin</managerAppUsername> - <managerAppPassword>${manager.app.password}</managerAppPassword> - <testPackageBase>org.apache.usergrid</testPackageBase> - <runnerSSHKeyFile>${runner.ssh.key.file}</runnerSSHKeyFile> - <failIfCommitNecessary>false</failIfCommitNecessary> - <amiID>${ami.id}</amiID> - <instanceType>m1.large</instanceType> - <resultsDirectory>${resultsDirectory}</resultsDirectory> - <dumpType>${dumpType}</dumpType> - <coldRestartTomcat>true</coldRestartTomcat> - <awsSecurityGroup>${security.group}</awsSecurityGroup> - <runnerKeyPairName>${runner.keypair.name}</runnerKeyPairName> - <runnerCount>6</runnerCount> - <securityGroupExceptions> - - Add your own IP address as an exception to allow access - but please do this in the settings.xml file .. essentially - all parameters should be in the settings.xml file. 
- - <param>${myip.address}/32:24981</param> - <param>${myip.address}/32:22</param> - </securityGroupExceptions> - </configuration> - </plugin>--> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + + <configuration> + <includes> + <include>**/*IT.java</include> + <include>**/*Test.java</include> + </includes> + <!-- run this one manually to stress test --> + <excludes> + <exclude>**/IndexLoadTestsIT.java</exclude> + </excludes> + + </configuration> + + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java index 580a7f4..77b6e7a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java @@ -68,4 +68,9 @@ public interface EntityIndexBatch { */ public BetterFuture execute(); + /** + * Get the number of operations in the batch + * @return + */ + public int size(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java index 92312d2..e9ba09c 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java @@ -204,6 +204,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { return indexBatchBufferProducer.put(tempContainer); } + + @Override + public int size() { + return container.getDeIndexRequests().size() + container.getIndexRequests().size(); + } + + /** * Set the entity as a map with the context * @@ -241,23 +248,24 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch { Field field = ( Field ) f; - if ( f instanceof ListField ) { - List list = ( List ) field.getValue(); - entityMap.put( field.getName().toLowerCase(), - new ArrayList( processCollectionForMap( list ) ) ); - if ( !list.isEmpty() ) { - if ( list.get( 0 ) instanceof String ) { - entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(), - new ArrayList( processCollectionForMap( list ) ) ); - } - } - } - else if ( f instanceof ArrayField ) { + if ( f instanceof ArrayField ) { List list = ( List ) field.getValue(); entityMap.put( field.getName().toLowerCase(), new ArrayList( processCollectionForMap( list ) ) ); } + else if ( f instanceof ListField ) { + List list = ( List ) field.getValue(); + entityMap.put( field.getName().toLowerCase(), + new ArrayList( processCollectionForMap( list ) ) ); + + if ( !list.isEmpty() ) { + if ( list.get( 0 ) instanceof String ) { + 
entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(), + new ArrayList( processCollectionForMap( list ) ) ); + } + } + } else if ( f instanceof SetField ) { Set set = ( Set ) field.getValue(); entityMap.put( field.getName().toLowerCase(), http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java index da68772..67a1731 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java @@ -67,7 +67,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * TODO, this is a copy from 1.0 and a mess. Clean this up to be clearer as we iterate on our refactor of EM/RM + * Query should only be used for term querying, not identity of name lookup, that should + * come directly from cassandra + */ public class Query { private static final Logger logger = LoggerFactory.getLogger( Query.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java index ecf3dfa..5aacef0 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java @@ -38,7 +38,7 @@ public interface IndexTestFig extends GuicyFig { @Default( "16" ) public int getNumberOfWorkers(); - @Key( "stresstest.numberofRecords" ) + @Key( "stresstest.numberOfRecords" ) @Default( "10000" ) public int getNumberOfRecords(); @@ -54,4 +54,9 @@ public interface IndexTestFig extends GuicyFig { @Key( "stresstest.applicationId" ) @Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" ) public String getApplicationId(); + + @Key( "stresstest.readThreads" ) + @Default( "40" ) + public int getConcurrentReadThreads(); + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java index 82af950..057c472 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java @@ -19,45 +19,78 @@ package org.apache.usergrid.persistence.index.impl; -import java.util.List; import java.util.UUID; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.test.UseModules; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.guice.IndexTestFig; import org.apache.usergrid.persistence.index.guice.TestIndexModule; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.field.ArrayField; +import org.apache.usergrid.persistence.model.field.BooleanField; +import org.apache.usergrid.persistence.model.field.DoubleField; import org.apache.usergrid.persistence.model.field.IntegerField; +import org.apache.usergrid.persistence.model.field.StringField; +import org.apache.usergrid.persistence.model.util.EntityUtils; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Slf4jReporter; +import com.codahale.metrics.Timer; import com.google.inject.Inject; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; import rx.schedulers.Schedulers; +import static org.junit.Assert.assertEquals; + /** - * TODO: make CorePerformanceIT configurable, add CHOP markup. + * This is configuration via the properties in the IndexTestFig object. Most of these values you won't need to touch. + * To run this against a live cluster. You execute this maven command. + * + * <command> mvn test -Dtest=IndexLoadTestsIT#testHeavyLoadValidate -Dstresstest.numWorkers=16 + * -Dstresstest.numberOfRecords=10000 </command> + * + * This will insert 10000 records for each worker thread. There will be 16 worker threads. Validation will occur after + * the wait timeout (stresstest.validate.wait) of 2 seconds. Up to 40 concurrent queries (stresstest.readThreads) will + * be executed to validate each result. 
+ * + * By default this test is excluded from surefire, and will need to be run manually */ @RunWith( EsRunner.class ) @UseModules( { TestIndexModule.class } ) -@Ignore( "Should only be run during load tests of elasticsearch" ) public class IndexLoadTestsIT extends BaseIT { private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class ); + public static final String FIELD_WORKER_INDEX = "workerIndex"; + private static final String FIELD_ORDINAL = "ordinal"; + private static final String FIELD_UNIQUE_IDENTIFIER = "uniqueIdentifier"; + + @Inject + @Rule + public MigrationManagerRule migrationManagerRule; @ClassRule public static ElasticSearchResource es = new ElasticSearchResource(); @@ -69,70 +102,270 @@ public class IndexLoadTestsIT extends BaseIT { @Inject public EntityIndexFactory entityIndexFactory; + @Inject + public MetricsFactory metricsFactory; + + private Meter batchWriteTPS; + private Timer batchWriteTimer; + + private Meter queryTps; + private Timer queryTimer; + + private Slf4jReporter reporter; + + + @Before + public void setupMeters() { + batchWriteTPS = metricsFactory.getMeter( IndexLoadTestsIT.class, "write.tps" ); + + batchWriteTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, "write.timer" ); + + queryTps = metricsFactory.getMeter( IndexLoadTestsIT.class, "query.tps" ); + + queryTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, "query.timer" ); + + reporter = + Slf4jReporter.forRegistry( metricsFactory.getRegistry() ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS ) + .convertDurationsTo( TimeUnit.MILLISECONDS ).build(); + + reporter.start( 30, TimeUnit.SECONDS ); + } + + @After + public void printMetricsBeforeShutdown() { + //stop the log reporter and print the last report + reporter.stop(); + reporter.report(); + } + + + /** + * Perform the following 1, spin up the specified number of workers For each worker, insert the specified number of + * elements + * + * Wait the wait time after buffer execution before beginning validate + * + * Validate every entity inserted is returned by a search. 
+ */ @Test - public void testHeavyLoad() { + public void testHeavyLoadValidate() { + final String userAppId = indexTestFig.getApplicationId(); + + + //if it's unset, generate one + final String uniqueIdentifier = UUIDGenerator.newTimeUUID().toString(); - final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() ); + //use the appId supplied, or generate one + final UUID applicationUUID = UUID.fromString( userAppId ); final Id applicationId = new SimpleId( applicationUUID, "application" ); final ApplicationScope scope = new ApplicationScopeImpl( applicationId ); final EntityIndex index = entityIndexFactory.createEntityIndex( scope ); + final IndexScope indexScope = new IndexScopeImpl( applicationId, "test" ); + //create our index if it doesn't exist index.initializeIndex(); - final Observable<Entity> createEntities = createStreamFromWorkers( index, applicationId ); - //run them all - createEntities.toBlocking().last(); + //delay our verification for indexing to happen + final Observable<DataLoadResult> dataLoadResults = + createStreamFromWorkers( index, indexScope, uniqueIdentifier ).buffer( indexTestFig.getBufferSize() ) + //perform a delay to let ES index from our batches + .delay( indexTestFig.getValidateWait(), TimeUnit.MILLISECONDS ) + //do our search in parallel, otherwise this test will take far too long + .flatMap( entitiesToValidate -> { + return Observable.from( entitiesToValidate ).map( entityObservable -> { + + + final int workerIndex = ( int ) entityObservable.getField( FIELD_WORKER_INDEX ).getValue(); + final int ordinal = ( int ) entityObservable.getField( FIELD_ORDINAL ).getValue(); + + + final Timer.Context queryTimerContext = queryTimer.time(); + + + //execute our search + final CandidateResults results = index + .search( indexScope, SearchTypes.fromTypes( indexScope.getName() ), Query.fromQLNullSafe( + "select * where " + FIELD_WORKER_INDEX + " = " + workerIndex + " AND " + FIELD_ORDINAL + + " = " + ordinal + " AND " + FIELD_UNIQUE_IDENTIFIER + " = '" + uniqueIdentifier + + "'" ) ); + + queryTps.mark(); + queryTimerContext.stop(); + + boolean found; + + if ( !results.isEmpty() && results.get( 0 ).getId().equals( entityObservable.getId() ) ) { + found = true; + } + else { + found = false; + } + + return new EntitySearchResult( entityObservable, found ); + } ).subscribeOn( Schedulers.io() ); + }, indexTestFig.getConcurrentReadThreads() ) + + //collect all the results into a single data load result + .collect( () -> new DataLoadResult(), ( dataloadResult, entitySearchResult ) -> { + if ( entitySearchResult.found ) { + dataloadResult.success(); + return; + } + + final int ordinal = ( int ) entitySearchResult.searched.getField( FIELD_ORDINAL ).getValue(); + final int worker = ( int ) entitySearchResult.searched.getField( FIELD_WORKER_INDEX ).getValue(); + + dataloadResult.failed(); + + log.error( + "Could not find entity with worker {}, ordinal {}, and Id {} after waiting {} milliseconds", + worker, ordinal, entitySearchResult.searched.getId(), indexTestFig.getValidateWait() ); + } ); + + + //wait for processing to finish + final DataLoadResult result = dataLoadResults.toBlocking().last(); + + final long expectedCount = indexTestFig.getNumberOfRecords() * indexTestFig.getNumberOfWorkers(); + + assertEquals( "Excepted to have no failures", 0, result.getFailCount() ); + + assertEquals( "Excepted to find all records", expectedCount, result.getSuccessCount() ); } - public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id 
ownerId ) { + public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final IndexScope indexScope, + final String uniqueIdentifier ) { //create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread() return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap( - integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn( Schedulers.newThread() ) ); + integer -> createWriteObservable( entityIndex, indexScope, uniqueIdentifier, integer ) + .subscribeOn( Schedulers.newThread() ) ); } - private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId, - final int workerIndex ) { - - - final IndexScope scope = new IndexScopeImpl( ownerId, "test" ); + private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final IndexScope indexScope, + final String uniqueIdentifier, final int workerIndex ) { return Observable.range( 0, indexTestFig.getNumberOfRecords() ) //create our entity - .map( new Func1<Integer, Entity>() { - @Override - public Entity call( final Integer integer ) { - final Entity entity = new Entity( "test" ); - - entity.setField( new IntegerField( "workerIndex", workerIndex ) ); - entity.setField( new IntegerField( "ordinal", integer ) ); - - return entity; - } - } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() { - @Override - public void call( final List<Entity> entities ) { - //take our entities and roll them into a batch - Observable.from( entities ) - .collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> { - - entityIndexBatch.index( scope, entity ); - } ).doOnNext( entityIndexBatch -> { - entityIndexBatch.execute(); - } ).toBlocking().last(); - } - } ) + .map( integer -> { + final Entity entity = new Entity( indexScope.getName() ); + + entity.setField( new IntegerField( FIELD_WORKER_INDEX, workerIndex ) ); + entity.setField( new IntegerField( FIELD_ORDINAL, integer ) ); + entity.setField( new StringField( FIELD_UNIQUE_IDENTIFIER, uniqueIdentifier ) ); + + + EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); + + //add some fields for indexing + + entity.setField( new StringField( "emtpyField", "" ) ); + entity.setField( new StringField( "singleCharField1", "L" ) ); + entity.setField( new StringField( "longStringField", "000000000000001051" ) ); + entity.setField( new StringField( "singleCharField2", "0" ) ); + entity.setField( new StringField( "singleCharField3", "0" ) ); + entity.setField( new StringField( "singleCharField4", "0" ) ); + entity.setField( new StringField( "dept", "VALUE" ) ); + entity.setField( new StringField( "description", "I'm a longer description" ) ); + + ArrayField<Long> array = new ArrayField<>("longs"); + + array.add( 9315321008910l ); + array.add( 9315321009016l ); + array.add( 9315321009115l ); + array.add( 9315321009313l ); + array.add( 9315321009320l ); + array.add( 9315321984955l ); + + entity.setField( array ); + + entity.setField( new StringField( "singleCharField5", "N" ) ); + entity.setField( new BooleanField( "booleanField1", true ) ); + entity.setField( new BooleanField( "booleanField2", false ) ); + entity.setField( new StringField( "singleCharField5", "N" ) ); + entity.setField( new StringField( "singleCharField6", "N" ) ); + entity.setField( new StringField( "stringField", "ALL CAPS)); I MEAN IT" ) ); + entity.setField( new DoubleField( "doubleField1", 750.0 ) ); + entity.setField( new StringField( "charField", "AB" ) ); + 
entity.setField( new StringField( "name", "000000000000001051-1004" ) ); + + return entity; + } ) + //buffer up a batch size + .buffer( indexTestFig.getBufferSize() ).doOnNext( entities -> { + + //take our entities and roll them into a batch + Observable.from( entities ).collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> { + entityIndexBatch.index( indexScope, entity ); + } ).doOnNext( entityIndexBatch -> { + log.info( "Indexing next {} in batch", entityIndexBatch.size() ); + //gather the metrics + final Timer.Context time = batchWriteTimer.time(); + batchWriteTPS.mark(); + + + //execute + entityIndexBatch.execute(); + //stop + time.close(); + } ).toBlocking().last(); + } ) //translate back into a stream of entities for the caller to use - .flatMap(entities -> Observable.from( entities ) ); + .flatMap( entities -> Observable.from( entities ) ); + } + + + /** + * Class for entity search results + */ + private static class EntitySearchResult { + + public final Entity searched; + public final boolean found; + + + private EntitySearchResult( final Entity searched, final boolean found ) { + this.searched = searched; + this.found = found; + } + } + + + /** + * Class for collecting results + */ + private static final class DataLoadResult { + private final AtomicLong successCount = new AtomicLong( 0 ); + private final AtomicLong failCount = new AtomicLong( 0 ); + + + public void success() { + successCount.addAndGet( 1 ); + } + + + public long getSuccessCount() { + return successCount.get(); + } + + + public void failed() { + failCount.addAndGet( 1 ); + } + + + public long getFailCount() { + return failCount.get(); + } } }
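The bulk of the new IndexLoadTestsIT is the Rx pipeline that writes, waits, and then validates. Condensed to its shape (RxJava 1.x, as used in the test), the flow is roughly the following sketch; searchForEntity is a hypothetical stand-in for the inline search that looks an entity up by workerIndex, ordinal and uniqueIdentifier, and the lower-case config variables stand in for the corresponding IndexTestFig getters:

    // one writer thread per worker, each emitting its inserted entities
    Observable<Entity> writes = Observable.range( 0, numberOfWorkers )
        .flatMap( worker -> createWriteObservable( index, indexScope, uniqueIdentifier, worker )
            .subscribeOn( Schedulers.newThread() ) );

    DataLoadResult result = writes
        .buffer( bufferSize )                                // validate in chunks
        .delay( validateWaitMillis, TimeUnit.MILLISECONDS )  // give ES time to index the batch
        .flatMap( entities -> Observable.from( entities )
            .map( entity -> searchForEntity( index, indexScope, entity ) )  // query ES for each entity
            .subscribeOn( Schedulers.io() ), concurrentReadThreads )        // bounded query parallelism
        .collect( DataLoadResult::new, ( acc, searchResult ) -> {
            if ( searchResult.found ) { acc.success(); } else { acc.failed(); }
        } )
        .toBlocking().last();                                // block the JUnit thread until done

Bounding the flatMap with concurrentReadThreads (stresstest.readThreads, default 40) is what keeps the validation phase from opening an unbounded number of simultaneous searches against the cluster.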
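Because the new surefire configuration in queryindex/pom.xml excludes IndexLoadTestsIT from the regular build, the stress test now has to be launched by hand. The javadoc added in the diff documents the invocation; from stack/corepersistence/queryindex it looks like this (property names as given in that javadoc, and other IndexTestFig keys such as stresstest.readThreads can be overridden the same way):

    mvn test -Dtest=IndexLoadTestsIT#testHeavyLoadValidate \
        -Dstresstest.numWorkers=16 \
        -Dstresstest.numberOfRecords=10000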
