Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-593 0523dc038 -> 9dc65dc8c
Refactor complete. Now need to test Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9dc65dc8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9dc65dc8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9dc65dc8 Branch: refs/heads/USERGRID-593 Commit: 9dc65dc8c3176d28d61119c0c434b47b7fc08254 Parents: 0523dc0 Author: Todd Nine <tn...@apigee.com> Authored: Fri Apr 24 18:32:36 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Fri Apr 24 18:32:36 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 163 +++++++++---------- .../pipeline/read/FilterFactory.java | 9 +- .../pipeline/read/ReadPipelineBuilder.java | 29 ++-- .../pipeline/read/ReadPipelineBuilderImpl.java | 69 +++++--- .../results/AbstractGraphQueryExecutor.java | 133 --------------- .../results/CollectionGraphQueryExecutor.java | 60 ------- .../results/ConnectionGraphQueryExecutor.java | 58 ------- 7 files changed, 139 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index d8f9446..af926bf 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -31,12 +31,9 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.index.AsyncIndexService; -import org.apache.usergrid.corepersistence.results.CollectionGraphQueryExecutor; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.CollectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.results.ConnectionGraphQueryExecutor; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ConnectionResultsLoaderFactoryImpl; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl.ElasticSearchQueryExecutor; -import org.apache.usergrid.corepersistence.results.QueryExecutor; +import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; +import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; +import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.ConnectedEntityRef; @@ -114,6 +111,7 @@ public class CpRelationManager implements RelationManager { private ManagerCache managerCache; private EntityCollectionManagerFactory entityCollectionManagerFactory; private GraphManagerFactory graphManagerFactory; + private PipelineBuilderFactory pipelineBuilderFactory; private EntityManager em; @@ -131,8 +129,10 @@ public class CpRelationManager implements RelationManager { private Timer updateCollectionTimer; - public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final - EntityCollectionManagerFactory entityCollectionManagerFactory, final GraphManagerFactory graphManagerFactory, final AsyncIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { + public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final GraphManagerFactory graphManagerFactory, final AsyncIndexService indexService, + final EntityManager em, final UUID applicationId, final EntityRef headEntity ) { Assert.notNull( em, "Entity manager cannot be null" ); @@ -158,8 +158,8 @@ public class CpRelationManager implements RelationManager { if ( logger.isDebugEnabled() ) { logger.debug( "Loading head entity {}:{} from app {}", new Object[] { - headEntity.getType(), headEntity.getUuid(), applicationScope - } ); + headEntity.getType(), headEntity.getUuid(), applicationScope + } ); } Id entityId = new SimpleId( headEntity.getUuid(), headEntity.getType() ); @@ -171,24 +171,22 @@ public class CpRelationManager implements RelationManager { .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) ); this.indexService = indexService; - - } @Override public Set<String> getCollectionIndexes( String collectionName ) throws Exception { - GraphManager gm = managerCache.getGraphManager( applicationScope ); + GraphManager gm = managerCache.getGraphManager( applicationScope ); String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", new Object[] { - edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() - } ); + edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() + } ); Observable<Set<String>> types = - gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) ).collect( - () -> new HashSet<>(), ( set, type ) -> set.add( type ) ); + gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) ) + .collect( () -> new HashSet<>(), ( set, type ) -> set.add( type ) ); return types.toBlocking().last(); @@ -238,7 +236,7 @@ public class CpRelationManager implements RelationManager { public Observable<Edge> call( final String edgeType ) { return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } } ); @@ -249,22 +247,20 @@ public class CpRelationManager implements RelationManager { return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge ) -> { - if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) { - logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() ); - return; - } + if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) { + logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() ); + return; + } - final EntityRef eref = - new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); + final EntityRef eref = + new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); - String name = getNameFromEdgeType( edge.getType() ); - addMapSet( entityRefSetMap, eref, name ); - } ).toBlocking().last(); + String name = getNameFromEdgeType( edge.getType() ); + addMapSet( entityRefSetMap, eref, name ); + } ).toBlocking().last(); } - - @Override public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception { @@ -273,8 +269,8 @@ public class CpRelationManager implements RelationManager { String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ); logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] { - edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() - } ); + edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() + } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); Observable<Edge> edges = gm.loadEdgeVersions( @@ -294,8 +290,8 @@ public class CpRelationManager implements RelationManager { String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName ); logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] { - edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() - } ); + edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() + } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); Observable<Edge> edges = gm.loadEdgeVersions( @@ -306,7 +302,6 @@ public class CpRelationManager implements RelationManager { } - @Override public Set<String> getCollections() throws Exception { @@ -408,8 +403,8 @@ public class CpRelationManager implements RelationManager { if ( logger.isDebugEnabled() ) { logger.debug( "Loaded member entity {}:{} from app {}\n " + " data {}", new Object[] { - itemRef.getType(), itemRef.getUuid(), applicationScope, CpEntityMapUtils.toMap( memberEntity ) - } ); + itemRef.getType(), itemRef.getUuid(), applicationScope, CpEntityMapUtils.toMap( memberEntity ) + } ); } @@ -419,14 +414,13 @@ public class CpRelationManager implements RelationManager { gm.writeEdge( edge ).toBlocking().last(); - //perform indexing if ( logger.isDebugEnabled() ) { logger.debug( "Wrote edge {}", edge ); } - indexService.queueEntityIndexUpdate( applicationScope, memberEntity); + indexService.queueEntityIndexUpdate( applicationScope, memberEntity ); if ( logger.isDebugEnabled() ) { @@ -492,7 +486,7 @@ public class CpRelationManager implements RelationManager { addToCollection( collName, itemEntity ); if ( collection != null && collection.getLinkedCollection() != null ) { - Id itemEntityId = new SimpleId( itemEntity.getUuid(),itemEntity.getType() ); + Id itemEntityId = new SimpleId( itemEntity.getUuid(), itemEntity.getType() ); final Edge edge = createCollectionEdge( cpHeadEntity.getId(), collName, itemEntityId ); GraphManager gm = managerCache.getGraphManager( applicationScope ); @@ -528,8 +522,8 @@ public class CpRelationManager implements RelationManager { if ( logger.isDebugEnabled() ) { logger.debug( "Loading entity to remove from collection " + "{}:{} from app {}\n", new Object[] { - itemRef.getType(), itemRef.getUuid(), applicationScope - } ); + itemRef.getType(), itemRef.getUuid(), applicationScope + } ); } Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() ); @@ -636,38 +630,30 @@ public class CpRelationManager implements RelationManager { } - query.setEntityType( collection.getType() ); query = adjustQuery( query ); + final ReadPipelineBuilder readPipelineBuilder = + pipelineBuilderFactory.createReadPipelineBuilder( applicationScope ); - if ( query.isGraphSearch() ) { - QueryExecutor executor = - new CollectionGraphQueryExecutor( entityCollectionManagerFactory, graphManagerFactory, applicationScope, - headEntity, query.getOffsetCursor(), collName, query.getLimit() ); - - return executor.next(); - } + //set our fields applicable to both operations + readPipelineBuilder.withCursor( query.getOffsetCursor() ); + readPipelineBuilder.withLimit( query.getLimit() ); + //TODO, this should be removed when the CP relation manager is removed + readPipelineBuilder.setStartId( cpHeadEntity.getId() ); - - final SearchEdge searchEdge = createCollectionSearchEdge( cpHeadEntity.getId(), collName ); - - final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); - - final SearchTypes types = SearchTypes.fromTypes( collection.getType() ); - - logger.debug( "Searching scope {}", searchEdge ); - - final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = - new CollectionResultsLoaderFactoryImpl( managerCache ); + if ( query.isGraphSearch() ) { + readPipelineBuilder.getCollection( collName ); + } + else { + readPipelineBuilder.getCollectionWithQuery( collName, query.getQl().get() ); + } - //execute the query and return our next result - final QueryExecutor executor = - new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, searchEdge, types, query ); + final Observable<Results> resultsObservable = readPipelineBuilder.build(); - return executor.next(); + return new ObservableQueryExecutor( resultsObservable ).next(); } @@ -689,10 +675,10 @@ public class CpRelationManager implements RelationManager { if ( logger.isDebugEnabled() ) { logger.debug( "createConnection(): " + "Indexing connection type '{}'\n from source {}:{}]\n" - + " to target {}:{}\n app {}", new Object[] { - connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(), - connectedEntityRef.getUuid(), applicationScope - } ); + + " to target {}:{}\n app {}", new Object[] { + connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(), + connectedEntityRef.getUuid(), applicationScope + } ); } Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ); @@ -778,9 +764,9 @@ public class CpRelationManager implements RelationManager { if ( logger.isDebugEnabled() ) { logger.debug( "Deleting connection '{}' from source {}:{} \n to target {}:{}", new Object[] { - connectionType, connectingEntityRef.getType(), connectingEntityRef.getUuid(), - connectedEntityRef.getType(), connectedEntityRef.getUuid() - } ); + connectionType, connectingEntityRef.getType(), connectingEntityRef.getUuid(), + connectedEntityRef.getType(), connectedEntityRef.getUuid() + } ); } Id entityId = new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ); @@ -909,34 +895,36 @@ public class CpRelationManager implements RelationManager { headEntity = em.validate( headEntity ); - final SearchEdge indexScope = createConnectionSearchEdge( cpHeadEntity.getId(), connection ); - final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() ); + query = adjustQuery( query ); - ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); + final String entityType = query.getEntityType(); - logger.debug( "Searching {}", indexScope ); - query = adjustQuery( query ); + final ReadPipelineBuilder readPipelineBuilder = + pipelineBuilderFactory.createReadPipelineBuilder( applicationScope ); + //set our fields applicable to both operations + readPipelineBuilder.withCursor( query.getOffsetCursor() ); + readPipelineBuilder.withLimit( query.getLimit() ); + //TODO, this should be removed when the CP relation manager is removed + readPipelineBuilder.setStartId( cpHeadEntity.getId() ); if ( query.isGraphSearch() ) { - QueryExecutor executor = - new ConnectionGraphQueryExecutor( entityCollectionManagerFactory, graphManagerFactory, applicationScope, - headEntity, query.getOffsetCursor(), connection, query.getLimit() ); - - return executor.next(); + readPipelineBuilder.getConnection( connection ); + } + else if ( entityType != null ) { + readPipelineBuilder.connectionWithQuery( connection, query.getQl().get(), entityType ); + } + else { + readPipelineBuilder.connectionWithQuery( connection, query.getQl().get() ); } - final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = - new ConnectionResultsLoaderFactoryImpl( managerCache, headEntity, connection ); - final QueryExecutor executor = - new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, - query ); + final Observable<Results> resultsObservable = readPipelineBuilder.build(); - return executor.next(); + return new ObservableQueryExecutor( resultsObservable ).next(); } @@ -1007,7 +995,6 @@ public class CpRelationManager implements RelationManager { } - /** side effect: converts headEntity into an Entity if it is an EntityRef! */ private Entity getHeadEntity() throws Exception { Entity entity = null; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index 525678c..8f9776e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -80,22 +80,19 @@ public interface FilterFactory { /** * Generate a new instance of the command with the specified parameters */ - QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( final String collectionName, final String query, - final Optional<String> cursor, final int limit ); + QueryCollectionElasticSearchCollectorFilter queryCollectionElasticSearchCollector( final String collectionName, final String query); /** * Generate a new instance of the command with the specified parameters */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName,final String query, - final Optional<String> cursor, final int limit ); + QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName,final String query); /** * Generate a new instance of the command with the specified parameters */ - QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName, final String connectionEntityType, final String query, - final Optional<String> cursor, final int limit ); + QueryConnectionElasticSearchCollectorFilter queryConnectionElasticSearchCollector( final String connectionName, final String connectionEntityType, final String query ); /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java index 13aa3eb..d40cb12 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java @@ -20,13 +20,9 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import java.util.UUID; - import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.model.entity.Id; -import com.google.common.base.Optional; - import rx.Observable; @@ -37,6 +33,20 @@ import rx.Observable; */ public interface ReadPipelineBuilder { + + /** + * Set the cursor + * @param cursor + */ + ReadPipelineBuilder withCursor(final String cursor); + + /** + * Set the limit of our page sizes + * @param limit + * @return + */ + ReadPipelineBuilder withLimit(final int limit); + /** * An operation to bridge 2.0-> 1.0. Should be removed when everyone uses the pipeline * @param id @@ -58,8 +68,7 @@ public interface ReadPipelineBuilder { /** * Get all entities with a query */ - ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query, final int limit, - final Optional<String> cursor ); + ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query); /** * Get an entity via the connection name and entity Id @@ -79,15 +88,13 @@ public interface ReadPipelineBuilder { /** * Get all entities in a connection with a query */ - ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query, final int limit, - final Optional<String> cursor ); + ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query ); /** * Get all entities in a connection with a query */ - ReadPipelineBuilder connectionWithQuery( final String connectionName, final String entityType, final String query, final int limit, - final Optional<String> cursor ); + ReadPipelineBuilder connectionWithQuery( final String connectionName, final String entityType, final String query); @@ -98,5 +105,5 @@ public interface ReadPipelineBuilder { * Execute final construction of the pipeline and return the results * @return */ - Observable<Results> execute(); + Observable<Results> build(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java index a44f4b8..7319f21 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java @@ -20,11 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import java.util.Collections; -import java.util.UUID; - import org.apache.usergrid.corepersistence.pipeline.DataPipeline; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; @@ -48,17 +44,41 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { private final DataPipeline pipeline; /** - * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct - * results are rendered + * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are + * rendered */ private CollectorFilter<Results> collectorFilter; + private Optional<String> cursor; + private Optional<Integer> limit; + @Inject public ReadPipelineBuilderImpl( final FilterFactory filterFactory, @Assisted final ApplicationScope applicationScope ) { this.filterFactory = filterFactory; this.pipeline = new DataPipeline( applicationScope ); + this.cursor = Optional.absent(); + //set the default limit + this.limit = Optional.absent(); + } + + + @Override + public ReadPipelineBuilder withCursor( final String cursor ) { + this.cursor = Optional.fromNullable( cursor ); + pipeline.setCursor( this.cursor ); + return this; + } + + + @Override + public ReadPipelineBuilder withLimit( final int limit ) { + Preconditions.checkArgument( limit > 0, "You must set the limit > 0" ); + this. limit = Optional.of( limit ); + //set the default value + pipeline.setLimit( this.limit.or( 10 ) ); + return this; } @@ -97,11 +117,10 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override - public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query, final int limit, - final Optional<String> cursor ) { + public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String query ) { //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = filterFactory.queryCollectionElasticSearchCollector( collectionName, query, cursor, limit ); + collectorFilter = filterFactory.queryCollectionElasticSearchCollector( collectionName, query ); return this; } @@ -129,25 +148,23 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder getConnection( final String connectionName, final String entityType ) { pipeline.withTraverseCommand( filterFactory.readGraphConnectionCommand( connectionName, entityType ) ); - setEntityLoaderFilter(); + setEntityLoaderFilter(); return this; } + /** * * @param connectionName * @param query - * @param limit - * @param cursor * @return */ @Override - public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query, final int limit, - final Optional<String> cursor ) { + public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String query ) { - //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = filterFactory.queryConnectionElasticSearchCollector( connectionName, query, cursor, limit ); + //TODO, this should really be 2 a TraverseFilter with an entityLoad collector + collectorFilter = filterFactory.queryConnectionElasticSearchCollector( connectionName, query ); return this; } @@ -155,26 +172,26 @@ public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { @Override public ReadPipelineBuilder connectionWithQuery( final String connectionName, final String entityType, - final String query, final int limit, - final Optional<String> cursor ) { + final String query ) { - //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - collectorFilter = filterFactory.queryConnectionElasticSearchCollector( connectionName, entityType, query, cursor, limit ); + //TODO, this should really be 2 a TraverseFilter with an entityLoad collector + collectorFilter = + filterFactory.queryConnectionElasticSearchCollector( connectionName, entityType, query); return this; } - - @Override - public Observable<Results> execute() { - Preconditions.checkNotNull(collectorFilter, "You have not specified an operation that creates a collection filter. This is required for loading results"); + public Observable<Results> build() { + Preconditions.checkNotNull( collectorFilter, + "You have not specified an operation that creates a collection filter. This is required for loading " + + "results" ); return pipeline.build( collectorFilter ); } - private void setEntityLoaderFilter(){ + + private void setEntityLoaderFilter() { collectorFilter = filterFactory.entityLoadCollector(); } - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java deleted file mode 100644 index 663c15c..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractGraphQueryExecutor.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.results; - - -import java.util.Collections; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.apache.usergrid.corepersistence.pipeline.DataPipeline; -import org.apache.usergrid.corepersistence.pipeline.read.entity.EntityLoadCollectorFilter; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.Results; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.entity.SimpleId; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * This class is a nasty hack to bridge 2.0 observables into 1.0 iterators DO NOT use this as a model for moving - * forward, pandas will die. - */ -public abstract class AbstractGraphQueryExecutor implements QueryExecutor { - - - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private final ApplicationScope applicationScope; - private final Id sourceId; - private final int limit; - - private final Optional<String> requestCursor; - - private Iterator<Results> observableIterator; - - - public AbstractGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final ApplicationScope applicationScope, final EntityRef source, - final String cursor, final int limit ) { - - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - this.applicationScope = applicationScope; - this.limit = limit; - this.sourceId = new SimpleId( source.getUuid(), source.getType() ); - - this.requestCursor = Optional.fromNullable( cursor ); - } - - - @Override - public Iterator<Results> iterator() { - return this; - } - - - @Override - public boolean hasNext() { - - //hasn't been set up yet, run through our first setup - if ( observableIterator == null ) { - //assign them to an iterator. this now uses an internal buffer with backpressure, so we won't load all - // results - //set up our command builder - final DataPipeline dataPipeline = new DataPipeline( applicationScope, sourceId, requestCursor, limit ); - - - addTraverseCommand( dataPipeline ); - - //construct our results to be observed later. This is a cold observable - final Observable<Results> resultsObservable = - dataPipeline.build( new EntityLoadCollectorFilter( entityCollectionManagerFactory, applicationScope ) ); - - this.observableIterator = resultsObservable.toBlocking().getIterator(); - - if(!observableIterator.hasNext()){ - //no results, generate an empty one - this.observableIterator = Collections.singleton(new Results()).iterator(); - } - } - - - //see if our current results are not null - return observableIterator.hasNext(); - } - - - @Override - public Results next() { - if ( !hasNext() ) { - throw new NoSuchElementException( "No more results present" ); - } - - final Results results = observableIterator.next(); - - //ugly and tight coupling, but we don't have a choice until we finish some refactoring - results.setQueryExecutor( this ); - - return results; - } - - - @Override - public void remove() { - throw new RuntimeException( "Remove not implemented!!" ); - } - - - /** - * Add the traverse command to the graph - */ - protected abstract void addTraverseCommand( final DataPipeline dataPipeline ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java deleted file mode 100644 index b506c71..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionGraphQueryExecutor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.results; - - -import org.apache.usergrid.corepersistence.pipeline.DataPipeline; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphCollectionFilter; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; - -import com.google.common.base.Preconditions; - - -/** - * Create The collection graph query executor - */ -public class CollectionGraphQueryExecutor extends AbstractGraphQueryExecutor { - - private final GraphManagerFactory graphManagerFactory; - private final String collectionName; - - - public CollectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final GraphManagerFactory graphManagerFactory, - final ApplicationScope applicationScope, final EntityRef source, - final String cursor, final String collectionName, final int limit ) { - - super( entityCollectionManagerFactory, applicationScope, source, cursor, limit ); - this.graphManagerFactory = graphManagerFactory; - - Preconditions.checkNotNull( collectionName, "collectionName is required on the query" ); - this.collectionName = collectionName; - } - - - @Override - protected void addTraverseCommand( final DataPipeline dataPipeline ) { - //set the traverse command from the source Id to the connect name - dataPipeline.withTraverseCommand( new ReadGraphCollectionFilter( graphManagerFactory, collectionName ) ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9dc65dc8/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java deleted file mode 100644 index 3dc5cd6..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionGraphQueryExecutor.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.results; - - -import org.apache.usergrid.corepersistence.pipeline.DataPipeline; -import org.apache.usergrid.corepersistence.pipeline.read.graph.ReadGraphConnectionFilter; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.GraphManagerFactory; - -import com.google.common.base.Preconditions; - - -public class ConnectionGraphQueryExecutor extends AbstractGraphQueryExecutor { - - private final GraphManagerFactory graphManagerFactory; - private final String connectionName; - - - public ConnectionGraphQueryExecutor( final EntityCollectionManagerFactory entityCollectionManagerFactory, - final GraphManagerFactory graphManagerFactory, - final ApplicationScope applicationScope, final EntityRef source, - final String cursor, final String connectionType, final int limit) { - - super( entityCollectionManagerFactory, applicationScope, source, cursor, limit ); - this.graphManagerFactory = graphManagerFactory; - - Preconditions.checkNotNull(connectionType, "connectionType is required on the query" ); - this.connectionName = connectionType; - } - - - - @Override - protected void addTraverseCommand( final DataPipeline dataPipeline ) { - //set the traverse command from the source Id to the connect name - dataPipeline.withTraverseCommand( new ReadGraphConnectionFilter( graphManagerFactory, connectionName ) ); - } -}