add aggregation service
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/198f4891 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/198f4891 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/198f4891 Branch: refs/heads/two-dot-o-dev Commit: 198f489155b3f9d4346d62c0fe7ba859be917b70 Parents: c47b02d Author: Shawn Feldman <[email protected]> Authored: Wed Aug 26 09:36:32 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Wed Aug 26 09:36:32 2015 -0600 ---------------------------------------------------------------------- .../usergrid/persistence/index/EntityIndex.java | 61 ++++++--- .../index/impl/EsEntityIndexImpl.java | 51 +++++++- .../persistence/index/impl/EntityIndexTest.java | 129 ++++++++++++++++--- 3 files changed, 193 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java index 7fa2f07..6d563ff 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java @@ -39,34 +39,47 @@ public interface EntityIndex extends CPManager { /** * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists - * @param indexSuffix index name + * + * @param indexSuffix index name * @param shards * @param replicas * @param writeConsistency */ - void addIndex( - final String indexSuffix, - final int shards, - final int replicas, - final String writeConsistency - ); + void addIndex( + final String indexSuffix, + final int shards, + final int replicas, + final String writeConsistency + ); /** * Refresh the index. */ - Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync(); + Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync(); /** * Check health of cluster. */ - Health getClusterHealth(); + Health getClusterHealth(); /** * Check health of this specific index. */ - Health getIndexHealth(); + Health getIndexHealth(); + /** + * get total entity size + * @return + */ + long getEntitySize(); + + /** + * get total entity size by an edge -> "term":{"edgeName":"zzzcollzzz|roles"} + * @param edge + * @return + */ + long getEntitySize(final String edge); /** * Initialize the index if necessary. This is an idempotent operation and should not create an index @@ -82,48 +95,54 @@ public interface EntityIndex extends CPManager { /** * Search on every document in the specified search edge. Also search by the types if specified - * @param searchEdge The edge to search on + * + * @param searchEdge The edge to search on * @param searchTypes The search types to search - * @param query The query to execute - * @param limit The limit of values to return - * @param offset The offset to query on + * @param query The query to execute + * @param limit The limit of values to return + * @param offset The offset to query on * @return */ - CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query, - final int limit, final int offset ); + CandidateResults search(final SearchEdge searchEdge, final SearchTypes searchTypes, final String query, + final int limit, final int offset); /** * Same as search, just iterates all documents that match the index edge exactly. - * @param edge The edge to search on + * + * @param edge The edge to search on * @param entityId The entity that the searchEdge is connected to. * @return */ - CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ); + CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId); /** * Returns all entity documents that match the entityId and come before the marked version - * @param entityId The entityId to match when searching + * + * @param entityId The entityId to match when searching * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted. * @return */ - CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ); + CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion); /** * delete all application records + * * @return */ Observable deleteApplication(); /** * Get the indexes for an alias + * * @param aliasType name of alias * @return list of index names */ - String[] getIndexes( final AliasType aliasType ); + String[] getIndexes(final AliasType aliasType); /** * get all unique indexes + * * @return */ String[] getIndexes(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java index 87e2dbd..800dac3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java @@ -62,6 +62,10 @@ import org.elasticsearch.index.query.*; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -424,7 +428,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { and query Es directly for matches */ - IndexValidationUtils.validateSearchEdge( edge ); + IndexValidationUtils.validateSearchEdge(edge); Preconditions.checkNotNull( entityId, "entityId cannot be null" ); SearchResponse searchResponse; @@ -505,12 +509,12 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder(); - FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME, - IndexingUtils.entityId( entityId ) ); + FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, + IndexingUtils.entityId(entityId)); - FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion ); + FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte(markedVersion); - FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter ); + FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter, entityVersionFilter); srb.setPostFilter(andFilter); @@ -570,7 +574,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { * Completely delete an index. */ public Observable deleteApplication() { - String idString = applicationId( applicationScope.getApplication() ); + String idString = applicationId(applicationScope.getApplication()); final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString); final String[] indexes = getIndexes(); //Added For Graphite Metrics @@ -734,6 +738,41 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData { } @Override + public long getEntitySize(){ + + + SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder(); + return getEntitySizeAggregation(builder); + } + + + @Override + public long getEntitySize(final String edge){ + //"term":{"edgeName":"zzzcollzzz|roles"} + SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder(); + builder.setQuery(new TermQueryBuilder("edgeName",edge)); + return getEntitySizeAggregation(builder); + } + + private long getEntitySizeAggregation( SearchRequestBuilder builder) { + final String key = "entitySize"; + SumBuilder sumBuilder = new SumBuilder(key); + sumBuilder.field("entitySize"); + builder.addAggregation(sumBuilder); + Observable<Number> o = Observable.from(builder.execute()) + .map(response -> { + Sum aggregation = (Sum) response.getAggregations().get(key); + if(aggregation == null){ + return -1; + }else{ + return aggregation.getValue(); + } + }); + Number val = o.toBlocking().first(); + return val.longValue(); + } + + @Override public int getImplementationVersion() { return IndexDataVersions.SINGLE_INDEX.getVersion(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 9154382..564e5e7 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -270,7 +270,7 @@ public class EntityIndexTest extends BaseIT { insertJsonBlob( entityType, searchEdge, "/sample-large.json", 1, 0 ); - entityIndex.addIndex(UUID.randomUUID()+ "v2", 1, 0, "one" ); + entityIndex.addIndex(UUID.randomUUID() + "v2", 1, 0, "one"); entityIndex.refreshAsync().toBlocking().first(); insertJsonBlob( entityType, searchEdge, "/sample-large.json", 1, 1 ); @@ -278,12 +278,12 @@ public class EntityIndexTest extends BaseIT { CandidateResults crs = testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 1 ); EntityIndexBatch entityIndexBatch = entityIndex.createBatch(); - entityIndexBatch.deindex( searchEdge, crs.get( 0 ) ); + entityIndexBatch.deindex(searchEdge, crs.get(0)); entityIndexBatch.execute().toBlocking().last(); entityIndex.refreshAsync().toBlocking().first(); //Hilda Youn - testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 0 ); + testQuery(searchEdge, searchTypes, "name = 'Bowers Oneil'", 0); } @@ -291,13 +291,14 @@ public class EntityIndexTest extends BaseIT { String filePath, final int max, final int startIndex ) throws IOException { InputStream is = this.getClass().getResourceAsStream( filePath ); ObjectMapper mapper = new ObjectMapper(); - List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} ); + List<Object> sampleJson = mapper.readValue(is, new TypeReference<List<Object>>() { + }); EntityIndexBatch batch = entityIndex.createBatch(); - insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex ); + insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex); batch.execute().toBlocking().last(); IndexRefreshCommandImpl.IndexRefreshCommandInfo info = entityIndex.refreshAsync().toBlocking().first(); long time = info.getExecutionTime(); - log.info( "refresh took ms:" + time ); + log.info("refresh took ms:" + time); } @@ -366,7 +367,7 @@ public class EntityIndexTest extends BaseIT { candidateResults = entityIndex .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 ); - assertEquals( 0, candidateResults.size() ); + assertEquals(0, candidateResults.size()); } @@ -420,8 +421,8 @@ public class EntityIndexTest extends BaseIT { timer.stop(); - assertEquals( num, candidateResults.size() ); - log.debug( "Query time {}ms", timer.getTime() ); + assertEquals(num, candidateResults.size()); + log.debug("Query time {}ms", timer.getTime()); return candidateResults; } @@ -609,13 +610,13 @@ public class EntityIndexTest extends BaseIT { assertEquals( bill.getId(), r.get( 0 ).getId() ); r = entityIndex.search( indexScope, searchTypes, "where username = 'fred'", 10, 0); - assertEquals( fred.getId(), r.get( 0 ).getId() ); + assertEquals(fred.getId(), r.get(0).getId()); r = entityIndex.search( indexScope, searchTypes, "where age = 41", 10, 0); - assertEquals( fred.getId(), r.get( 0 ).getId() ); + assertEquals(fred.getId(), r.get(0).getId()); r = entityIndex.search( indexScope, searchTypes, "where age = 'thirtysomething'", 10, 0); - assertEquals( bill.getId(), r.get( 0 ).getId() ); + assertEquals(bill.getId(), r.get(0).getId()); } @@ -749,8 +750,8 @@ public class EntityIndexTest extends BaseIT { final String query = "where searchUUID = " + searchUUID; final CandidateResults r = - entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0); - assertEquals( user.getId(), r.get( 0 ).getId() ); + entityIndex.search( indexSCope, SearchTypes.fromTypes(entityId.getType()), query, 10, 0); + assertEquals(user.getId(), r.get(0).getId()); } @@ -781,7 +782,7 @@ public class EntityIndexTest extends BaseIT { EntityIndexBatch batch = entityIndex.createBatch(); - batch.index( indexSCope, user ); + batch.index(indexSCope, user); batch.execute().toBlocking().last(); entityIndex.refreshAsync().toBlocking().first(); @@ -790,7 +791,7 @@ public class EntityIndexTest extends BaseIT { final CandidateResults r = entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0); - assertEquals(user.getId(), r.get(0).getId() ); + assertEquals(user.getId(), r.get(0).getId()); //shouldn't match final String queryNoWildCard = "where string = 'I am'"; @@ -830,7 +831,7 @@ public class EntityIndexTest extends BaseIT { final Entity second = new Entity( "search" ); - second.setField( new StringField( "string", "bravo long string" ) ); + second.setField(new StringField("string", "bravo long string")); EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() ); @@ -897,11 +898,11 @@ public class EntityIndexTest extends BaseIT { //get ordering, so 2 is before 1 when both match IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 ); - batch.index( indexScope1, first ); + batch.index(indexScope1, first); IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 ); - batch.index( indexScope2, second); + batch.index(indexScope2, second); batch.execute().toBlocking().last(); @@ -914,8 +915,8 @@ public class EntityIndexTest extends BaseIT { entityIndex.search(indexScope1, SearchTypes.fromTypes( first.getId().getType() ), singleMatchQuery, 10, 0 ); - assertEquals( 1, singleResults.size() ); - assertEquals( first.getId(), singleResults.get( 0 ).getId() ); + assertEquals(1, singleResults.size()); + assertEquals(first.getId(), singleResults.get(0).getId()); //search in reversed @@ -1213,6 +1214,92 @@ public class EntityIndexTest extends BaseIT { assertEquals( 0, noMatchesContainsOrResults.size() ); } + @Test + public void testSize(){ + final String type = UUID.randomUUID().toString(); + + Id ownerId = new SimpleId( type ); + + + final Entity first = new Entity( "search" ); + + first.setField( new StringField( "string", "I ate a sammich" ) ); + first.setSize(100); + + EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() ); + + + final Entity second = new Entity( "search" ); + second.setSize(100); + + second.setField( new StringField( "string", "I drank a beer" ) ); + + + EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() ); + + + EntityIndexBatch batch = entityIndex.createBatch(); + + + //get ordering, so 2 is before 1 when both match + IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 ); + batch.index( indexScope1, first ); + + + IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 ); + batch.index( indexScope2, second); + + + batch.execute().toBlocking().last(); + entityIndex.refreshAsync().toBlocking().first(); + long size = entityIndex.getEntitySize(); + assertTrue( size >= 200 ); + + } + + @Test + public void testSizeByEdge(){ + final String type = UUID.randomUUID().toString(); + + Id ownerId = new SimpleId( "owner" ); + + + final Entity first = new Entity( type ); + + first.setField( new StringField( "string", "I ate a sammich" ) ); + first.setSize(100); + + EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() ); + + + final Entity second = new Entity( type ); + second.setSize(100); + + second.setField( new StringField( "string", "I drank a beer" ) ); + + + EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() ); + + + EntityIndexBatch batch = entityIndex.createBatch(); + + + //get ordering, so 2 is before 1 when both match + IndexEdge indexScope1 = new IndexEdgeImpl( ownerId,type , SearchEdge.NodeType.SOURCE, 10 ); + batch.index( indexScope1, first ); + + + IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, type+"er", SearchEdge.NodeType.SOURCE, 11 ); + batch.index( indexScope2, second); + + + batch.execute().toBlocking().last(); + entityIndex.refreshAsync().toBlocking().first(); + long size = entityIndex.getEntitySize(type); + assertTrue( size == 100 ); + + } + }
