http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java index 7ffe957..2ec0082 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java @@ -20,9 +20,15 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.DataPipeline; -import org.apache.usergrid.persistence.Results; +import java.util.ArrayList; +import java.util.List; + +import org.apache.usergrid.corepersistence.pipeline.Pipeline; +import org.apache.usergrid.corepersistence.pipeline.PipelineResult; +import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateResultsEntityResultsCollector; +import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollector; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.ValidationUtils; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Optional; @@ -38,60 +44,69 @@ import rx.Observable; */ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { + private static final int DEFAULT_LIMIT = 10; + + private final FilterFactory filterFactory; + + private final CollectorState collectorState; - private final ReadFilterFactory readFilterFactory; + private final ApplicationScope applicationScope; - private final DataPipeline pipeline; /** * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are * rendered */ - private CollectorFilter<Results> collectorFilter; + private List<Filter> filters; + private Optional<String> cursor; - private Optional<Integer> limit; + private int limit; @Inject - public ReadPipelineBuilderImpl( final ReadFilterFactory readFilterFactory, + public ReadPipelineBuilderImpl( final FilterFactory filterFactory, final CollectorFactory collectorFactory, @Assisted final ApplicationScope applicationScope ) { - this.readFilterFactory = readFilterFactory; + this.filterFactory = filterFactory; - //set up our pipeline with our application scope - this.pipeline = new DataPipeline( applicationScope ); + this.applicationScope = applicationScope; //init our cursor to empty this.cursor = Optional.absent(); //set the default limit - this.limit = Optional.absent(); + this.limit = DEFAULT_LIMIT; + + + this.collectorState = new CollectorState( collectorFactory ); + + this.filters = new ArrayList<>(); } @Override - public ReadPipelineBuilder withCursor( final String cursor ) { - this.cursor = Optional.fromNullable( cursor ); - pipeline.setCursor( this.cursor ); + public ReadPipelineBuilder withCursor( final Optional<String> cursor ) { + Preconditions.checkNotNull( cursor, "cursor must not be null" ); + this.cursor = cursor; return this; } @Override - public ReadPipelineBuilder withLimit( final int limit ) { - Preconditions.checkArgument( limit > 0, "You must set the limit > 0" ); - this. limit = Optional.of( limit ); - //set the default value - pipeline.setLimit( this.limit.or( 10 ) ); + public ReadPipelineBuilder withLimit( final Optional<Integer> limit ) { + Preconditions.checkNotNull( limit, "limit must not be null" ); + this.limit = limit.or( DEFAULT_LIMIT ); return this; } @Override public ReadPipelineBuilder setStartId( final Id id ) { - pipeline.withTraverseCommand( readFilterFactory.getEntityIdFilter( id ) ); + ValidationUtils.verifyIdentity( id ); + + filters.add( filterFactory.getEntityIdFilter( id ) ); - this.collectorFilter = null; + this.collectorState.clear(); return this; @@ -100,10 +115,12 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId ) { + Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); + ValidationUtils.verifyIdentity( entityId ); - pipeline.withTraverseCommand( readFilterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); + filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); - setEntityLoaderFilter(); + this.collectorState.setEntityLoaderCollector(); return this; } @@ -111,11 +128,11 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getCollection( final String collectionName ) { + Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); + filters.add( filterFactory.readGraphCollectionFilter( collectionName ) ); - pipeline.withTraverseCommand( readFilterFactory.readGraphCollectionCommand( collectionName ) ); - - setEntityLoaderFilter(); + this.collectorState.setEntityLoaderCollector(); return this; } @@ -123,18 +140,26 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query ) { + Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); + Preconditions.checkNotNull( query, "query must not be null" ); //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = readFilterFactory.queryCollectionElasticSearchCollector( collectionName, query ); + + filters.add( filterFactory.collectionElasticSearchFilter( query, collectionName ) ); + + this.collectorState.setCandidateResultsEntityResultsCollector(); + return this; } @Override public ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId ) { + Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); + ValidationUtils.verifyIdentity( entityId ); - pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); - setEntityLoaderFilter(); + filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); + collectorState.setEntityLoaderCollector(); return this; } @@ -142,9 +167,9 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getConnection( final String connectionName ) { - - pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionCommand( connectionName ) ); - setEntityLoaderFilter(); + Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); + filters.add( filterFactory.readGraphConnectionFilter( connectionName ) ); + collectorState.setEntityLoaderCollector(); return this; } @@ -152,51 +177,98 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getConnection( final String connectionName, final String entityType ) { - pipeline.withTraverseCommand( readFilterFactory.readGraphConnectionCommand( connectionName, entityType ) ); - setEntityLoaderFilter(); + Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); + Preconditions.checkNotNull( connectionName, "entityType must not be null" ); + + filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) ); + collectorState.setEntityLoaderCollector(); return this; } - /** - * - * @param connectionName - * @param query - * @return - */ @Override - public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query ) { + public ReadPipelineBuilder connectionWithQuery( final String connectionName, final Optional<String> entityType, + final String query ) { - //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = readFilterFactory.queryConnectionElasticSearchCollector( connectionName, query ); + Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); + Preconditions.checkNotNull( connectionName, "entityType must not be null" ); + Preconditions.checkNotNull( query, "query must not be null" ); + filters.add( filterFactory.connectionElasticSearchFilter( query, connectionName, entityType ) ); + collectorState.setCandidateResultsEntityResultsCollector(); return this; } @Override - public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String entityType, - final String query ) { + public Observable<PipelineResult<ResultsPage>> execute() { - //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = - readFilterFactory.queryConnectionElasticSearchCollector( connectionName, entityType, query); - return this; - } + ValidationUtils.validateApplicationScope( applicationScope ); + final Collector<?, ResultsPage> collector = collectorState.getCollector(); - @Override - public Observable<Results> build() { - Preconditions.checkNotNull( collectorFilter, + Preconditions.checkNotNull( collector, "You have not specified an operation that creates a collection filter. This is required for loading " + "results" ); - return pipeline.build( collectorFilter ); + + Preconditions.checkNotNull( cursor, "A cursor should be initialized even if absent" ); + + Preconditions.checkArgument( limit > 0, "limit must be > than 0" ); + + + Pipeline pipeline = new Pipeline( applicationScope, filters, collector, cursor, limit ); + + + return pipeline.execute(); } - private void setEntityLoaderFilter() { - collectorFilter = readFilterFactory.entityLoadCollector(); + /** + * A mutable state for our collectors. Rather than create a new instance each time, we create a singleton + * collector + */ + private static final class CollectorState { + private final CollectorFactory collectorFactory; + + private EntityLoadCollector entityLoadCollector; + + private CandidateResultsEntityResultsCollector candidateResultsEntityResultsCollector; + + + private Collector<?, ResultsPage> collector = null; + + + private CollectorState( final CollectorFactory collectorFactory ) {this.collectorFactory = collectorFactory;} + + + public void setEntityLoaderCollector() { + if ( entityLoadCollector == null ) { + entityLoadCollector = collectorFactory.entityLoadCollector(); + } + + + collector = entityLoadCollector; + } + + + public void setCandidateResultsEntityResultsCollector() { + if ( candidateResultsEntityResultsCollector == null ) { + candidateResultsEntityResultsCollector = collectorFactory.candidateResultsEntityResultsCollector(); + } + + collector = candidateResultsEntityResultsCollector; + } + + + public void clear() { + collector = null; + } + + + public Collector<?, ResultsPage> getCollector() { + return collector; + } } }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java new file mode 100644 index 0000000..e428e7a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import java.util.List; + +import org.apache.usergrid.persistence.model.entity.Entity; + + +/** + * An encapsulation of entities as a group of responses. Ordered by the requesting filters. Each set should be considered a "page" of results. + */ +public class ResultsPage { + + private final List<Entity> entityList; + + + public ResultsPage( final List<Entity> entityList ) {this.entityList = entityList;} + + + public List<Entity> getEntityList() { + return entityList; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java deleted file mode 100644 index ba7c802..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/TraverseFilter.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.persistence.model.entity.Id; - - -/** - * Traverses edges in the graph. Either by query or graph traversal. Take an observable of ids, and emits - * an observable of ids - */ -public interface TraverseFilter extends Filter<Id> {} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java new file mode 100644 index 0000000..eac8a65 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; +import org.apache.usergrid.corepersistence.pipeline.read.CandidateResultsFilter; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.metrics.ObservableTimer; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * Command for reading graph edges + */ +public abstract class AbstractElasticSearchFilter extends AbstractSeekingFilter<Id, CandidateResults, Integer> + implements CandidateResultsFilter { + + private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class ); + + private final EntityIndexFactory entityIndexFactory; + private final String query; + private final Timer searchTimer; + + + /** + * Create a new instance of our command + */ + public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory, + final MetricsFactory metricsFactory, final String query ) { + this.entityIndexFactory = entityIndexFactory; + this.query = query; + this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" ); + } + + + @Override + public Observable<CandidateResults> call( final Observable<Id> observable ) { + + //get the graph manager + final ApplicationEntityIndex applicationEntityIndex = + entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() ); + + + final int limit = pipelineContext.getLimit(); + + + final SearchTypes searchTypes = getSearchTypes(); + + + //return all ids that are emitted from this edge + return observable.flatMap( id -> { + + final SearchEdge searchEdge = getSearchEdge( id ); + + + final Observable<CandidateResults> candidates = Observable.create( subscriber -> { + + //our offset to our start value. This will be set the first time we emit + //after we receive new ids, we want to reset this to 0 + //set our our constant state + final Optional<Integer> startFromCursor = getSeekValue(); + + final int startOffset = startFromCursor.or( 0 ); + + int currentOffSet = startOffset; + + subscriber.onStart(); + + //emit while we have values from ES + while ( true ) { + + + try { + final CandidateResults candidateResults = + applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet ); + + currentOffSet += candidateResults.size(); + + //set the cursor for the next value + setCursor( currentOffSet ); + + /** + * No candidates, we're done + */ + if ( candidateResults.size() == 0 ) { + subscriber.onCompleted(); + return; + } + + subscriber.onNext( candidateResults ); + } + catch ( Throwable t ) { + + log.error( "Unable to search candidates", t ); + subscriber.onError( t ); + } + } + } ); + + + //add a timer around our observable + ObservableTimer.time( candidates, searchTimer ); + + return candidates; + } ); + } + + + @Override + protected CursorSerializer<Integer> getCursorSerializer() { + return ElasticsearchCursorSerializer.INSTANCE; + } + + + /** + * Get the search edge from the id + */ + protected abstract SearchEdge getSearchEdge( final Id id ); + + /** + * Get the search types + */ + protected abstract SearchTypes getSearchTypes(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java deleted file mode 100644 index f46a78a..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractQueryElasticSearchCollectorFilter.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.SimpleEntityRef; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * A command that will query and load elasticsearch - * - * On future iteration, this needs to be split into 2 commands 1 that loads the candidate results and validates the - * versions then another that will search and load - * - * TODO, split this into 3 seperate observables - * - * 1) An observable that emits candidate results 2) An observable that validates versions by uuid (for Traverse - * commands) 3) An observbale that emits Results as a collector (for final commands) - */ -public abstract class AbstractQueryElasticSearchCollectorFilter extends AbstractFilter<Results, Integer> - implements CollectorFilter<Results> { - - - protected final EntityIndexFactory applicationEntityIndex; - protected final Query query; - private int limit; - - - @Inject - protected AbstractQueryElasticSearchCollectorFilter( final EntityIndexFactory applicationEntityIndex, final Query query ) { - this.applicationEntityIndex = applicationEntityIndex; - this.query = query; - } - - - @Override - public Observable<Results> call( final Observable<Id> idObservable ) { - - - final ApplicationEntityIndex - entityIndex = applicationEntityIndex.createApplicationEntityIndex( applicationScope ); - - return idObservable.flatMap( id -> { - - //TODO, refactor this logic to use Observables. make this a TraverseFilter and load entities with the entity loader collector - final ResultsLoaderFactory resultsLoaderFactory = getResultsLoaderFactory( id ); - final SearchEdge searchEdge = getSearchEdge( id ); - final SearchTypes searchTypes = getSearchTypes(); - - - - final Iterable<Results> executor = - new ElasticSearchQueryExecutor( resultsLoaderFactory, entityIndex, applicationScope, - searchEdge, searchTypes, query.withLimit( limit ) ); - - return Observable.from( executor ); - } ); - } - - - /** - * Get the search types - */ - protected abstract SearchTypes getSearchTypes(); - - /** - * Get the search edge - */ - protected abstract SearchEdge getSearchEdge(final Id incomingId); - - - /** - * Get the results loader factor - */ - protected abstract ResultsLoaderFactory getResultsLoaderFactory( final Id incomingId ); - - - @Override - protected CursorSerializer<Integer> getCursorSerializer() { - return ElasticsearchCursorSerializer.INSTANCE; - } - - - @Override - public void setLimit( final int limit ) { - this.limit = limit; - } - - - /** - * Get an entiity ref from the Id. TODO refactor this away - * @param id - * @return - */ - protected EntityRef getRef(final Id id){ - return new SimpleEntityRef( id.getType(), id.getUuid() ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java new file mode 100644 index 0000000..83a4b8c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from an incoming CandidateResults object and return them as results + */ +public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage> + implements Collector<CandidateResults, ResultsPage> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; + + + @Inject + public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; + } + + + @Override + public Observable<ResultsPage> call( final Observable<CandidateResults> observable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + final ApplicationEntityIndex applicationIndex = + entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + final Observable<ResultsPage> searchIdSetObservable = observable.flatMap( candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = + Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList(); + + //load the ids + final Observable<EntitySet> entitySetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) ); + + //now we have a collection, validate our canidate set is correct. + + return entitySetObservable + .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) + .doOnNext( entityCollector -> entityCollector.merge() ) + .map( entityCollector -> entityCollector.getResults() ); + } ); + + return searchIdSetObservable; + } + + + /** + * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally + */ + private static final class EntityCollector { + + private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); + private List<Entity> results = new ArrayList<>(); + + private final EntityIndexBatch batch; + private final CandidateResults candidateResults; + private final EntitySet entitySet; + + + public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet, + final CandidateResults candidateResults ) { + this.batch = batch; + this.entitySet = entitySet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( entitySet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final CandidateResult candidateResult : candidateResults ) { + validate( candidateResult ); + } + + batch.execute(); + } + + + public ResultsPage getResults() { + return new ResultsPage( results ); + } + + + public EntityIndexBatch getBatch() { + return batch; + } + + + private void validate( final CandidateResult candidateResult ) { + + final Id candidateId = candidateResult.getId(); + final UUID candidateVersion = candidateResult.getVersion(); + + + final MvccEntity entity = entitySet.getEntity( candidateId ); + + + //doesn't exist warn and drop + if ( entity == null ) { + logger.warn( + "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra." + + " Ignoring since this could be a region sync issue", + candidateId, candidateVersion ); + + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + + } + + + final UUID entityVersion = entity.getVersion(); + + + //entity is newer than ES version, could be an update or the entity is marked as deleted + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) { + + final Id entityId = entity.getId(); + final SearchEdge searchEdge = candidateResults.getSearchEdge(); + + logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + new Object[] { searchEdge, entityId, entityVersion } ); + batch.deindex( searchEdge, entityId, entityVersion ); + return; + } + + //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to + //remove the ES record, since the read in cass should cause a read repair, just ignore + if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { + + final Id entityId = entity.getId(); + final SearchEdge searchEdge = candidateResults.getSearchEdge(); + + logger.warn( + "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " + + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); + + //TODO trigger an audit after a fail count where we explicitly try to repair from other regions + + return; + } + + //they're the same add it + + + results.add( entity.getEntity().get() ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java new file mode 100644 index 0000000..bb9ab76 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.MvccLogEntry; +import org.apache.usergrid.persistence.collection.VersionSet; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.CandidateResult; +import org.apache.usergrid.persistence.index.CandidateResults; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.fasterxml.uuid.UUIDComparator; +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Responsible for verifying candidate result versions, then emitting the Ids of these versions + * Input is a batch of candidate results, output is a stream of validated Ids + */ +public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id> + implements PipelineOperation<CandidateResults, Id> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final EntityIndexFactory entityIndexFactory; + + + @Inject + public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final EntityIndexFactory entityIndexFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.entityIndexFactory = entityIndexFactory; + } + + + + @Override + public Observable<Id> call( final Observable<CandidateResults> observable ) { + + + /** + * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results + * objects + */ + + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( applicationScope ); + + + final ApplicationEntityIndex applicationIndex = + entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = + Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList(); + + //load the ids + final Observable<VersionSet> versionSetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); + + //now we have a collection, validate our canidate set is correct. + + return versionSetObservable + .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) + .doOnNext( entityCollector -> entityCollector.merge() ) + .flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) ); + } ); + + return searchIdSetObservable; + } + + + /** + * Map a new cp entity to an old entity. May be null if not present + */ + private static final class EntityCollector { + + private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class ); + private List<Id> results = new ArrayList<>(); + + private final EntityIndexBatch batch; + private final CandidateResults candidateResults; + private final VersionSet versionSet; + + + public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet, + final CandidateResults candidateResults ) { + this.batch = batch; + this.versionSet = versionSet; + this.candidateResults = candidateResults; + this.results = new ArrayList<>( versionSet.size() ); + } + + + /** + * Merge our candidates and our entity set into results + */ + public void merge() { + + for ( final CandidateResult candidateResult : candidateResults ) { + validate( candidateResult ); + } + + batch.execute(); + } + + + public List<Id> collectResults() { + return results; + } + + + /** + * Validate each candidate results vs the data loaded from cass + * @param candidateResult + */ + private void validate( final CandidateResult candidateResult ) { + + final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() ); + + final UUID candidateVersion = candidateResult.getVersion(); + + final UUID entityVersion = logEntry.getVersion(); + + final Id entityId = logEntry.getEntityId(); + + //entity is newer than ES version + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) { + + final SearchEdge searchEdge = candidateResults.getSearchEdge(); + + logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + new Object[] { searchEdge, entityId, entityVersion } ); + batch.deindex( searchEdge, entityId, entityVersion ); + return; + } + + //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to + //remove the ES record, since the read in cass should cause a read repair, just ignore + if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) { + + final SearchEdge searchEdge = candidateResults.getSearchEdge(); + + logger.warn( + "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair " + + "should be run", new Object[] { searchEdge, entityId, entityVersion } ); + } + + //they're the same add it + + results.add( entityId ); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java new file mode 100644 index 0000000..4280f6a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CollectionElasticSearchFilter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge; + + +public class CollectionElasticSearchFilter extends AbstractElasticSearchFilter { + + private final String collectionName; + + /** + * Create a new instance of our command + */ + @Inject + public CollectionElasticSearchFilter( final EntityIndexFactory entityIndexFactory, + final MetricsFactory metricsFactory, + @Assisted("query") + final String query, + @Assisted("collectionName") final String collectionName ) { + super( entityIndexFactory, metricsFactory, query ); + this.collectionName = collectionName; + } + + + + @Override + protected SearchTypes getSearchTypes() { + final SearchTypes types = SearchTypes.fromTypes( collectionName ); + + return types; + } + + + @Override + protected SearchEdge getSearchEdge( final Id incomingId ) { + final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName ); + + return searchEdge; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java new file mode 100644 index 0000000..ab5d233 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ConnectionElasticSearchFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; + + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.SearchEdge; +import org.apache.usergrid.persistence.index.SearchTypes; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge; + + +public class ConnectionElasticSearchFilter extends AbstractElasticSearchFilter { + + + private final String connectionName; + private final Optional<String> connectedEntityType; + + + /** + * Create a new instance of our command + */ + @Inject + public ConnectionElasticSearchFilter( final EntityIndexFactory entityIndexFactory, + final MetricsFactory metricsFactory, @Assisted("query") final String query, + @Assisted("connectionName") final String connectionName, + @Assisted("connectedEntityType") final Optional<String> connectedEntityType ) { + super( entityIndexFactory, metricsFactory, query ); + + this.connectionName = connectionName; + this.connectedEntityType = connectedEntityType; + } + + + @Override + protected SearchTypes getSearchTypes() { + final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() ); + + return searchTypes; + } + + + @Override + protected SearchEdge getSearchEdge( final Id id ) { + final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName ); + + return searchEdge; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java deleted file mode 100644 index 4813978..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryCollectionElasticSearchCollectorFilter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge; - - -/** - * Command for querying collections - */ -public class QueryCollectionElasticSearchCollectorFilter extends AbstractQueryElasticSearchCollectorFilter { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - private final String collectionName; - - - @Inject - public QueryCollectionElasticSearchCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory, - @Assisted final String collectionName , @Assisted final Query query ) { - super( entityIndexFactory, query ); - this.entityIndexFactory = entityIndexFactory; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.collectionName = collectionName; - } - - - @Override - protected SearchTypes getSearchTypes() { - final SearchTypes types = SearchTypes.fromTypes( collectionName ); - - return types; - } - - - @Override - protected SearchEdge getSearchEdge( final Id incomingId ) { - final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName ); - - return searchEdge; - } - - - @Override - protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) { - final EntityCollectionManager entityCollectionManager = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - final EntityRef entityRef = getRef( id ); - return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, entityIndex, entityRef, - collectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java deleted file mode 100644 index 2f7a6b3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/QueryConnectionElasticSearchCollectorFilter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch; - - -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ResultsLoaderFactory; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge; - - -/** - * Command for querying connections - */ -public class QueryConnectionElasticSearchCollectorFilter extends AbstractQueryElasticSearchCollectorFilter { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final EntityIndexFactory entityIndexFactory; - private final String connectionName; - - - @Inject - public QueryConnectionElasticSearchCollectorFilter( - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final EntityIndexFactory entityIndexFactory, @Assisted final String connectionName, - @Assisted final Query query ) { - super( entityIndexFactory, query ); - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.entityIndexFactory = entityIndexFactory; - this.connectionName = connectionName; - } - - - @Override - protected SearchTypes getSearchTypes() { - - final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() ); - - return searchTypes; - } - - - @Override - protected SearchEdge getSearchEdge( final Id id ) { - final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName ); - - return searchEdge; - } - - - @Override - protected ResultsLoaderFactory getResultsLoaderFactory( final Id id ) { - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - final ApplicationEntityIndex entityIndex = entityIndexFactory.createApplicationEntityIndex( applicationScope ); - - final EntityRef entityRef = getRef( id ); - return new ConnectionResultsLoaderFactoryImpl( entityCollectionManager, entityIndex, entityRef, - connectionName ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java index ccc5198..6e170f8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java @@ -48,7 +48,13 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R private final SearchTypes types; - private final Query query; + private final String query; + + private final Optional<Integer> setOffsetFromCursor; + + private final int limit; + + private int offset; private Results currentResults; @@ -56,18 +62,22 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R private boolean moreToLoad = true; + + public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex, final ApplicationScope applicationScope, final SearchEdge indexScope, - final SearchTypes types, final Query query ) { + final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) { this.resultsLoaderFactory = resultsLoaderFactory; this.applicationScope = applicationScope; this.entityIndex = entityIndex; this.indexScope = indexScope; this.types = types; + this.setOffsetFromCursor = setOffsetFromCursor; //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant //once we start subscribing to streams. - this.query = new Query(query); + this.query = query; + this.limit = limit; } @@ -83,7 +93,6 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R final int maxQueries = 10; // max re-queries to satisfy query limit - final int originalLimit = query.getLimit(); Results results = null; int queryCount = 0; @@ -91,13 +100,15 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R CandidateResults crs = null; + int newLimit = limit; + while ( queryCount++ < maxQueries ) { - crs = getCandidateResults( query ); + crs = entityIndex.search( indexScope, types, query, newLimit , offset); logger.debug( "Calling build results with crs {}", crs ); - results = buildResults( indexScope, query, crs ); + results = buildResults( indexScope, crs ); /** * In an edge case where we delete stale entities, we could potentially get less results than expected. @@ -115,17 +126,15 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R // need to query for more // ask for just what we need to satisfy, don't want to exceed limit - query.setOffsetFromCursor(results.getCursor()); - query.setLimit( originalLimit - results.size() ); + newLimit = newLimit - results.size(); logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] { - originalLimit, query.getLimit(), queryCount + limit, newLimit, queryCount } ); } //now set our cursor if we have one for the next iteration if ( results.hasCursor() ) { - query.setOffsetFromCursor(results.getCursor()); moreToLoad = true; } @@ -133,48 +142,32 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R moreToLoad = false; } - - //set our select subjects into our query if provided - if(crs != null){ - query.setSelectSubjects( crs.getGetFieldMappings() ); - } - +// +// //set our select subjects into our query if provided +// if(crs != null){ +// query.setSelectSubjects( crs.getGetFieldMappings() ); +// } +// //set our current results and the flag this.currentResults = results; } - /** - * Get the candidates or load the cursor, whichever we require - * @param query - * @return - */ - private CandidateResults getCandidateResults(final Query query){ - final Optional<Integer> cursor = query.getOffset(); - final String queryToExecute = query.getQl().or("select *"); - - CandidateResults results = cursor.isPresent() - ? entityIndex.search( indexScope, types, queryToExecute, query.getLimit() , cursor.get()) - : entityIndex.search( indexScope, types, queryToExecute, query.getLimit()); - - return results; - } - /** * Build results from a set of candidates, and discard those that represent stale indexes. * - * @param query Query that was executed + * @param indexScope The index scope to execute the search on * @param crs Candidates to be considered for results */ - private Results buildResults( final SearchEdge indexScope, final Query query, final CandidateResults crs ) { + private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) { logger.debug( "buildResults() from {} candidates", crs.size() ); //get an instance of our results loader final ResultsLoader resultsLoader = - this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() ); + this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES ); //load the results final Results results = resultsLoader.loadResults(crs); @@ -183,12 +176,6 @@ public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<R resultsLoader.postProcess(); //set offset into query - if(crs.getOffset().isPresent()) { - query.setOffset(crs.getOffset().get()); - }else{ - query.clearOffset(); - } - results.setCursorFromOffset( query.getOffset() ); logger.debug( "Returning results size {}", results.size() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java index 8ba5238..6230147 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java @@ -20,12 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.entity; -import java.io.Serializable; - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; @@ -38,7 +34,7 @@ import rx.Observable; * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should * be removed */ -public class EntityIdFilter extends AbstractFilter<Id, Serializable> implements TraverseFilter { +public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> { private final Id entityId; @@ -54,9 +50,4 @@ public class EntityIdFilter extends AbstractFilter<Id, Serializable> implements return Observable.just( entityId ); } - - @Override - protected CursorSerializer<Serializable> getCursorSerializer() { - return NoCursorSerializer.create(); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java new file mode 100644 index 0000000..dd6b9b8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.entity; + + +import java.util.List; + +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.Collector; +import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; +import org.apache.usergrid.persistence.collection.EntityCollectionManager; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.EntitySet; +import org.apache.usergrid.persistence.collection.MvccEntity; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; + +import rx.Observable; + + +/** + * Loads entities from a set of Ids. + * + * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification + */ +public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage> + implements Collector<Id, ResultsPage> { + + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public Observable<ResultsPage> call( final Observable<Id> observable ) { + + + final EntityCollectionManager entityCollectionManager = + entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); + + final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap( + bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); + + + final Observable<ResultsPage> resultsObservable = entitySetObservable + + .flatMap( entitySet -> { + + //get our entites and filter missing ones, then collect them into a results object + final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); + + + //convert them to our old entity model, then filter abscent, meaning they weren't found + final Observable<List<Entity>> entitiesPageObservable = + mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() ) + .map( mvccEntity -> mvccEntity.getEntity().get() ).toList(); + + //convert them to a list, then map them into results + return entitiesPageObservable.map( entities -> new ResultsPage( entities ) ); + } ); + + + return resultsObservable; + } + + /** + * Map a new cp entity to an old entity. May be null if not present + */ + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java deleted file mode 100644 index 78a9835..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollectorFilter.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.entity; - - -import java.io.Serializable; -import java.util.Map; - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.CollectorFilter; -import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; -import org.apache.usergrid.persistence.EntityFactory; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.EntitySet; -import org.apache.usergrid.persistence.collection.MvccEntity; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.inject.Inject; - -import rx.Observable; - - -/** - * Loads entities from a set of Ids. - * - * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification - */ -public class EntityLoadCollectorFilter extends AbstractFilter<Results, Serializable> - implements CollectorFilter<Results> { - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private int resultSize; - - - @Inject - public EntityLoadCollectorFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - protected CursorSerializer<Serializable> getCursorSerializer() { - return NoCursorSerializer.create(); - } - - - @Override - public Observable<Results> call( final Observable<Id> observable ) { - - - /** - * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results - * objects - */ - - final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( applicationScope ); - - final Observable<EntitySet> entitySetObservable = observable.buffer( resultSize ).flatMap( - bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) ); - - - final Observable<Results> resultsObservable = entitySetObservable - - .flatMap( entitySet -> { - - //get our entites and filter missing ones, then collect them into a results object - final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() ); - - //convert them to our old entity model, then filter nulls, meaning they weren't found - return mvccEntityObservable.map( mvccEntity -> mapEntity( mvccEntity ) ).filter( entity -> entity != null ) - - //convert them to a list, then map them into results - .toList().map( entities -> { - final Results results = Results.fromEntities( entities ); - results.setCursor( generateCursor() ); - - return results; - } ); - } ); - - - return resultsObservable; - } - - /** - * Map a new cp entity to an old entity. May be null if not present - */ - - - private org.apache.usergrid.persistence.Entity mapEntity( final MvccEntity mvccEntity ) { - if ( !mvccEntity.getEntity().isPresent() ) { - return null; - } - - - final Entity cpEntity = mvccEntity.getEntity().get(); - final Id entityId = cpEntity.getId(); - - org.apache.usergrid.persistence.Entity entity = - EntityFactory.newEntity( entityId.getUuid(), entityId.getType() ); - - Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity ); - entity.addProperties( entityMap ); - - return entity; - } - - - @Override - public void setLimit( final int limit ) { - this.resultSize = limit; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java index 9f63bd8..e0f69cf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java @@ -20,10 +20,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.cursor.NoCursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdge; @@ -41,7 +39,8 @@ import rx.Observable; /** * Filter should take and Id and a graph edge, and ensure the connection between the two exists */ -public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements TraverseFilter { +public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements + Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; private final Id targetId; @@ -56,15 +55,9 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, @Override - protected CursorSerializer<Id> getCursorSerializer() { - return NoCursorSerializer.create(); - } - - - @Override public Observable<Id> call( final Observable<Id> idObservable ) { - final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); return idObservable.flatMap( id -> { final String edgeTypeName = getEdgeName(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java index e6da9c2..4021952 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java @@ -21,8 +21,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -38,7 +38,7 @@ import rx.Observable; /** * Command for reading graph edges */ -public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> implements TraverseFilter { +public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; @@ -55,10 +55,8 @@ public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> i public Observable<Id> call( final Observable<Id> observable ) { //get the graph manager - final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); + final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); - //set our our constant state - final Optional<Edge> startFromCursor = getCursor(); final String edgeName = getEdgeTypeName(); @@ -66,6 +64,10 @@ public abstract class AbstractReadGraphFilter extends AbstractFilter<Id, Edge> i //return all ids that are emitted from this edge return observable.flatMap( id -> { + //set our our constant state + final Optional<Edge> startFromCursor = getSeekValue(); + + final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, startFromCursor ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b5e60e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java index 65b02b6..12306fd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java @@ -21,8 +21,8 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.read.TraverseFilter; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; @@ -42,7 +42,7 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeType /** * Command for reading graph edges on a connection */ -public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> implements TraverseFilter { +public class ReadGraphConnectionByTypeFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> { private final GraphManagerFactory graphManagerFactory; private final String connectionName; @@ -54,7 +54,7 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im */ @Inject public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory, - @Assisted final String connectionName, @Assisted final String entityType ) { + @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) { this.graphManagerFactory = graphManagerFactory; this.connectionName = connectionName; this.entityType = entityType; @@ -65,10 +65,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im public Observable<Id> call( final Observable<Id> observable ) { //get the graph manager - final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope ); + final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + - //set our our constant state - final Optional<Edge> startFromCursor = getCursor(); final String edgeName = getEdgeTypeFromConnectionType( connectionName ); @@ -76,6 +75,9 @@ public class ReadGraphConnectionByTypeFilter extends AbstractFilter<Id, Edge> im //return all ids that are emitted from this edge return observable.flatMap( id -> { + //set our our constant state + final Optional<Edge> startFromCursor = getSeekValue(); + final SimpleSearchByIdType search = new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, entityType, startFromCursor );
