[USERGRID-677] Added a new SearchRequestBuilderStrategy (SearchRequestBuilderStrategyV2) that makes it easy to build term post filters without the cruft of the regular SearchRequestBuilderStrategy. Added a version of the scrolling API and an additional parser that aggregates the results of the repeated scroll queries and returns a single CandidateResults set. Added a test that currently only checks that the new scrolling search still functions as a regular search (it does!).
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf823c47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf823c47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf823c47 Branch: refs/heads/USERGRID-669 Commit: cf823c47d3fc2bb92198e7d1e65ecc4be96ccfcc Parents: 11648ab Author: GERey <gre...@apigee.com> Authored: Fri May 22 09:53:44 2015 -0700 Committer: GERey <gre...@apigee.com> Committed: Fri May 22 09:53:44 2015 -0700 ---------------------------------------------------------------------- .../SearchRequestBuilderStrategyV2.java | 69 ++++++++++++++ .../impl/EsApplicationEntityIndexImpl.java | 98 ++++++++++++++++++-- 2 files changed, 160 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java new file mode 100644 index 0000000..0511b75 --- /dev/null +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. 
+ * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder; + + +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; +import org.elasticsearch.action.search.SearchType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.impl.EsProvider; +import org.apache.usergrid.persistence.index.impl.IndexAlias; +import org.apache.usergrid.persistence.index.impl.IndexingUtils; + + +/** + * Builds Elasticsearch requests for custom term-filtered queries (for example an entity-id post filter) without the query-parsing inputs (search edge, search types, parsed query, limit, offset) that the regular SearchRequestBuilderStrategy requires. NOTE(review): logger, applicationScope and cursorTimeout are assigned but never read in this class - confirm whether they are still needed.
+ */ +public class SearchRequestBuilderStrategyV2 { + private static final Logger logger = LoggerFactory.getLogger( SearchRequestBuilderStrategyV2.class ); + + private final EsProvider esProvider; + private final ApplicationScope applicationScope; + private final IndexAlias alias; + private final int cursorTimeout; + + + /** Captures the ES client provider and index alias used to build requests; applicationScope and cursorTimeout are stored but currently unused. */ public SearchRequestBuilderStrategyV2( final EsProvider esProvider, final ApplicationScope applicationScope, + final IndexAlias alias, int cursorTimeout ) { + + this.esProvider = esProvider; + this.applicationScope = applicationScope; + this.alias = alias; + this.cursorTimeout = cursorTimeout; + } + + /** Returns a new QUERY_THEN_FETCH search request on the read alias, restricted to IndexingUtils.ES_ENTITY_TYPE; no query, filter, size or scroll is set here, so callers add their own. */ public SearchRequestBuilder getBuilder(){ + SearchRequestBuilder srb = + esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes( IndexingUtils.ES_ENTITY_TYPE ).setSearchType( + SearchType.QUERY_THEN_FETCH); + + + return srb; + } + + /** Returns a request for the next page of the scroll identified by scrollId (taken from a previous search or scroll response); no keep-alive is set here, so callers set it via setScroll. */ public SearchScrollRequestBuilder getScrollBuilder(String scrollId){ + return esProvider.getClient().prepareSearchScroll( scrollId ); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index 2709569..d4ed36a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import 
org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -41,8 +42,6 @@ import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; - import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; @@ -50,6 +49,7 @@ import org.apache.usergrid.persistence.index.AliasedEntityIndex; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.CandidateResult; import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.IndexFig; @@ -96,6 +96,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { private final Timer deleteApplicationTimer; private final Meter deleteApplicationMeter; private final SearchRequestBuilderStrategy searchRequest; + private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2; private FailureMonitor failureMonitor; private final int cursorTimeout; private final long queryTimeout; @@ -119,7 +120,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { this.esProvider = provider; mapManager = mapManagerFactory.createMapManager( mapScope ); - this.searchTimer = metricsFactory.getTimer(EsApplicationEntityIndexImpl.class, "search.timer"); + this.searchTimer = metricsFactory.getTimer( 
EsApplicationEntityIndexImpl.class, "search.timer" ); this.cursorTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.cursor.timer" ); this.cursorTimeout = config.getQueryCursorTimeout(); this.queryTimeout = config.getWriteTimeout(); @@ -132,6 +133,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { this.alias = indexIdentifier.getAlias(); this.searchRequest = new SearchRequestBuilderStrategy( esProvider, appScope, alias, cursorTimeout ); + this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, appScope, alias, cursorTimeout ); } @@ -156,7 +158,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query ); final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset ) - .setTimeout( TimeValue.timeValueMillis(queryTimeout) ); + .setTimeout( TimeValue.timeValueMillis( queryTimeout ) ); if ( logger.isDebugEnabled() ) { logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ", @@ -204,7 +206,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { if ( logger.isDebugEnabled() ) { logger.debug( "Searching for edge index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ", this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(), - SearchTypes.fromTypes( entityId.getType()), srb ); + SearchTypes.fromTypes( entityId.getType() ), srb ); } try { @@ -225,9 +227,72 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { @Override - public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit, + public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit, final int offset ) { - throw new NotImplementedException( "Implement me or 
else I won't work." ); + + /** + * Finds every indexed document whose entity id field matches entityId (applied as a term post filter), pages through all of them with an Elasticsearch scroll + and returns the parsed hits as a single CandidateResults set. NOTE(review): markedVersion and offset are never used and limit is only validated, so results are not restricted to versions before markedVersion - confirm this is intended. + + */ + Preconditions.checkNotNull( entityId, "entityId cannot be null" ); + Preconditions.checkArgument( limit > 0, "limit must be > 0" ); + + SearchResponse searchResponse; + + List<CandidateResult> candidates = new ArrayList<>(); + + final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" ); + + final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); + + FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, + IndexingUtils.idString( entityId ) ); + srb.setPostFilter( termFilter ); + + + + if ( logger.isDebugEnabled() ) { + logger.debug( "Searching for edge index (read alias): {}\n nodeId: {},\n query: {} ", + this.alias.getReadAlias(),entityId, srb ); + } + + try { + //Times the whole scroll with the search.timer metric (note: stop() is skipped if an exception is thrown) + Timer.Context timeSearch = searchTimer.time(); + + //TODO: refactor this out and have a modified parseResults create the candidateResults from + //the hit results directly + //set the scroll keep-alive to 6 seconds (6000 ms) and request 100 hits per scroll page. NOTE(review): the keep-alive is hard-coded instead of using the configured cursorTimeout, and the scroll is never cleared when finished. + searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet(); + + //candidates (declared above) accumulates the parsed hits from every page + + + while(true){ + //append the current page to candidates, then fetch the next page; an empty page means the scroll is exhausted.
+ candidates = aggregateScrollResults( candidates, searchResponse ); + + searchResponse = searchRequestBuilderStrategyV2 + .getScrollBuilder( searchResponse.getScrollId() ) + .setScroll( new TimeValue( 6000 ) ).execute().actionGet(); + + if (searchResponse.getHits().getHits().length == 0) { + break; + } + + + } + timeSearch.stop(); + } + catch ( Throwable t ) { + logger.error( "Unable to communicate with Elasticsearch", t ); + failureMonitor.fail( "Unable to execute batch", t ); + throw t; + } + failureMonitor.success(); + + return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings()); } @@ -314,4 +379,23 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { return candidateResults; } + /** Parses the id of every hit in searchResponse with parseIndexDocId, logs the page and total hit counts at debug level, appends the results to candidates (mutating the list passed in) and returns that same list. */ private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates, + final SearchResponse searchResponse ){ + + final SearchHits searchHits = searchResponse.getHits(); + final SearchHit[] hits = searchHits.getHits(); + + logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() ); + + for ( SearchHit hit : hits ) { + + final CandidateResult candidateResult = parseIndexDocId( hit.getId() ); + + candidates.add( candidateResult ); + } + + return candidates; + + } + }