Add comments describing changes still to be made to selective indexing. Change reindexing to add an optional delay to the reindex call.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82ff53b1 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82ff53b1 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82ff53b1 Branch: refs/heads/release-2.1.1 Commit: 82ff53b145d84a7fbf198f0eb63153af0d2b6633 Parents: 1ba12b8 Author: George Reyes <[email protected]> Authored: Fri Mar 25 09:06:10 2016 -0700 Committer: George Reyes <[email protected]> Committed: Fri Mar 25 09:06:10 2016 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 4 +- .../corepersistence/CpManagerCache.java | 1 + .../corepersistence/index/IndexServiceImpl.java | 23 +++++- .../index/ReIndexRequestBuilder.java | 7 ++ .../index/ReIndexRequestBuilderImpl.java | 32 ++++++++ .../index/ReIndexServiceImpl.java | 23 ++++-- .../usergrid/persistence/map/MapManager.java | 2 + .../rest/applications/CollectionResource.java | 14 +++- .../collection/CollectionsResourceIT.java | 77 +++++++++++++++++++- 9 files changed, 169 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 1fbacbe..bb3abcb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -1739,9 +1739,11 @@ public class CpEntityManager implements EntityManager { //haven't decided which one I should base off of which, maybe from epoch to utc + + //TODO: change timeservice as 
below then use timeservice. + //TODO: only allow a single reindex in elasticsearch at a time. akka. Instant timeInstance = Instant.now(); - Date date = Date.from(timeInstance); Long epoch = timeInstance.toEpochMilli(); Map<String,Object> schemaMap = new HashMap<>( ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java index 0408bbd..0b7bb43 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java @@ -37,6 +37,7 @@ import com.google.inject.Inject; /** * Cache for managing our other managers. Now just a delegate. Needs refactored away */ + public class CpManagerCache implements ManagerCache { private final EntityCollectionManagerFactory ecmf; http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index c0f2e1b..5f50a32 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -121,8 +121,10 @@ public class IndexServiceImpl implements IndexService { //do our observable for batching //try to send a whole batch if we can + + //TODO: extract the below and call a single method. 
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex - .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() ) + .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() ) //TODO: change to delay. maybe. at least to the before buffer. //map into batches based on our buffer size .flatMap( buffer -> Observable.from( buffer ) @@ -202,10 +204,9 @@ public class IndexServiceImpl implements IndexService { */ private Map getFilteredStringObjectMap( final ApplicationScope applicationScope, final Entity entity, final IndexEdge indexEdge ) { - IndexOperation indexOperation = new IndexOperation(); - - indexEdge.getNodeId().getUuid(); + //look into this. + IndexOperation indexOperation = new IndexOperation(); Id mapOwner = new SimpleId( indexEdge.getNodeId().getUuid(), TYPE_APPLICATION ); @@ -217,6 +218,7 @@ public class IndexServiceImpl implements IndexService { Set<String> defaultProperties; ArrayList fieldsToKeep; + //TODO: extract collection name using other classes than the split. String jsonSchemaMap = mm.getString( indexEdge.getEdgeName().split( "\\|" )[1] ); //If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return. @@ -226,6 +228,8 @@ public class IndexServiceImpl implements IndexService { Schema schema = Schema.getDefaultSchema(); defaultProperties = schema.getRequiredProperties( indexEdge.getEdgeName().split( "\\|" )[1] ); fieldsToKeep = ( ArrayList ) jsonMapData.get( "fields" ); + //TODO: add method here to update the relevant fields in the map manager when it was accessed. + defaultProperties.addAll( fieldsToKeep ); } else { @@ -233,6 +237,7 @@ public class IndexServiceImpl implements IndexService { } //Returns the flattened map of the entity. + //TODO: maybe instead pass the fields to keep to the flattening. 
Map map = indexOperation.convertedEntityToBeIndexed( applicationScope, indexEdge, entity ); HashSet mapFields = ( HashSet ) map.get( "fields" ); @@ -245,6 +250,12 @@ public class IndexServiceImpl implements IndexService { //Checks to see if the fieldname is a default property. If it is then keep it, otherwise send it to //be verified the aptly named method + + //one.two.three + //one.two.four + //one.two3.five + //one.two + //fields { one.two } if ( !defaultProperties.contains( fieldName ) ) { iterateThroughMapForFieldsToBeIndexed( fieldsToKeep, collectionIterator, fieldName ); } @@ -290,10 +301,14 @@ public class IndexServiceImpl implements IndexService { //Then if that check passes we go to check that both parts are equal. If they are ever not equal // e.g one.two.three and one.three.two then it shouldn't be included + //TODO: regex. for ( int index = 0; index < flattedRequirementString.length; index++ ) { //if the array contains a string that it is equals to then set the remove flag to true //otherwise remain false. 
+ //one.three + //one.two + //one if ( flattedStringArray.length <= index ) { toRemoveFlag = true; break; http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java index 0863a63..139f832 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java @@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.index; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -52,6 +53,8 @@ public interface ReIndexRequestBuilder { ReIndexRequestBuilder withCursor(final String cursor); + ReIndexRequestBuilder withDelay( int delayTimer, TimeUnit timeUnit ); + /** * Set the timestamp to re-index entities updated >= this timestamp * @param timestamp @@ -60,6 +63,10 @@ public interface ReIndexRequestBuilder { ReIndexRequestBuilder withStartTimestamp(final Long timestamp); + Optional<Integer> getDelayTimer(); + + Optional<TimeUnit> getTimeUnitOptional(); + /** * Get the application scope * @return http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java index e93ccf1..d3e7ecf 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java @@ -21,6 +21,9 @@ package org.apache.usergrid.corepersistence.index; import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.auth.IAuthenticator; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -37,6 +40,8 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder { private Optional<String> withCollectionName = Optional.absent(); private Optional<String> cursor = Optional.absent(); private Optional<Long> updateTimestamp = Optional.absent(); + private Optional<Integer> delayTimer = Optional.absent(); + private Optional<TimeUnit> timeUnitOptional = Optional.absent(); /*** @@ -81,6 +86,22 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder { /** + * Determines whether we should tack on a delay for reindexing and for how long if we do. Also + * allowed to specify how throttled back it should be. + * @param delayTimer + * @param timeUnit + * @return + */ + @Override + public ReIndexRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){ + this.delayTimer = Optional.fromNullable( delayTimer ); + this.timeUnitOptional = Optional.fromNullable( timeUnit ); + + return this; + } + + + /** * Set start timestamp in epoch time. 
Only entities updated since this time will be processed for indexing * @param timestamp * @return @@ -93,6 +114,17 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder { @Override + public Optional<Integer> getDelayTimer() { + return delayTimer; + } + + @Override + public Optional<TimeUnit> getTimeUnitOptional() { + return timeUnitOptional; + } + + + @Override public Optional<ApplicationScope> getApplicationScope() { if ( this.withApplicationId.isPresent() ) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index fc1c97f..558a18c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -99,6 +99,7 @@ public class ReIndexServiceImpl implements ReIndexService { } + //TODO: optional delay, param. @Override public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) { @@ -111,6 +112,10 @@ public class ReIndexServiceImpl implements ReIndexService { final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope(); + final Optional<Integer> delayTimer = reIndexRequestBuilder.getDelayTimer(); + + final Optional<TimeUnit> timeUnitOptional = reIndexRequestBuilder.getTimeUnitOptional(); + Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()), "You cannot specify an app id and a cursor. 
When resuming with cursor you must omit the appid" ); @@ -123,16 +128,24 @@ public class ReIndexServiceImpl implements ReIndexService { // create an observable that loads a batch to be indexed - final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes, + Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes, reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) - .buffer( indexProcessorFig.getReindexBufferSize()) - //.delay( 50, TimeUnit.MILLISECONDS ) - .doOnNext(edges -> { + .buffer( indexProcessorFig.getReindexBufferSize()); + if(delayTimer.isPresent()){ + if(timeUnitOptional.isPresent()){ + runningReIndex = runningReIndex.delay( delayTimer.get(),timeUnitOptional.get() ); + } + else{ + runningReIndex = runningReIndex.delay( delayTimer.get(), TimeUnit.MILLISECONDS ); + } + } + + runningReIndex = runningReIndex.doOnNext(edges -> { logger.info("Sending batch of {} to be indexed.", edges.size()); indexService.indexBatch(edges, modifiedSince); - }); + }); //start our sampler and state persistence http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java index 80e2d17..ca8fd9a 100644 --- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java +++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java @@ -27,6 +27,8 @@ import java.util.UUID; /** * Generator of a map manager instance */ +//TODO: This should be a singleton, otherwise cache could be out of sync and would need to invalidated everywher + //TODO: make 
manager cache injectable everywhere. public interface MapManager { http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java index b674ac4..fb660fa 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java @@ -19,6 +19,7 @@ package org.apache.usergrid.rest.applications; import java.util.UUID; +import java.util.concurrent.TimeUnit; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -47,6 +48,7 @@ import org.apache.usergrid.rest.AbstractContextResource; import org.apache.usergrid.rest.ApiResponse; import org.apache.usergrid.rest.RootResource; import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess; +import org.apache.usergrid.rest.security.annotations.RequireSystemAccess; import org.apache.usergrid.services.AbstractCollectionService; import org.apache.usergrid.services.ServiceAction; import org.apache.usergrid.services.ServiceParameter; @@ -91,6 +93,9 @@ public class CollectionResource extends ServiceResource { if(logger.isTraceEnabled()){ logger.trace( "ServiceResource.executePostOnIndexes" ); } + /** + + */ Object json; if ( StringUtils.isEmpty( body ) ) { @@ -135,10 +140,12 @@ public class CollectionResource extends ServiceResource { return response; } + //TODO: this can't be controlled and until it can be controlled we should allow muggles to do this. So system access only. + //TODO: use scheduler here to get around people sending a reindex call 30 times. 
@POST @Path("_reindex") @Produces({ MediaType.APPLICATION_JSON,"application/javascript"}) - @RequireApplicationAccess + @RequireSystemAccess @JSONP public ApiResponse executePostForReindexing( @Context UriInfo ui, String body, @QueryParam("callback") @DefaultValue("callback") String callback ) @@ -146,8 +153,8 @@ public class CollectionResource extends ServiceResource { final ReIndexRequestBuilder request = createRequest().withApplicationId( services.getApplicationId() ).withCollection( - String.valueOf( getServiceParameters().get( 0 ) ) ); -// + String.valueOf( getServiceParameters().get( 0 ) ) ).withDelay( 1, TimeUnit.SECONDS ); + return executeAndCreateResponse( request, callback ); } @@ -170,6 +177,7 @@ public class CollectionResource extends ServiceResource { } + //TODO: change this to {itemName}/_indexes and that should do what we already have. Then we don't have this overriden method. @Override @Path("{itemName}") public AbstractContextResource addNameParameter( @Context UriInfo ui, @PathParam("itemName") PathSegment itemName ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java index 5900dd5..109bb6f 100644 --- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java @@ -168,9 +168,84 @@ public class CollectionsResourceIT extends AbstractRestIT { //Reindex and verify that the entity only has field one index. 
this.app().collection( "testCollection" ).collection( "_reindex" ).post(); - Thread.sleep( 10000 ); + // Thread.sleep( 10000 ); + //refreshIndex(); + + for(int i = 0; i < 10; i++) { + String query = "one ='value"+ i + "'"; + QueryParameters queryParameters = new QueryParameters().setQuery( query ); + + //having a name breaks it. Need to get rid of the stack trace and also + Collection tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true ); + Entity reindexedEntity = tempEntity.getResponse().getEntity(); + assertEquals( "value"+i, reindexedEntity.get( "one" ) ); + + //Verify if you can query on an entity that was not indexed and that no entities are returned. + query = "two = 'valuetwo1"+ i + "'"; + queryParameters = new QueryParameters().setQuery( query ); + tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true ); + assertEquals( 0, tempEntity.getResponse().getEntities().size() ); + } + } + + + @Test + public void postToCollectionSchemaAndVerifyFieldsAreUpdated() throws Exception { + + //Create test collection with test entity that is full text indexed. + Entity testEntity = new Entity(); + + + for(int i = 0; i < 10; i++){ + testEntity.put( "one","value"+i ); + testEntity.put( "two","valuetwo"+i ); + this.app().collection( "testCollection" ).post( testEntity ); + } + + + //Creating schema. + //this could be changed to a hashmap. + ArrayList<String> indexingArray = new ArrayList<>( ); + indexingArray.add( "one" ); + + + //field "fields" is required. + Entity payload = new Entity(); + payload.put( "fields", indexingArray); + + //Post index to the collection metadata + Entity thing = this.app().collection( "testCollection" ).collection( "_indexes" ).post( payload ); refreshIndex(); + //TODO: write a test to verify the data below. 
+ + Collection collection = this.app().collection( "testCollection" ).collection( "_index" ).get(); + + LinkedHashMap testCollectionSchema = (LinkedHashMap)collection.getResponse().getData(); + //TODO: the below will have to be replaced by the values that I deem correct. + assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" )); + assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) ); + assertEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) ); + + //TODO: this test doesn't check to see if create checks the schema. Only that the reindex removes whats already there. + ArrayList<String> schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" ); + assertEquals( "one",schema.get( 0 ) ); + + + //Reindex and verify that the entity only has field one index. + this.app().collection( "testCollection" ).collection( "_reindex" ).post(); + + collection = this.app().collection( "testCollection" ).collection( "_index" ).get(); + + testCollectionSchema = (LinkedHashMap)collection.getResponse().getData(); + assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" )); + assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) ); + assertNotEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) ); + + schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" ); + assertEquals( "one",schema.get( 0 ) ); + + for(int i = 0; i < 10; i++) { String query = "one ='value"+ i + "'"; QueryParameters queryParameters = new QueryParameters().setQuery( query );
