WIP, overwrite

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cee76da7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cee76da7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cee76da7

Branch: refs/heads/USERGRID-528
Commit: cee76da72c3343d562867988f76f264204d54b59
Parents: 61aa979
Author: Todd Nine <[email protected]>
Authored: Tue Mar 24 14:15:49 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Tue Mar 24 14:15:49 2015 -0600

----------------------------------------------------------------------
 .../io/read/EntityIndexCommand.java             | 404 ++++++++++++-------
 .../corepersistence/rx/impl/CollectUntil.java   |   2 +-
 2 files changed, 270 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
index a8fa221..464dd45 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/io/read/EntityIndexCommand.java
@@ -22,8 +22,9 @@ package org.apache.usergrid.corepersistence.io.read;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.CollectionScope;
@@ -32,8 +33,8 @@ import 
org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -52,32 +53,34 @@ import rx.functions.Func0;
 import rx.functions.Func1;
 
 
+/**
+ * Performs a search of the given type along the specified edgeName.  It then 
loads and validates results,
+ * and returns an Observable of SearchResults
+ */
 @Singleton
 public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.SearchResults> {
 
     private final Id applicationId;
-    private final ApplicationScope applicationScope;
     private final ApplicationEntityIndex index;
     private final SearchTypes types;
     private final String query;
     private final int resultSetSize;
-    private final String scopeType;
+    private final String edgeName;
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
 
 
     @Inject
-    public EntityIndexCommand( final Id applicationId, final ApplicationScope 
applicationScope,
-                               final ApplicationEntityIndex index, final 
SearchTypes types, final String query,
-                               final int resultSetSize, final String scopeType,
+    public EntityIndexCommand( final Id applicationId, final 
ApplicationEntityIndex index, final SearchTypes types,
+                               final String query, final int resultSetSize, 
final String edgeName,
                                final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
         this.applicationId = applicationId;
-        this.applicationScope = applicationScope;
+
 
         this.index = index;
         this.types = types;
         this.query = query;
         this.resultSetSize = resultSetSize;
-        this.scopeType = scopeType;
+        this.edgeName = edgeName;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
     }
 
@@ -85,150 +88,216 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
     @Override
     public Observable<EntityIndexCommand.SearchResults> call( final 
Observable<Id> idObservable ) {
 
-        //create our observable of candidate search results
-        final Observable<CandidateResults> candidateResults = idObservable
-            .flatMap( id -> Observable.create( new ElasticSearchObservable( 
initialSearch( id ), nextPage( id ) ) ) );
-
-        final Observable<CandidateResult> candidateObservable =
-            candidateResults.flatMap( candidates -> Observable.from( 
candidates ) );
-
-        //since we'll only have at most 100 results in memory at a time, we 
roll up our groups and emit them on to
-        // the collector
-        final Observable<CandidateGroup> candidateGroup =
-            candidateObservable.groupBy( candidate -> candidate.getId() ).map( 
observableGroup -> {
-
-
-                //for each group, create a list, then sort by highest version 
first
-                final List<CandidateResult> groupList = 
observableGroup.toList().toBlocking().last();
-
-                Collections.sort( groupList, 
CandidateVersionComparator::compare );
-
-                //create our candidate group and emit it
-                final CandidateGroup group =
-                    new CandidateGroup( groupList.get( 0 ), groupList.subList( 
1, groupList.size() ) );
-
-                return group;
-            } );
-
+        //load all the potential candidates
+        final Observable<CandidateCollector> collectedCandidates = 
idObservable.compose( createCandidates() );
 
-        //buffer our candidate group up to our resultset size.
-        final Observable<CandidateCollector> collectedCandidates =
-            candidateGroup.buffer( resultSetSize ).flatMap( candidates -> {
+        //now we have our collected candidates, load them from ES
+        final Observable<SearchResults> loadedEntities = 
collectedCandidates.map( loadEntities( resultSetSize ) );
 
-                final Observable<CandidateCollector> collector = 
Observable.from( candidates ).collect(
-                    () -> new CandidateCollector( resultSetSize ), ( 
candidateCollector, candidate ) -> {
-                        //add our candidates to our collector
-                        candidateCollector.addCandidate( candidate.toKeep );
-                        //add our empty results
-                        candidateCollector.addEmptyResults( candidate.toRemove 
);
-                    } );
-
-                return collector;
-            } );
 
-        //now we have our collected candidates, load them
+        //after we've loaded, remove everything that's left
+        loadedEntities.doOnNext( results ->{
 
+            //run the deindex
+            Observable.from( results.toRemove ).collect( () -> 
index.createBatch(), (batch, toRemove) -> {
+              batch.deindex( toRemove.indexScope, toRemove.result );
+            }).doOnNext( batch -> batch.execute() 
).toBlocking().lastOrDefault( null );
 
-        final Observable<SearchResults> loadedEntities = 
collectedCandidates.map( loadEntities(resultSetSize) );
+        });
 
 
         return loadedEntities;
     }
 
 
-    /**
-     * Perform the initial search with the sourceId
-     */
-    private Func0<CandidateResults> initialSearch( final Id sourceId ) {
-        return () -> index.search( new IndexScopeImpl( sourceId, scopeType ), 
types, Query.fromQL( query ) );
-    }
 
 
     /**
-     * Search the next page for the specified source
+     * Using an input of observable ids, transform and collect them into a set 
of CandidateCollector to be loaded, or
+     * verified by the EM
      */
-    private Func1<String, CandidateResults> nextPage( final Id sourceId ) {
-        return cursor -> index
-            .search( new IndexScopeImpl( sourceId, scopeType ), types, 
Query.fromQL( query ).withCursor( cursor ) );
-    }
+    private Observable.Transformer<Id, CandidateCollector> createCandidates() {
+        return idObservable -> {
 
+            //create our observable of candidate search results
+            final Observable<ScopedCandidateResults> candidateResults = 
idObservable.flatMap(
 
-    /**
-     * Function that will load our entities from our candidates, filter stale 
or missing candidates and return results
-     * @param expectedSize
-     * @return
-     */
-    private Func1<CandidateCollector, SearchResults> loadEntities(final int 
expectedSize) {
-        return candidateCollector -> {
 
-            //from our candidates, group them by id type so we can create 
scopes
-            Observable.from( candidateCollector.getCandidates() ).groupBy( 
candidate -> candidate.getId().getType() )
-                      .flatMap( groups -> {
+                id -> {
 
+                    //create the index scope from the id
+                    final IndexScope scope = createScope( id );
 
-                          final List<CandidateResult> candidates = 
groups.toList().toBlocking().last();
+                    //perform the ES search of candidate results
+                    final Observable<ScopedCandidateResults> scoped = 
Observable.create(
+                        new ElasticSearchObservable( initialSearch( scope ), 
nextPage( scope ) ) );
 
-                          //we can get no results, so quit aggregating if 
there are none
-                          if ( candidates.size() == 0 ) {
-                              return Observable.just( new SearchResults( 0 ) );
-                          }
 
+                    final Observable<CandidateCollector> collectedResults =  
scoped.flatMap( scopedCandidateResults -> {
 
-                          final String typeName = candidates.get( 0 
).getId().getType();
+                        //here b/c the compiler cannot infer type directly
+                        final Observable<CandidateResult> candidates =
+                            Observable.from( 
scopedCandidateResults.candidateResults );
 
-                          final String collectionType = 
CpNamingUtils.getCollectionScopeNameFromEntityType( typeName );
+                        return candidates.collect( () -> new 
CandidateCollector( resultSetSize,
+                            
scopedCandidateResults.candidateResults.getCursor() ), ( collector, candidate ) 
-> {
+                            collector.insert( candidate );
+                        } );
+                    } );
 
 
-                          //create our scope to get our entity manager
 
-                          final CollectionScope scope =
-                              new CollectionScopeImpl( applicationId, 
applicationId, collectionType );
+                    //group our candidates by type
+//                   collectedResults.flatMap( collected -> Observable.from( 
collected.candidates ) ).groupBy( candidate -> candidate.getId().getType() ).co
 
-                          final EntityCollectionManager ecm =
-                              
entityCollectionManagerFactory.createCollectionManager( scope );
 
+                    final Observable<SearchResults> resultsObservable = 
collectedResults.map( collectedCandidates -> {
 
-                          //get our entity ids
-                          final List<Id> entityIds =
-                              Observable.from( candidates ).map( c -> 
c.getId() ).toList().toBlocking().last();
 
-                          //TODO, change this out
+                        //we can get no results, so quit aggregating if there 
are none
+                        if ( collectedCandidates.candidates.size() == 0 ) {
+                            return new SearchResults( 0, 
collectedCandidates.cursor );
+                        }
 
-                          //an observable of our entity set
 
+                        final String collectionType =
+                            
CpNamingUtils.getCollectionScopeNameFromEntityType( id.getType() );
 
 
-                          //now go through all our candidates and verify
+                        //create our scope to get our entity manager
 
-                          return Observable.from( candidates ).collect(  () -> 
new SearchResults( expectedSize ), (searchResults, candidate) ->{
+                        final CollectionScope collectionScope =
+                            new CollectionScopeImpl( applicationId, 
applicationId, collectionType );
 
-                              final EntitySet entitySet = ecm.load( entityIds 
).toBlocking().last();
+                        final EntityCollectionManager ecm =
+                            
entityCollectionManagerFactory.createCollectionManager( collectionScope );
 
-                              final MvccEntity entity = entitySet.getEntity( 
candidate.getId() );
 
+                        //get our entity ids
+                        final List<Id> entityIds =
+                            Observable.from( collectedCandidates.candidates 
).map( c -> c.getId() ).toList()
+                                      .toBlocking().last();
 
-                              //our candidate it stale, or target entity was 
deleted add it to the remove of our collector
-                              if(UUIDComparator.staticCompare( 
entity.getVersion(), candidate.getVersion()) > 0 || 
!entity.getEntity().isPresent()){
-                                  searchResults.addToRemove( candidate );
-                                  return;
-                              }
+                        //TODO, change this out
 
+                        //an observable of our entity set
 
-                              searchResults.addEntity( 
entity.getEntity().get() );
+                        return ecm.load( entityIds ).map( results -> {
 
+                            Observable.from( entityIds ).collect( () -> new 
SearchResults( resultSetSize ),
+                                ( searchResults, entityIds ) -> {
+                                } );
+                        } );
+                    } );
 
-                          } )
-                              //add the existing set to remove to this set
-                              .doOnNext( results -> results.addToRemove( 
candidateCollector.getToRemove() ) );
+                    return resultsObservable;
+                } );
 
-                      } );
+            return candidateResults;
+        };
+    }
 
 
-            return null;
-        };
+    /**
+     * Perform the initial search with the sourceId
+     */
+    private Func0<ScopedCandidateResults> initialSearch( final IndexScope 
scope ) {
+        return () -> new ScopedCandidateResults(scope,  index.search(  scope, 
types, Query.fromQL( query ) ) );
+    }
+
+
+    /**
+     * Search the next page for the specified source
+     */
+    private Func1<String, ScopedCandidateResults> nextPage( final IndexScope 
scope  ) {
+        return cursor -> new ScopedCandidateResults(scope,   index.search( 
scope, types, Query.fromQL( query ).withCursor(
+            cursor ) ) );
     }
 
 
+    /**
+     * Create the scope and return it
+     * @param sourceId
+     * @return
+     */
+    private IndexScope createScope(final Id sourceId){
+        return new IndexScopeImpl( sourceId, edgeName );
+    }
+
+//
+//    /**
+//     * Function that will load our entities from our candidates, filter 
stale or missing candidates and return results
+//     */
+//    private Func1<CandidateCollector, SearchResults> loadEntities( final int 
expectedSize ) {
+//        return candidateCollector -> {
+//
+//            //from our candidates, group them by id type so we can create 
scopes
+//            Observable.from( candidateCollector.getCandidates() ).groupBy( 
candidate -> candidate.result.getId().getType() )
+//                      .flatMap( groups -> {
+//
+//
+//                          final List<ScopedCandidateResult> candidates = 
groups.toList().toBlocking().last();
+//
+//                          //we can get no results, so quit aggregating if 
there are none
+//                          if ( candidates.size() == 0 ) {
+//                              return Observable.just( new SearchResults(  0, 
null ) );
+//                          }
+//
+//
+//                          final String typeName = candidates.get( 0 
).result.getId().getType();
+//
+//                          final String collectionType = 
CpNamingUtils.getCollectionScopeNameFromEntityType( typeName );
+//
+//
+//                          //create our scope to get our entity manager
+//
+//                          final CollectionScope scope =
+//                              new CollectionScopeImpl( applicationId, 
applicationId, collectionType );
+//
+//                          final EntityCollectionManager ecm =
+//                              
entityCollectionManagerFactory.createCollectionManager( scope );
+//
+//
+//                          //get our entity ids
+//                          final List<Id> entityIds =
+//                              Observable.from( candidates ).map( c -> 
c.result.getId() ).toList().toBlocking().last();
+//
+//                          //TODO, change this out
+//
+//                          //an observable of our entity set
+//
+//
+//                          //now go through all our candidates and verify
+//
+//                          return Observable.from( candidates ).collect( () 
-> new SearchResults(
+//                                  expectedSize ),
+//                              ( searchResults, candidate ) -> {
+//
+//                                  final EntitySet entitySet = ecm.load( 
entityIds ).toBlocking().last();
+//
+//                                  final MvccEntity entity = 
entitySet.getEntity( candidate.result.getId() );
+//
+//
+//                                  //our candidate it stale, or target entity 
was deleted add it to the remove of our
+//                                  // collector
+//                                  if ( UUIDComparator.staticCompare( 
entity.getVersion(), candidate.result.getVersion() ) > 0
+//                                      || !entity.getEntity().isPresent() ) {
+//
+//                                      searchResults.addToRemove( candidate  
);
+//                                      return;
+//                                  }
+//
+//
+//                                  searchResults.addEntity( 
entity.getEntity().get() );
+//                              } )
+//                              //add the existing set to remove to this set
+//                              .doOnNext( results -> results.addToRemove( 
candidateCollector.getToRemove() ) );
+//                      } );
+//
+//
+//            return null;
+//        };
+//    }
 
 
     /**
@@ -238,20 +307,49 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
         private final List<CandidateResult> candidates;
         private final List<CandidateResult> toRemove;
 
+        private final Map<Id, Integer> indexMapping;
 
-        public CandidateCollector( final int maxSize ) {
+        private final String cursor;
+
+
+        public CandidateCollector( final int maxSize, final String cursor ) {
+            this.cursor = cursor;
             candidates = new ArrayList<>( maxSize );
             toRemove = new ArrayList<>( maxSize );
+            indexMapping = new HashMap<>(maxSize);
         }
 
 
-        public void addCandidate( final CandidateResult candidate ) {
-            this.candidates.add( candidate );
-        }
 
+        public void insert(final CandidateResult newValue){
+
+            final Id candidateId = newValue.getId();
+
+            final Integer index = indexMapping.get( candidateId  );
+
+            if(index == null){
+                candidates.add( newValue );
+                indexMapping.put( candidateId, candidates.size()-1);
+                return;
+            }
+
+            //present, perform a comparison
+
+            final CandidateResult existing = candidates.get( index );
+
+
+            //it's a greater version, add this to ignore
+            if(UUIDComparator.staticCompare( existing.getVersion(), 
newValue.getVersion() ) > 0){
+                toRemove.add( newValue );
+            }
+
+            //remove the stale version from the list and put it in deindex
+            else{
+                candidates.remove( index );
+                candidates.add( newValue );
+                toRemove.add( existing );
+            }
 
-        public void addEmptyResults( final Collection<CandidateResult> stale ) 
{
-            this.toRemove.addAll( stale );
         }
 
 
@@ -268,13 +366,16 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
 
     public static class SearchResults {
         private final List<Entity> entities;
-        private final List<CandidateResult> toRemove;
+        private final List<ScopedCandidateResult> toRemove;
+        private final int maxSize;
 
-        private String cursor;
+        private final String cursor;
 
 
-        public SearchResults( final int maxSize ) {
-            entities = new ArrayList<>( maxSize );
+        public SearchResults( final int maxSize, final String cursor ) {
+            this.maxSize = maxSize;
+            this.cursor = cursor;
+            this.entities = new ArrayList<>( maxSize );
             this.toRemove = new ArrayList<>( maxSize );
         }
 
@@ -284,20 +385,23 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
         }
 
 
-        public void addToRemove( final Collection<CandidateResult> stale ) {
+        public void addToRemove( final Collection<ScopedCandidateResult> stale 
) {
             this.toRemove.addAll( stale );
         }
 
 
-        public void addToRemove( final CandidateResult candidateResult ) {
-                   this.toRemove.add( candidateResult );
-               }
+        public void addToRemove( final ScopedCandidateResult candidateResult ) 
{
+            this.toRemove.add( candidateResult );
+        }
 
 
+        public String getCursor() {
+            return cursor;
+        }
 
 
-        public void setCursor( final String cursor ) {
-            this.cursor = cursor;
+        public boolean isFull(){
+            return entities.size() >= maxSize;
         }
     }
 
@@ -305,26 +409,26 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
     /**
      * An observable that will perform a search and continually emit results 
while they exist.
      */
-    public static class ElasticSearchObservable implements 
Observable.OnSubscribe<CandidateResults> {
+    public static class ElasticSearchObservable implements 
Observable.OnSubscribe<ScopedCandidateResults> {
 
-        private final Func1<String, CandidateResults> fetchNextPage;
-        private final Func0<CandidateResults> fetchInitialResults;
+        private final Func1<String, ScopedCandidateResults> fetchNextPage;
+        private final Func0<ScopedCandidateResults> fetchInitialResults;
 
 
-        public ElasticSearchObservable( final Func0<CandidateResults> 
fetchInitialResults,
-                                        final Func1<String, CandidateResults> 
fetchNextPage ) {
+        public ElasticSearchObservable( final Func0<ScopedCandidateResults> 
fetchInitialResults,
+                                        final Func1<String, 
ScopedCandidateResults> fetchNextPage ) {
             this.fetchInitialResults = fetchInitialResults;
             this.fetchNextPage = fetchNextPage;
         }
 
 
         @Override
-        public void call( final Subscriber<? super CandidateResults> 
subscriber ) {
+        public void call( final Subscriber<? super ScopedCandidateResults> 
subscriber ) {
 
             subscriber.onStart();
 
             try {
-                CandidateResults results = fetchInitialResults.call();
+                ScopedCandidateResults results = fetchInitialResults.call();
 
 
                 //emit our next page
@@ -332,13 +436,13 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
                     subscriber.onNext( results );
 
                     //if we have no cursor, we're done
-                    if ( !results.hasCursor() ) {
+                    if ( !results.candidateResults.hasCursor() ) {
                         break;
                     }
 
 
                     //we have a cursor, get our results to emit for the next 
page
-                    results = fetchNextPage.call( results.getCursor() );
+                    results = fetchNextPage.call( 
results.candidateResults.getCursor() );
                 }
 
                 subscriber.onCompleted();
@@ -351,16 +455,51 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
 
 
     /**
-     * A message that contains the candidate to keep, and the candidate 
toRemove
+     * A candidate result, aslong with the scope it was returned in
+     */
+    public static class ScopedCandidateResult{
+        private final IndexScope indexScope;
+        private final CandidateResult result;
+
+
+        public ScopedCandidateResult( final IndexScope indexScope, final 
CandidateResult result ) {
+            this.indexScope = indexScope;
+            this.result = result;
+        }
+    }
+
+
+    /**
+     * Object that represents our candidate results, along with the scope it 
was searched in
+     */
+    public static class ScopedCandidateResults {
+        private final IndexScope indexScope;
+        private CandidateResults candidateResults;
+
+
+        public ScopedCandidateResults( final IndexScope indexScope, final 
CandidateResults candidateResults ) {
+            this.indexScope = indexScope;
+            this.candidateResults = candidateResults;
+        }
+    }
+
+
+    /**
+     * A message that contains the candidate to keep, and the candidate 
toRemove and the scope they were searched in
      */
     public static class CandidateGroup {
+        private final IndexScope indexScope;
         private final CandidateResult toKeep;
         private final Collection<CandidateResult> toRemove;
+        private final String cursor;
 
 
-        public CandidateGroup( final CandidateResult toKeep, final 
Collection<CandidateResult> toRemove ) {
+        public CandidateGroup( final IndexScope indexScope, final 
CandidateResult toKeep,
+                               final Collection<CandidateResult> toRemove, 
final String cursor ) {
+            this.indexScope = indexScope;
             this.toKeep = toKeep;
             this.toRemove = toRemove;
+            this.cursor = cursor;
         }
     }
 
@@ -375,10 +514,5 @@ public class EntityIndexCommand implements Command<Id, 
EntityIndexCommand.Search
         }
     }
 
-    /***********************
-     * FROM HERE DOWN IS EXPERIMENTAL
-     *************************
-     */
-
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cee76da7/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
index 8a0c014..93de094 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/CollectUntil.java
@@ -38,7 +38,7 @@ public class CollectUntil<T, R> implements 
Observable.Transformer<T, R> {
     final Func1<R, Boolean> shortCircuitWhen;
 
 
-    public CollectUntil( final Func1<R, Boolean> shortCircuitWhen,  final 
Func0<R> stateFactory, final Action2<R, ? super T> collector) {
+    public CollectUntil(  final Func0<R> stateFactory,final Func1<R, Boolean> 
shortCircuitWhen,  final Action2<R, ? super T> collector) {
         this.stateFactory = stateFactory;
         this.collector = collector;
         this.shortCircuitWhen = shortCircuitWhen;

Reply via email to