[USERGRID-607] Refactored getAllEdgeDocuments to use scroll api and to do a simplified edge search. Refactored deleteIndexEdge to take advantage of the scroll api.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/aa8b20d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/aa8b20d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/aa8b20d9 Branch: refs/heads/USERGRID-669 Commit: aa8b20d9ae33cddff289aa7652ee7e4fe88c9b0e Parents: 00aa293 Author: GERey <gre...@apigee.com> Authored: Fri May 22 14:37:56 2015 -0700 Committer: GERey <gre...@apigee.com> Committed: Fri May 22 14:37:56 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/index/IndexServiceImpl.java | 27 ++++------ .../index/ApplicationEntityIndex.java | 4 +- .../impl/EsApplicationEntityIndexImpl.java | 56 +++++++++++++++----- 3 files changed, 53 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 5e2a5ea..908bd3d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -187,32 +187,23 @@ public class IndexServiceImpl implements IndexService { final IndexEdge fromSource = generateScopeFromSource( edge ); final Id targetId = edge.getTargetNode(); + CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId ); - CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, 0 ); - //Should loop thorugh and query for all 
documents and if there are no documents then the loop should - // exit. - do { - batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch ); - if ( !targetEdgesToBeDeindexed.getOffset().isPresent() ) break; - targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, - targetEdgesToBeDeindexed.getOffset().get() ); - } - while ( !targetEdgesToBeDeindexed.isEmpty() ); + //1. Feed the observable the candidate results you got back. Since it now does the aggregation for you + // you don't need to worry about putting your code in a do while. + + + batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch ); + final IndexEdge fromTarget = generateScopeFromTarget( edge ); final Id sourceId = edge.getSourceNode(); - CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, 0 ); + CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId ); - do { - batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch ); - if ( !sourceEdgesToBeDeindexed.getOffset().isPresent() ) break; - sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, - sourceEdgesToBeDeindexed.getOffset().get() ); - } - while ( !sourceEdgesToBeDeindexed.isEmpty() ); + batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch ); return batch.execute(); } ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java index 9ce65e9..b392f3c 100644 --- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java @@ -54,11 +54,9 @@ public interface ApplicationEntityIndex { * Same as search, just iterates all documents that match the index edge exactly. * @param edge The edge to search on * @param entityId The entity that the searchEdge is connected to. - * @param limit The limit of the values to return per search. - * @param offset The offset to page the query on. * @return */ - CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId, final int limit, final int offset); + CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ); /** * Returns all entity documents that match the entityId and come before the marked version http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/aa8b20d9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index 99e5525..347a7b9 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -185,8 +185,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { @Override - public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId, final int limit, - final int offset ) { + public CandidateResults 
getAllEdgeDocuments( final IndexEdge edge, final Id entityId ) { /** * Take a list of IndexEdge, with an entityId and query Es directly for matches @@ -194,26 +193,59 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { */ IndexValidationUtils.validateSearchEdge( edge ); Preconditions.checkNotNull( entityId, "entityId cannot be null" ); - Preconditions.checkArgument( limit > 0, "limit must be > 0" ); SearchResponse searchResponse; + List<CandidateResult> candidates = new ArrayList<>(); + final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" ); - FilterBuilders.idsFilter( entityId.getType() ); - final SearchRequestBuilder srb = searchRequest.getBuilder( edge, SearchTypes.fromTypes( entityId.getType() ), - parsedQuery, limit, offset ).setTimeout( TimeValue.timeValueMillis( queryTimeout ) ); + final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); + + //I can't just search on the entity Id. + + FilterBuilder entityEdgeFilter = FilterBuilders.termFilter( IndexingUtils.EDGE_NODE_ID_FIELDNAME, + IndexingUtils.idString( edge.getNodeId() )); + + srb.setPostFilter(entityEdgeFilter); if ( logger.isDebugEnabled() ) { - logger.debug( "Searching for edge index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ", - this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(), - SearchTypes.fromTypes( entityId.getType() ), srb ); + logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ", + this.alias.getReadAlias(),entityId, srb ); } try { //Added For Graphite Metrics Timer.Context timeSearch = searchTimer.time(); - searchResponse = srb.execute().actionGet(); + + //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100. 
+ //The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment + //TODO: review this and make them not magic numbers when acking this PR. + searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet(); + + + while(true){ + //add search result hits to some sort of running tally of hits. + candidates = aggregateScrollResults( candidates, searchResponse ); + + SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2 + .getScrollBuilder( searchResponse.getScrollId() ) + .setScroll( new TimeValue( 6000 ) ); + + //TODO: figure out how to log exactly what we're putting into elasticsearch + // if ( logger.isDebugEnabled() ) { + // logger.debug( "Scroll search using query: {} ", + // ssrb.toString() ); + // } + + searchResponse = ssrb.execute().actionGet(); + + if (searchResponse.getHits().getHits().length == 0) { + break; + } + + + } timeSearch.stop(); } catch ( Throwable t ) { @@ -223,7 +255,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { } failureMonitor.success(); - return parseResults(searchResponse, parsedQuery, limit, offset); + return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings()); } @@ -264,8 +296,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { //Added For Graphite Metrics Timer.Context timeSearch = searchTimer.time(); - //REfactor this out and have it return a modified parseResults that will create the candidateResults from - //the hit results and then keep that //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100. //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment //TODO: review this and make them not magic numbers when acking this PR.