[USERGRID-677] Added revised test for the scrolling query. Added fixes to method so that it properly logs aggregation and searching for versions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/630cb4a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/630cb4a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/630cb4a8 Branch: refs/heads/USERGRID-669 Commit: 630cb4a87e0943d03b9d515166b1529b4cd649d6 Parents: cf823c4 Author: GERey <gre...@apigee.com> Authored: Fri May 22 11:21:09 2015 -0700 Committer: GERey <gre...@apigee.com> Committed: Fri May 22 11:21:09 2015 -0700 ---------------------------------------------------------------------- .../impl/EsApplicationEntityIndexImpl.java | 43 +++++++++++------ .../persistence/index/impl/EntityIndexTest.java | 49 +++++++++++++++++++- 2 files changed, 75 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index d4ed36a..ad47348 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import 
org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilders; @@ -227,16 +228,11 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { @Override - public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit, - final int offset ) { + public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion) { - /** - * Take a list of IndexEdge, with an entityId - and query Es directly for matches - - */ Preconditions.checkNotNull( entityId, "entityId cannot be null" ); - Preconditions.checkArgument( limit > 0, "limit must be > 0" ); + //TODO: check to see if there is some version verification. I know there is but I forget where. + Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" ); SearchResponse searchResponse; @@ -246,14 +242,21 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); - FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, + //I can't just search on the entity Id. + FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.idString( entityId ) ); - srb.setPostFilter( termFilter ); + + FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion ); + + //aggregate the filters into the AND filter and feed that in. 
+ FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter ); + + srb.setPostFilter(andFilter); if ( logger.isDebugEnabled() ) { - logger.debug( "Searching for edge index (read alias): {}\n nodeId: {},\n query: {} ", + logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ", this.alias.getReadAlias(),entityId, srb ); } @@ -264,6 +267,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { //REfactor this out and have it return a modified parseResults that will create the candidateResults from //the hit results and then keep that //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100. + //The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment + //TODO: review this and make them not magic numbers when acking this PR. searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet(); //list that will hold all of the search hits @@ -273,9 +278,17 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { //add search result hits to some sort of running tally of hits. 
candidates = aggregateScrollResults( candidates, searchResponse ); - searchResponse = searchRequestBuilderStrategyV2 + SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2 .getScrollBuilder( searchResponse.getScrollId() ) - .setScroll( new TimeValue( 6000 ) ).execute().actionGet(); + .setScroll( new TimeValue( 6000 ) ); + + //TODO: figure out how to log exactly what we're putting into elasticsearch +// if ( logger.isDebugEnabled() ) { +// logger.debug( "Scroll search using query: {} ", +// ssrb.toString() ); +// } + + searchResponse = ssrb.execute().actionGet(); if (searchResponse.getHits().getHits().length == 0) { break; @@ -385,8 +398,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { final SearchHits searchHits = searchResponse.getHits(); final SearchHit[] hits = searchHits.getHits(); - logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() ); - for ( SearchHit hit : hits ) { final CandidateResult candidateResult = parseIndexDocId( hit.getId() ); @@ -394,6 +405,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { candidates.add( candidateResult ); } + logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() ); + return candidates; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index df0c701..79fc14a 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -298,7 +298,7 @@ public class EntityIndexTest extends BaseIT { insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 0 ); - ei.addIndex("v2", 1, 0, "one"); + ei.addIndex( "v2", 1, 0, "one" ); ei.refreshAsync().toBlocking().first(); insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 1 ); @@ -325,7 +325,7 @@ public class EntityIndexTest extends BaseIT { batch.execute().toBlocking().last(); IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().first(); long time = info.getExecutionTime(); - log.info("refresh took ms:"+time); + log.info( "refresh took ms:" + time ); } @@ -405,6 +405,51 @@ public class EntityIndexTest extends BaseIT { } + /** + * Tests that we aggregate results only before the halfway version point. + */ + @Test + public void testScollingDeindex() { + + int numberOfEntities = 1000; + int versionToSearchFor = numberOfEntities / 2; + + Id appId = new SimpleId( "application" ); + + ApplicationScope applicationScope = new ApplicationScopeImpl( appId ); + + IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, 1 ); + + ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex( applicationScope ); + + UUID entityUUID = UUID.randomUUID(); + Id entityId = new SimpleId( "mehCar" ); + + Map entityMap = new HashMap() {{ + put( "name", "Toyota Corolla" ); + put( "introduced", 1966 ); + put( "topspeed", 111 ); + }}; + + Entity[] entity = new Entity[numberOfEntities]; + for(int i = 0; i < numberOfEntities; i++) { + entity[i] = EntityIndexMapUtils.fromMap( entityMap ); + EntityUtils.setId( entity[i], entityId ); + EntityUtils.setVersion( entity[i], UUIDGenerator.newTimeUUID() ); + entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID ) ); + + //index the new entity. 
This is where the loop will be set to create like 100 entities. + entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last(); + } + ei.refreshAsync().toBlocking().first(); + + CandidateResults candidateResults = entityIndex + .getAllEntityVersionBeforeMark( entity[versionToSearchFor].getId(), entity[versionToSearchFor].getVersion()); + assertEquals( 501, candidateResults.size() ); + } + + + private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes, final ApplicationEntityIndex entityIndex, final String queryString, final int num ) {