WIP, overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cee76da7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cee76da7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cee76da7 Branch: refs/heads/USERGRID-528 Commit: cee76da72c3343d562867988f76f264204d54b59 Parents: 61aa979 Author: Todd Nine <[email protected]> Authored: Tue Mar 24 14:15:49 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Tue Mar 24 14:15:49 2015 -0600 ---------------------------------------------------------------------- .../io/read/EntityIndexCommand.java | 404 ++++++++++++------- .../corepersistence/rx/impl/CollectUntil.java | 2 +- 2 files changed, 270 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java index a8fa221..464dd45 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java @@ -22,8 +22,9 @@ package org.apache.usergrid.corepersistence.io.read; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.collection.CollectionScope; @@ -32,8 +33,8 @@ import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory import org.apache.usergrid.persistence.collection.EntitySet; import org.apache.usergrid.persistence.collection.MvccEntity; import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CandidateResult; @@ -52,32 +53,34 @@ import rx.functions.Func0; import rx.functions.Func1; +/** + * Performs a search of the given type along the specified edgeName. It then loads and validates results, + * and returns an Observable of SearchResults + */ @Singleton public class EntityIndexCommand implements Command<Id, EntityIndexCommand.SearchResults> { private final Id applicationId; - private final ApplicationScope applicationScope; private final ApplicationEntityIndex index; private final SearchTypes types; private final String query; private final int resultSetSize; - private final String scopeType; + private final String edgeName; private final EntityCollectionManagerFactory entityCollectionManagerFactory; @Inject - public EntityIndexCommand( final Id applicationId, final ApplicationScope applicationScope, - final ApplicationEntityIndex index, final SearchTypes types, final String query, - final int resultSetSize, final String scopeType, + public EntityIndexCommand( final Id applicationId, final ApplicationEntityIndex index, final SearchTypes types, + final String query, final int resultSetSize, final String edgeName, final EntityCollectionManagerFactory entityCollectionManagerFactory ) { this.applicationId = applicationId; - this.applicationScope = applicationScope; + this.index = index; this.types = types; this.query = query; 
this.resultSetSize = resultSetSize; - this.scopeType = scopeType; + this.edgeName = edgeName; this.entityCollectionManagerFactory = entityCollectionManagerFactory; } @@ -85,150 +88,216 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search @Override public Observable<EntityIndexCommand.SearchResults> call( final Observable<Id> idObservable ) { - //create our observable of candidate search results - final Observable<CandidateResults> candidateResults = idObservable - .flatMap( id -> Observable.create( new ElasticSearchObservable( initialSearch( id ), nextPage( id ) ) ) ); - - final Observable<CandidateResult> candidateObservable = - candidateResults.flatMap( candidates -> Observable.from( candidates ) ); - - //since we'll only have at most 100 results in memory at a time, we roll up our groups and emit them on to - // the collector - final Observable<CandidateGroup> candidateGroup = - candidateObservable.groupBy( candidate -> candidate.getId() ).map( observableGroup -> { - - - //for each group, create a list, then sort by highest version first - final List<CandidateResult> groupList = observableGroup.toList().toBlocking().last(); - - Collections.sort( groupList, CandidateVersionComparator::compare ); - - //create our candidate group and emit it - final CandidateGroup group = - new CandidateGroup( groupList.get( 0 ), groupList.subList( 1, groupList.size() ) ); - - return group; - } ); - + //load all the potential candidates + final Observable<CandidateCollector> collectedCandidates = idObservable.compose( createCandidates() ); - //buffer our candidate group up to our resultset size. 
- final Observable<CandidateCollector> collectedCandidates = - candidateGroup.buffer( resultSetSize ).flatMap( candidates -> { + //now we have our collected candidates, load them from ES + final Observable<SearchResults> loadedEntities = collectedCandidates.map( loadEntities( resultSetSize ) ); - final Observable<CandidateCollector> collector = Observable.from( candidates ).collect( - () -> new CandidateCollector( resultSetSize ), ( candidateCollector, candidate ) -> { - //add our candidates to our collector - candidateCollector.addCandidate( candidate.toKeep ); - //add our empty results - candidateCollector.addEmptyResults( candidate.toRemove ); - } ); - - return collector; - } ); - //now we have our collected candidates, load them + //after we've loaded, remove everything that's left + loadedEntities.doOnNext( results ->{ + //run the deindex + Observable.from( results.toRemove ).collect( () -> index.createBatch(), (batch, toRemove) -> { + batch.deindex( toRemove.indexScope, toRemove.result ); + }).doOnNext( batch -> batch.execute() ).toBlocking().lastOrDefault( null ); - final Observable<SearchResults> loadedEntities = collectedCandidates.map( loadEntities(resultSetSize) ); + }); return loadedEntities; } - /** - * Perform the initial search with the sourceId - */ - private Func0<CandidateResults> initialSearch( final Id sourceId ) { - return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ) ); - } /** - * Search the next page for the specified source + * Using an input of observable ids, transform and collect them into a set of CandidateCollector to be loaded, or + * verified by the EM */ - private Func1<String, CandidateResults> nextPage( final Id sourceId ) { - return cursor -> index - .search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ).withCursor( cursor ) ); - } + private Observable.Transformer<Id, CandidateCollector> createCandidates() { + return idObservable -> { + //create our 
observable of candidate search results + final Observable<ScopedCandidateResults> candidateResults = idObservable.flatMap( - /** - * Function that will load our entities from our candidates, filter stale or missing candidates and return results - * @param expectedSize - * @return - */ - private Func1<CandidateCollector, SearchResults> loadEntities(final int expectedSize) { - return candidateCollector -> { - //from our candidates, group them by id type so we can create scopes - Observable.from( candidateCollector.getCandidates() ).groupBy( candidate -> candidate.getId().getType() ) - .flatMap( groups -> { + id -> { + //create the index scope from the id + final IndexScope scope = createScope( id ); - final List<CandidateResult> candidates = groups.toList().toBlocking().last(); + //perform the ES search of candidate results + final Observable<ScopedCandidateResults> scoped = Observable.create( + new ElasticSearchObservable( initialSearch( scope ), nextPage( scope ) ) ); - //we can get no results, so quit aggregating if there are none - if ( candidates.size() == 0 ) { - return Observable.just( new SearchResults( 0 ) ); - } + final Observable<CandidateCollector> collectedResults = scoped.flatMap( scopedCandidateResults -> { - final String typeName = candidates.get( 0 ).getId().getType(); + //here b/c the compiler cannot infer type directly + final Observable<CandidateResult> candidates = + Observable.from( scopedCandidateResults.candidateResults ); - final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( typeName ); + return candidates.collect( () -> new CandidateCollector( resultSetSize, + scopedCandidateResults.candidateResults.getCursor() ), ( collector, candidate ) -> { + collector.insert( candidate ); + } ); + } ); - //create our scope to get our entity manager - final CollectionScope scope = - new CollectionScopeImpl( applicationId, applicationId, collectionType ); + //group our candidates by type +// collectedResults.flatMap( collected 
-> Observable.from( collected.candidates ) ).groupBy( candidate -> candidate.getId().getType() ).co - final EntityCollectionManager ecm = - entityCollectionManagerFactory.createCollectionManager( scope ); + final Observable<SearchResults> resultsObservable = collectedResults.map( collectedCandidates -> { - //get our entity ids - final List<Id> entityIds = - Observable.from( candidates ).map( c -> c.getId() ).toList().toBlocking().last(); - //TODO, change this out + //we can get no results, so quit aggregating if there are none + if ( collectedCandidates.candidates.size() == 0 ) { + return new SearchResults( 0, collectedCandidates.cursor ); + } - //an observable of our entity set + final String collectionType = + CpNamingUtils.getCollectionScopeNameFromEntityType( id.getType() ); - //now go through all our candidates and verify + //create our scope to get our entity manager - return Observable.from( candidates ).collect( () -> new SearchResults( expectedSize ), (searchResults, candidate) ->{ + final CollectionScope collectionScope = + new CollectionScopeImpl( applicationId, applicationId, collectionType ); - final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last(); + final EntityCollectionManager ecm = + entityCollectionManagerFactory.createCollectionManager( collectionScope ); - final MvccEntity entity = entitySet.getEntity( candidate.getId() ); + //get our entity ids + final List<Id> entityIds = + Observable.from( collectedCandidates.candidates ).map( c -> c.getId() ).toList() + .toBlocking().last(); - //our candidate it stale, or target entity was deleted add it to the remove of our collector - if(UUIDComparator.staticCompare( entity.getVersion(), candidate.getVersion()) > 0 || !entity.getEntity().isPresent()){ - searchResults.addToRemove( candidate ); - return; - } + //TODO, change this out + //an observable of our entity set - searchResults.addEntity( entity.getEntity().get() ); + return ecm.load( entityIds ).map( results -> { + Observable.from( 
entityIds ).collect( () -> new SearchResults( resultSetSize ), + ( searchResults, entityIds ) -> { + } ); + } ); + } ); - } ) - //add the existing set to remove to this set - .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove() ) ); + return resultsObservable; + } ); - } ); + return candidateResults; + }; + } - return null; - }; + /** + * Perform the initial search with the sourceId + */ + private Func0<ScopedCandidateResults> initialSearch( final IndexScope scope ) { + return () -> new ScopedCandidateResults(scope, index.search( scope, types, Query.fromQL( query ) ) ); + } + + + /** + * Search the next page for the specified source + */ + private Func1<String, ScopedCandidateResults> nextPage( final IndexScope scope ) { + return cursor -> new ScopedCandidateResults(scope, index.search( scope, types, Query.fromQL( query ).withCursor( + cursor ) ) ); } + /** + * Create the scope and return it + * @param sourceId + * @return + */ + private IndexScope createScope(final Id sourceId){ + return new IndexScopeImpl( sourceId, edgeName ); + } + +// +// /** +// * Function that will load our entities from our candidates, filter stale or missing candidates and return results +// */ +// private Func1<CandidateCollector, SearchResults> loadEntities( final int expectedSize ) { +// return candidateCollector -> { +// +// //from our candidates, group them by id type so we can create scopes +// Observable.from( candidateCollector.getCandidates() ).groupBy( candidate -> candidate.result.getId().getType() ) +// .flatMap( groups -> { +// +// +// final List<ScopedCandidateResult> candidates = groups.toList().toBlocking().last(); +// +// //we can get no results, so quit aggregating if there are none +// if ( candidates.size() == 0 ) { +// return Observable.just( new SearchResults( 0, null ) ); +// } +// +// +// final String typeName = candidates.get( 0 ).result.getId().getType(); +// +// final String collectionType = 
CpNamingUtils.getCollectionScopeNameFromEntityType( typeName ); +// +// +// //create our scope to get our entity manager +// +// final CollectionScope scope = +// new CollectionScopeImpl( applicationId, applicationId, collectionType ); +// +// final EntityCollectionManager ecm = +// entityCollectionManagerFactory.createCollectionManager( scope ); +// +// +// //get our entity ids +// final List<Id> entityIds = +// Observable.from( candidates ).map( c -> c.result.getId() ).toList().toBlocking().last(); +// +// //TODO, change this out +// +// //an observable of our entity set +// +// +// //now go through all our candidates and verify +// +// return Observable.from( candidates ).collect( () -> new SearchResults( +// expectedSize ), +// ( searchResults, candidate ) -> { +// +// final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last(); +// +// final MvccEntity entity = entitySet.getEntity( candidate.result.getId() ); +// +// +// //our candidate it stale, or target entity was deleted add it to the remove of our +// // collector +// if ( UUIDComparator.staticCompare( entity.getVersion(), candidate.result.getVersion() ) > 0 +// || !entity.getEntity().isPresent() ) { +// +// searchResults.addToRemove( candidate ); +// return; +// } +// +// +// searchResults.addEntity( entity.getEntity().get() ); +// } ) +// //add the existing set to remove to this set +// .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove() ) ); +// } ); +// +// +// return null; +// }; +// } /** @@ -238,20 +307,49 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search private final List<CandidateResult> candidates; private final List<CandidateResult> toRemove; + private final Map<Id, Integer> indexMapping; - public CandidateCollector( final int maxSize ) { + private final String cursor; + + + public CandidateCollector( final int maxSize, final String cursor ) { + this.cursor = cursor; candidates = new ArrayList<>( maxSize ); toRemove = new 
ArrayList<>( maxSize ); + indexMapping = new HashMap<>(maxSize); } - public void addCandidate( final CandidateResult candidate ) { - this.candidates.add( candidate ); - } + public void insert(final CandidateResult newValue){ + + final Id candidateId = newValue.getId(); + + final Integer index = indexMapping.get( candidateId ); + + if(index == null){ + candidates.add( newValue ); + indexMapping.put( candidateId, candidates.size()-1); + return; + } + + //present, perform a comparison + + final CandidateResult existing = candidates.get( index ); + + + //it's a greater version, add this to ignore + if(UUIDComparator.staticCompare( existing.getVersion(), newValue.getVersion() ) > 0){ + toRemove.add( newValue ); + } + + //remove the stale version from the list and put it in deindex + else{ + candidates.remove( index ); + candidates.add( newValue ); + toRemove.add( existing ); + } - public void addEmptyResults( final Collection<CandidateResult> stale ) { - this.toRemove.addAll( stale ); } @@ -268,13 +366,16 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search public static class SearchResults { private final List<Entity> entities; - private final List<CandidateResult> toRemove; + private final List<ScopedCandidateResult> toRemove; + private final int maxSize; - private String cursor; + private final String cursor; - public SearchResults( final int maxSize ) { - entities = new ArrayList<>( maxSize ); + public SearchResults( final int maxSize, final String cursor ) { + this.maxSize = maxSize; + this.cursor = cursor; + this.entities = new ArrayList<>( maxSize ); this.toRemove = new ArrayList<>( maxSize ); } @@ -284,20 +385,23 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search } - public void addToRemove( final Collection<CandidateResult> stale ) { + public void addToRemove( final Collection<ScopedCandidateResult> stale ) { this.toRemove.addAll( stale ); } - public void addToRemove( final CandidateResult 
candidateResult ) { - this.toRemove.add( candidateResult ); - } + public void addToRemove( final ScopedCandidateResult candidateResult ) { + this.toRemove.add( candidateResult ); + } + public String getCursor() { + return cursor; + } - public void setCursor( final String cursor ) { - this.cursor = cursor; + public boolean isFull(){ + return entities.size() >= maxSize; } } @@ -305,26 +409,26 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search /** * An observable that will perform a search and continually emit results while they exist. */ - public static class ElasticSearchObservable implements Observable.OnSubscribe<CandidateResults> { + public static class ElasticSearchObservable implements Observable.OnSubscribe<ScopedCandidateResults> { - private final Func1<String, CandidateResults> fetchNextPage; - private final Func0<CandidateResults> fetchInitialResults; + private final Func1<String, ScopedCandidateResults> fetchNextPage; + private final Func0<ScopedCandidateResults> fetchInitialResults; - public ElasticSearchObservable( final Func0<CandidateResults> fetchInitialResults, - final Func1<String, CandidateResults> fetchNextPage ) { + public ElasticSearchObservable( final Func0<ScopedCandidateResults> fetchInitialResults, + final Func1<String, ScopedCandidateResults> fetchNextPage ) { this.fetchInitialResults = fetchInitialResults; this.fetchNextPage = fetchNextPage; } @Override - public void call( final Subscriber<? super CandidateResults> subscriber ) { + public void call( final Subscriber<? 
super ScopedCandidateResults> subscriber ) { subscriber.onStart(); try { - CandidateResults results = fetchInitialResults.call(); + ScopedCandidateResults results = fetchInitialResults.call(); //emit our next page @@ -332,13 +436,13 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search subscriber.onNext( results ); //if we have no cursor, we're done - if ( !results.hasCursor() ) { + if ( !results.candidateResults.hasCursor() ) { break; } //we have a cursor, get our results to emit for the next page - results = fetchNextPage.call( results.getCursor() ); + results = fetchNextPage.call( results.candidateResults.getCursor() ); } subscriber.onCompleted(); @@ -351,16 +455,51 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search /** - * A message that contains the candidate to keep, and the candidate toRemove + * A candidate result, along with the scope it was returned in + */ + public static class ScopedCandidateResult{ + private final IndexScope indexScope; + private final CandidateResult result; + + + public ScopedCandidateResult( final IndexScope indexScope, final CandidateResult result ) { + this.indexScope = indexScope; + this.result = result; + } + } + + + /** + * Object that represents our candidate results, along with the scope it was searched in + */ + public static class ScopedCandidateResults { + private final IndexScope indexScope; + private CandidateResults candidateResults; + + + public ScopedCandidateResults( final IndexScope indexScope, final CandidateResults candidateResults ) { + this.indexScope = indexScope; + this.candidateResults = candidateResults; + } + } + + + /** + * A message that contains the candidate to keep, and the candidate toRemove and the scope they were searched in */ public static class CandidateGroup { + private final IndexScope indexScope; private final CandidateResult toKeep; private final Collection<CandidateResult> toRemove; + private final String cursor; - public
CandidateGroup( final CandidateResult toKeep, final Collection<CandidateResult> toRemove ) { + public CandidateGroup( final IndexScope indexScope, final CandidateResult toKeep, + final Collection<CandidateResult> toRemove, final String cursor ) { + this.indexScope = indexScope; this.toKeep = toKeep; this.toRemove = toRemove; + this.cursor = cursor; } } @@ -375,10 +514,5 @@ public class EntityIndexCommand implements Command<Id, EntityIndexCommand.Search } } - /*********************** - * FROM HERE DOWN IS EXPERIMENTAL - ************************* - */ - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java index 8a0c014..93de094 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java @@ -38,7 +38,7 @@ public class CollectUntil<T, R> implements Observable.Transformer<T, R> { final Func1<R, Boolean> shortCircuitWhen; - public CollectUntil( final Func1<R, Boolean> shortCircuitWhen, final Func0<R> stateFactory, final Action2<R, ? super T> collector) { + public CollectUntil( final Func0<R> stateFactory,final Func1<R, Boolean> shortCircuitWhen, final Action2<R, ? super T> collector) { this.stateFactory = stateFactory; this.collector = collector; this.shortCircuitWhen = shortCircuitWhen;
