Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-494 c4f654847 -> 61aa97945
Refactor to clean up. Still a WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/61aa9794 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/61aa9794 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/61aa9794 Branch: refs/heads/USERGRID-494 Commit: 61aa97945313533f6ef676f3f9dab5a22b9cb63b Parents: c4f6548 Author: Todd Nine <[email protected]> Authored: Mon Mar 23 09:52:44 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Mar 23 17:23:46 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/io/read/Command.java | 8 +- .../corepersistence/io/read/CommandBuilder.java | 8 +- .../io/read/EntityIndexCommand.java | 384 +++++++++++++++++++ .../io/read/EntityIndexCommands.java | 119 ------ .../corepersistence/rx/impl/CollectUntil.java | 11 +- .../java/org/apache/usergrid/TempExample.java | 217 ++++++++++- .../rx/impl/CollectUntilTest.java | 6 +- 7 files changed, 605 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java index 4e3b0fc..34b8d1e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/Command.java @@ -25,14 +25,8 @@ import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; -public interface Command<T> { +public interface Command<T, R> extends Observable.Transformer<T, R> { - /** - * Process our input stream - * @param input - * @return - */ - public Observable<T> process( Observable<T> input ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java index 828cbc9..bbf74b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/CommandBuilder.java @@ -38,7 +38,7 @@ public class CommandBuilder { private CursorCache cache; - private final Observable<Id> pathObservable; + private Observable<Id> pathObservable; public CommandBuilder( final Id root ) { @@ -46,6 +46,12 @@ public class CommandBuilder { } + + public void addCommand(final Command<Id, Id> intermediateCommand){ + pathObservable = pathObservable.compose( intermediateCommand ); + } + + /** * Set our cache * @param cache http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java new file mode 100644 index 0000000..a8fa221 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.io.read; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.collection.CollectionScope; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; +import org.apache.usergrid.persistence.index.query.CandidateResult; +import org.apache.usergrid.persistence.index.query.CandidateResults; +import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func0; +import rx.functions.Func1; + + +@Singleton +public class EntityIndexCommand implements Command<Id, EntityIndexCommand.SearchResults> { + + private final Id applicationId; + private final ApplicationScope applicationScope; + private final ApplicationEntityIndex index; + private final SearchTypes types; + private final String query; + private final int resultSetSize; + private final String scopeType; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public EntityIndexCommand( final Id applicationId, final ApplicationScope applicationScope, + final ApplicationEntityIndex index, final SearchTypes types, final String query, + final int resultSetSize, final String scopeType, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.applicationId = applicationId; + this.applicationScope = applicationScope; + + this.index = index; + this.types = types; + this.query = query; + this.resultSetSize = resultSetSize; + this.scopeType = scopeType; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public Observable<EntityIndexCommand.SearchResults> call( final Observable<Id> idObservable ) { + + //create our observable of candidate search results + final Observable<CandidateResults> candidateResults = idObservable + .flatMap( id -> Observable.create( new ElasticSearchObservable( initialSearch( id ), nextPage( id ) ) ) ); + + final Observable<CandidateResult> candidateObservable = + candidateResults.flatMap( candidates -> Observable.from( candidates ) ); + + //since we'll only have at most 100 results in memory at a time, we roll up our groups and emit them on to + // the collector + final Observable<CandidateGroup> candidateGroup = + candidateObservable.groupBy( candidate -> candidate.getId() ).map( observableGroup -> { + + + //for each group, create a list, then sort by highest version first + final List<CandidateResult> groupList = observableGroup.toList().toBlocking().last(); + + Collections.sort( groupList, CandidateVersionComparator::compare ); + + //create our candidate group and emit it + final CandidateGroup group = + new CandidateGroup( groupList.get( 0 ), groupList.subList( 1, groupList.size() ) ); + + return group; + } ); + + + //buffer our candidate group up to our resultset size. + final Observable<CandidateCollector> collectedCandidates = + candidateGroup.buffer( resultSetSize ).flatMap( candidates -> { + + final Observable<CandidateCollector> collector = Observable.from( candidates ).collect( + () -> new CandidateCollector( resultSetSize ), ( candidateCollector, candidate ) -> { + //add our candidates to our collector + candidateCollector.addCandidate( candidate.toKeep ); + //add our empty results + candidateCollector.addEmptyResults( candidate.toRemove ); + } ); + + return collector; + } ); + + //now we have our collected candidates, load them + + + final Observable<SearchResults> loadedEntities = collectedCandidates.map( loadEntities(resultSetSize) ); + + + return loadedEntities; + } + + + /** + * Perform the initial search with the sourceId + */ + private Func0<CandidateResults> initialSearch( final Id sourceId ) { + return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ) ); + } + + + /** + * Search the next page for the specified source + */ + private Func1<String, CandidateResults> nextPage( final Id sourceId ) { + return cursor -> index + .search( new IndexScopeImpl( sourceId, scopeType ), types, Query.fromQL( query ).withCursor( cursor ) ); + } + + + /** + * Function that will load our entities from our candidates, filter stale or missing candidates and return results + * @param expectedSize + * @return + */ + private Func1<CandidateCollector, SearchResults> loadEntities(final int expectedSize) { + return candidateCollector -> { + + //from our candidates, group them by id type so we can create scopes + Observable.from( candidateCollector.getCandidates() ).groupBy( candidate -> candidate.getId().getType() ) + .flatMap( groups -> { + + + final List<CandidateResult> candidates = groups.toList().toBlocking().last(); + + //we can get no results, so quit aggregating if there are none + if ( candidates.size() == 0 ) { + return Observable.just( new SearchResults( 0 ) ); + } + + + final String typeName = candidates.get( 0 ).getId().getType(); + + final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( typeName ); + + + //create our scope to get our entity manager + + final CollectionScope scope = + new CollectionScopeImpl( applicationId, applicationId, collectionType ); + + final EntityCollectionManager ecm = + entityCollectionManagerFactory.createCollectionManager( scope ); + + + //get our entity ids + final List<Id> entityIds = + Observable.from( candidates ).map( c -> c.getId() ).toList().toBlocking().last(); + + //TODO, change this out + + //an observable of our entity set + + + + //now go through all our candidates and verify + + return Observable.from( candidates ).collect( () -> new SearchResults( expectedSize ), (searchResults, candidate) ->{ + + final EntitySet entitySet = ecm.load( entityIds ).toBlocking().last(); + + final MvccEntity entity = entitySet.getEntity( candidate.getId() ); + + + //our candidate it stale, or target entity was deleted add it to the remove of our collector + if(UUIDComparator.staticCompare( entity.getVersion(), candidate.getVersion()) > 0 || !entity.getEntity().isPresent()){ + searchResults.addToRemove( candidate ); + return; + } + + + searchResults.addEntity( entity.getEntity().get() ); + + + } ) + //add the existing set to remove to this set + .doOnNext( results -> results.addToRemove( candidateCollector.getToRemove() ) ); + + } ); + + + return null; + }; + } + + + + + /** + * Collects all valid results (in order) as well as candidates to be removed + */ + public static class CandidateCollector { + private final List<CandidateResult> candidates; + private final List<CandidateResult> toRemove; + + + public CandidateCollector( final int maxSize ) { + candidates = new ArrayList<>( maxSize ); + toRemove = new ArrayList<>( maxSize ); + } + + + public void addCandidate( final CandidateResult candidate ) { + this.candidates.add( candidate ); + } + + + public void addEmptyResults( final Collection<CandidateResult> stale ) { + this.toRemove.addAll( stale ); + } + + + public List<CandidateResult> getCandidates() { + return candidates; + } + + + public List<CandidateResult> getToRemove() { + return toRemove; + } + } + + + public static class SearchResults { + private final List<Entity> entities; + private final List<CandidateResult> toRemove; + + private String cursor; + + + public SearchResults( final int maxSize ) { + entities = new ArrayList<>( maxSize ); + this.toRemove = new ArrayList<>( maxSize ); + } + + + public void addEntity( final Entity entity ) { + this.entities.add( entity ); + } + + + public void addToRemove( final Collection<CandidateResult> stale ) { + this.toRemove.addAll( stale ); + } + + + public void addToRemove( final CandidateResult candidateResult ) { + this.toRemove.add( candidateResult ); + } + + + + + public void setCursor( final String cursor ) { + this.cursor = cursor; + } + } + + + /** + * An observable that will perform a search and continually emit results while they exist. + */ + public static class ElasticSearchObservable implements Observable.OnSubscribe<CandidateResults> { + + private final Func1<String, CandidateResults> fetchNextPage; + private final Func0<CandidateResults> fetchInitialResults; + + + public ElasticSearchObservable( final Func0<CandidateResults> fetchInitialResults, + final Func1<String, CandidateResults> fetchNextPage ) { + this.fetchInitialResults = fetchInitialResults; + this.fetchNextPage = fetchNextPage; + } + + + @Override + public void call( final Subscriber<? super CandidateResults> subscriber ) { + + subscriber.onStart(); + + try { + CandidateResults results = fetchInitialResults.call(); + + + //emit our next page + while ( true ) { + subscriber.onNext( results ); + + //if we have no cursor, we're done + if ( !results.hasCursor() ) { + break; + } + + + //we have a cursor, get our results to emit for the next page + results = fetchNextPage.call( results.getCursor() ); + } + + subscriber.onCompleted(); + } + catch ( Throwable t ) { + subscriber.onError( t ); + } + } + } + + + /** + * A message that contains the candidate to keep, and the candidate toRemove + */ + public static class CandidateGroup { + private final CandidateResult toKeep; + private final Collection<CandidateResult> toRemove; + + + public CandidateGroup( final CandidateResult toKeep, final Collection<CandidateResult> toRemove ) { + this.toKeep = toKeep; + this.toRemove = toRemove; + } + } + + + /** + * Compares 2 candidates by version. The max version is considered greater + */ + private static final class CandidateVersionComparator { + + public static int compare( final CandidateResult o1, final CandidateResult o2 ) { + return UUIDComparator.staticCompare( o1.getVersion(), o2.getVersion() ); + } + } + + /*********************** + * FROM HERE DOWN IS EXPERIMENTAL + ************************* + */ + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java deleted file mode 100644 index d6f4e93..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommands.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.io.read; - - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; -import org.apache.usergrid.persistence.index.query.CandidateResult; -import org.apache.usergrid.persistence.index.query.CandidateResults; -import org.apache.usergrid.persistence.index.query.Query; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; -import rx.functions.Action2; -import rx.functions.Func1; - - -public class EntityIndexCommands { - - -// /** -// * Perform a search of all the entities, and then return the observable of search results -// * @param index -// * @param edgeType -// * @param types -// * @param query -// * @return -// */ -// public static Func1<Id, SearchResults> searchEntities(final ApplicationEntityIndex index,final String edgeType, final SearchTypes types, final String query ){ -// -// return nodeId -> { -// -// } -// } - /** - * Construct an indexScope from the input id type - * @param type - * @return - */ - public static Func1<Id, IndexScope> createSearchScope(final String type){ - return id -> new IndexScopeImpl( id, type ); - } - /** - * Get our candidate results - * @param index - * @param types The types to return - * @param query - * @return - */ - public static Func1<IndexScope, CandidateResults> getCandidates(final ApplicationEntityIndex index, final SearchTypes types, final String query){ - return indexScope -> index.search( indexScope, types, Query.fromQLNullSafe( query ) ); - } - - - /** - * Flattens candidate results into a single stream of a result - * @return - */ - public static Func1<CandidateResults, Observable<CandidateResult>> flattenCandidates(){ - return (CandidateResults candidateResults) -> Observable.from( candidateResults ); - } - - - public static Action2<SearchResults, EntitySet> collectSet(){ - return (searchResults, entitySet) -> { - searchResults.addEntities( entitySet.entities ); - }; - } - - - - public static class SearchResults{ - private final List<Entity> entities; - private String cursor; - - - public SearchResults(final int maxSize) {entities = new ArrayList<>(maxSize);} - - public void addEntities(final Collection<Entity> entities){ - this.entities.addAll( entities ); - - } - public void setCursor(final String cursor){ - this.cursor = cursor; - } - } - - public static class EntitySet{ - private final List<Entity> entities; - - - public EntitySet( final List<Entity> entities ) {this.entities = entities;} - } - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java index a2cb754..8a0c014 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java @@ -29,20 +29,19 @@ import rx.internal.operators.OperatorScan; /** - * An operation for performing a collect until the predicate returns true + * An operation for performing a collect until the shortCircuitWhen returns true */ public class CollectUntil<T, R> implements Observable.Transformer<T, R> { final Func0<R> stateFactory; final Action2<R, ? super T> collector; - final Func1<R, Boolean> predicate; + final Func1<R, Boolean> shortCircuitWhen; - public CollectUntil( final Func0<R> stateFactory, final Action2<R, ? super T> collector, - final Func1<R, Boolean> predicate ) { + public CollectUntil( final Func1<R, Boolean> shortCircuitWhen, final Func0<R> stateFactory, final Action2<R, ? super T> collector) { this.stateFactory = stateFactory; this.collector = collector; - this.predicate = predicate; + this.shortCircuitWhen = shortCircuitWhen; } @@ -54,7 +53,7 @@ public class CollectUntil<T, R> implements Observable.Transformer<T, R> { }; - return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( predicate ); + return tObservable.lift( new OperatorScan<>( stateFactory, accumulator ) ).takeUntil( shortCircuitWhen ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/TempExample.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/TempExample.java b/stack/core/src/test/java/org/apache/usergrid/TempExample.java index 1db450d..ed58111 100644 --- a/stack/core/src/test/java/org/apache/usergrid/TempExample.java +++ b/stack/core/src/test/java/org/apache/usergrid/TempExample.java @@ -20,16 +20,37 @@ package org.apache.usergrid; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.Test; + +import org.apache.usergrid.corepersistence.rx.impl.CollectUntil; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.index.query.CandidateResult; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.model.entity.Id; +import com.fasterxml.uuid.UUIDComparator; + import rx.Observable; +import rx.Statement; +import rx.Subscriber; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.observables.GroupedObservable; import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.createSearchScope; import static org.apache.usergrid.corepersistence.io.read.EntityIndexCommands.getCandidates; import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.assertEquals; public class TempExample { @@ -37,38 +58,210 @@ public class TempExample { //set our root observable - public static void main(String[] args) { - final Id rootId = createId( "thing" ); +// +// final Id rootId = createId( "thing" ); +// +// final ApplicationEntityIndex index = null; +// +// +// final SearchTypes searchType = SearchTypes.fromTypes( "test" ); +// +// final String query = "select * "; +// +// final int selectSize = 100; +// +// final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ) +// .map( getCandidates( index, searchType, +// query ) ); //.compose(); +// +// +// final Observable<GroupedObservable<Id, CandidateResult>> next = +// observable.flatMap( candidates -> Observable.from( candidates ) ).groupBy( result -> result.getId() ); +// +// +// final Observable<ResultsCollector> results = next.compose( new CollectUntil<>( +// //stop when the collector has a full set +// collector -> collector.hasFullSet(), +// //create a new results collector, based on requested size +// () -> new ResultsCollector( selectSize ), +// //collect each candidate into our set of Candidate Results +// ( collector, group ) -> { +// +// final List<CandidateResult> list = group.toList().toBlocking().last(); +// +// collector.addCandidates( list ); +// } ) ); +// + + //now reduce these results again via a cassandra collector + + + + @Test + public void testLoops(){ + + final Observable<Integer> observable = Observable.create( new Observable.OnSubscribe<Integer>() { + + int value = 0; + + + @Override + public void call( final Subscriber<? super Integer> subscriber ) { + subscriber.onNext( ++value ); + } + } ); + + + + + final CounterCollector collector = new CounterCollector(); + + final Action2<CounterCollector, Integer> collectorFunction = (col, value) -> {col.set( value ); }; + + - final ApplicationEntityIndex index = null; + final Func0<Boolean> complete = () -> collector.isFull(); - final SearchTypes searchType = SearchTypes.fromTypes( "test" ); + final Observable<CounterCollector> collectorObservable = Statement.doWhile( observable, complete ).collect( () -> collector, collectorFunction ); - final String query = "select * "; - final Observable<CandidateResults> observable = Observable.just( rootId ).map( createSearchScope( "type" ) ).map(getCandidates(index, searchType, query)); + final int value = collectorObservable.toBlocking().last().lastValue; - observable.doOnNext( a -> System.out.println( a) ).toBlocking().last(); + assertEquals(100, value); + + + + + + } + + + private static class CounterCollector{ + private int lastValue; + + public void set(final int newValue){ + lastValue = newValue; + } + + public boolean isFull(){ + return lastValue >= 100; + } } - private static final class ResultsCollector{ + private static final class ResultsCollector { + + private Map<Id, IndexResult<CandidateResult>> toKeep = new HashMap<>(); + private Set<CandidateResult> toRemove = new HashSet<>(); + + private final int resultSetSize; + + + private int currentIndex; + + + private ResultsCollector( final int resultSetSize ) {this.resultSetSize = resultSetSize;} + /** * Add the candidates to our collection - * @param results */ - public void addCandidates(final CandidateResults results ){ + public void addCandidates( final List<CandidateResult> results ) { + + Collections.sort( results, CandidateVersionComparator::compare ); + + final CandidateResult maxCandidate = results.get( 0 ); + + //add everything we don't use to our toRemoveSet + for ( int i = 1; i < results.size(); i++ ) { + toRemove.add( results.get( i ) ); + } + + //we have this id already, remove it and pick the max + final Id maxId = maxCandidate.getId(); + + //see if it exists in our set to keep + final IndexResult<CandidateResult> existingCandidate = toKeep.get( maxId ); + + if ( existingCandidate != null ) { + + final CandidateResult existing = existingCandidate.value; + + //our new value is greater than our existing, replace it + if ( CandidateVersionComparator.compare( maxCandidate, existing ) > 0 ) { + //add it to the keep + toKeep.put( maxId, new IndexResult( currentIndex, maxCandidate ) ); - //TODO, collect the results, removing groups - Observable.from( results ).groupBy( candidate -> candidate.getId() ).collect( ) + //remove the old value + toRemove.add( existingCandidate.value ); + } + + //what's in the map is already the max, add our candidate to the cleanup list + else { + toRemove.add( maxCandidate ); + } + } + + //add it to our list of items to keep + else { + toKeep.put( maxId, new IndexResult( currentIndex, maxCandidate ) ); + } + + + //increment our index for the next invocation + currentIndex++; + } + + + /** + * We have a full set to evaluate for candidates + */ + public boolean hasFullSet() { + return resultSetSize >= toKeep.size(); } + + /** + * Get all the canidates we've collected + */ + public Collection<IndexResult<CandidateResult>> getCandidates() { + return toKeep.values(); + } + + public Collection<CandidateResult> getStaleCandidates(){ return toRemove;} } + + /** + * Compares 2 candidates by version. The max version is considered greater + */ + private static final class CandidateVersionComparator { + + public static int compare( final CandidateResult o1, final CandidateResult o2 ) { + return UUIDComparator.staticCompare( o1.getVersion(), o2.getVersion() ); + } + } + + + /** + * A data message with an index and a result. Can be used in aggregations + * @param <T> + */ + private static final class IndexResult<T>{ + private final int index; + + private final T value; + + + private IndexResult( final int index, final T value ) { + this.index = index; + this.value = value; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/61aa9794/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java index ce12429..b6ebcaa 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntilTest.java @@ -33,10 +33,10 @@ public class CollectUntilTest { public void testCollectUntil() { final CollectUntil<Integer, CountCollector> collectUntil = - new CollectUntil<>( + new CollectUntil<>( collector -> collector.isFull() , () -> new CountCollector(), - ( collector, value ) -> collector.mark(), - collector -> collector.isFull() ); + ( collector, value ) -> collector.mark() + ); final CountCollector collector = Observable.range( 0, 200 ).compose( collectUntil ).toBlocking().last();
