Refactor of pipeline to support type mapping for clarity
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3a1784f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3a1784f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3a1784f0 Branch: refs/heads/USERGRID-669 Commit: 3a1784f0455acae20c7dfbde61e9493d572ad549 Parents: 5b1dfa1 Author: Todd Nine <tn...@apigee.com> Authored: Tue May 19 17:05:04 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue May 19 18:07:47 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 32 +- .../corepersistence/CpEntityManagerFactory.java | 13 +- .../corepersistence/CpRelationManager.java | 158 +++++----- .../corepersistence/pipeline/Pipeline.java | 121 -------- .../pipeline/PipelineBuilderFactory.java | 39 --- .../pipeline/PipelineModule.java | 9 - .../pipeline/PipelineOperations.java | 30 ++ .../corepersistence/pipeline/read/Filter.java | 9 +- .../pipeline/read/FilterFactory.java | 5 +- .../pipeline/read/FilterPipeline.java | 132 +++++++++ .../pipeline/read/ReadPipelineBuilder.java | 104 ------- .../pipeline/read/ReadPipelineBuilderImpl.java | 296 ------------------- .../pipeline/read/collect/EntityFilter.java | 68 ----- .../read/collect/EntityResumeFilter.java | 68 +++++ .../read/elasticsearch/CandidateIdFilter.java | 46 ++- 15 files changed, 380 insertions(+), 750 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 63018cb..7a56631 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -17,14 +17,29 @@ package org.apache.usergrid.corepersistence; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; +import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.AggregateCounter; @@ -165,7 +180,8 @@ public class CpEntityManager implements EntityManager { private final AsyncEventService indexService; - private PipelineBuilderFactory pipelineBuilderFactory; + private final FilterFactory filterFactory; + private final CollectorFactory collectorFactory; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -207,7 +223,7 @@ public class CpEntityManager implements EntityManager { */ public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig, - final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) { + final FilterFactory filterFactory, final CollectorFactory collectorFactory, final UUID applicationId ) { this.entityManagerFig = entityManagerFig; @@ -216,8 +232,10 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( managerCache, "managerCache must not be null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); Preconditions.checkNotNull( indexService, "indexService must not be null" ); - Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" ); - this.pipelineBuilderFactory = pipelineBuilderFactory; + Preconditions.checkNotNull( filterFactory, "filterFactory must not be null" ); + Preconditions.checkNotNull( collectorFactory, "collectorFactory must not be null" ); + this.filterFactory = filterFactory; + this.collectorFactory = collectorFactory; this.managerCache = managerCache; @@ -732,7 +750,7 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( entityRef, "entityRef cannot be null" ); CpRelationManager relationManager = - new CpRelationManager( metricsFactory, managerCache, pipelineBuilderFactory, indexService, this, entityManagerFig, applicationId, entityRef ); + new CpRelationManager( metricsFactory, managerCache, filterFactory, collectorFactory, indexService, this, entityManagerFig, applicationId, entityRef ); return relationManager; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 63e2869..5055538 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -35,7 +35,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.ReIndexService; -import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; +import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.persistence.AbstractEntity; @@ -125,7 +126,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; private final AsyncEventService indexService; - private final PipelineBuilderFactory pipelineBuilderFactory; + private final FilterFactory filterFactory; + private final CollectorFactory collectorFactory; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector ) { @@ -139,7 +141,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); this.indexService = injector.getInstance( AsyncEventService.class ); - this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class ); + this.filterFactory = injector.getInstance( FilterFactory.class ); + this.collectorFactory = injector.getInstance( CollectorFactory.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); @@ -198,7 +201,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, pipelineBuilderFactory, applicationId ); + EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, + + filterFactory, collectorFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 4993d88..6201fe8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -17,16 +17,23 @@ package org.apache.usergrid.corepersistence; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; -import org.apache.usergrid.persistence.graph.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.PipelineBuilderFactory; -import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; +import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; +import org.apache.usergrid.corepersistence.pipeline.read.FilterPipeline; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; @@ -49,6 +56,10 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.entities.Group; import org.apache.usergrid.persistence.entities.User; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.SearchByEdge; +import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; @@ -63,7 +74,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.schema.CollectionInfo; import org.apache.usergrid.utils.MapUtils; -import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -97,7 +107,6 @@ public class CpRelationManager implements RelationManager { private final EntityManagerFig entityManagerFig; private ManagerCache managerCache; - private final PipelineBuilderFactory pipelineBuilderFactory; private EntityManager em; @@ -111,13 +120,16 @@ public class CpRelationManager implements RelationManager { private final AsyncEventService indexService; - private MetricsFactory metricsFactory; - private Timer updateCollectionTimer; + + private final FilterFactory filterFactory; + private final CollectorFactory collectorFactory; + public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, - final PipelineBuilderFactory pipelineBuilderFactory, final AsyncEventService indexService, - final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, final EntityRef headEntity) { + final FilterFactory filterFactory, final CollectorFactory collectorFactory, final AsyncEventService indexService, + final EntityManager em, final EntityManagerFig entityManagerFig, final UUID applicationId, + final EntityRef headEntity ) { Assert.notNull( em, "Entity manager cannot be null" ); @@ -134,11 +146,9 @@ public class CpRelationManager implements RelationManager { this.headEntity = headEntity; this.managerCache = managerCache; this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); - this.pipelineBuilderFactory = pipelineBuilderFactory; - this.metricsFactory = metricsFactory; - this.updateCollectionTimer = - metricsFactory.getTimer( CpRelationManager.class, "relation.manager.es.update.collection" ); + this.filterFactory = filterFactory; + this.collectorFactory = collectorFactory; if ( logger.isDebugEnabled() ) { logger.debug( "Loading head entity {}:{} from app {}", new Object[] { @@ -162,7 +172,7 @@ public class CpRelationManager implements RelationManager { public Set<String> getCollectionIndexes( String collectionName ) throws Exception { GraphManager gm = managerCache.getGraphManager( applicationScope ); - String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName); + String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", new Object[] { edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid() @@ -198,7 +208,7 @@ public class CpRelationManager implements RelationManager { private Map<EntityRef, Set<String>> getContainers() { - return getContainers(-1, null, null); + return getContainers( -1, null, null ); } @@ -215,14 +225,14 @@ public class CpRelationManager implements RelationManager { Observable<Edge> edges = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ) - .flatMap(new Func1<String, Observable<Edge>>() { + .flatMap( new Func1<String, Observable<Edge>>() { @Override - public Observable<Edge> call(final String edgeType) { + public Observable<Edge> call( final String edgeType ) { return gm.loadEdgesToTarget( - new SimpleSearchByEdgeType(cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent())); + new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); } - }); + } ); //if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memmory if ( limit > -1 ) { @@ -250,7 +260,7 @@ public class CpRelationManager implements RelationManager { Id entityId = new SimpleId( entity.getUuid(), entity.getType() ); - String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType(connectionType); + String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ); logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] { edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() @@ -271,13 +281,13 @@ public class CpRelationManager implements RelationManager { Id entityId = new SimpleId( entity.getUuid(), entity.getType() ); - String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName); + String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ); logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] { edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid() } ); - GraphManager gm = managerCache.getGraphManager(applicationScope); + GraphManager gm = managerCache.getGraphManager( applicationScope ); Observable<Edge> edges = gm.loadEdgeVersions( new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ); @@ -343,7 +353,8 @@ public class CpRelationManager implements RelationManager { return null; } - return addToCollection( collectionName, itemRef, ( collection != null && collection.getLinkedCollection() != null ) ); + return addToCollection( collectionName, itemRef, + ( collection != null && collection.getLinkedCollection() != null ) ); } @@ -403,7 +414,7 @@ public class CpRelationManager implements RelationManager { logger.debug( "Wrote edge {}", edge ); } - indexService.queueNewEdge(applicationScope, memberEntity, edge); + indexService.queueNewEdge( applicationScope, memberEntity, edge ); if ( logger.isDebugEnabled() ) { @@ -518,7 +529,8 @@ public class CpRelationManager implements RelationManager { //run our delete - final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() ); + final Edge collectionToItemEdge = + createCollectionEdge( cpHeadEntity.getId(), collectionName, memberEntity.getId() ); gm.markEdge( collectionToItemEdge ).toBlocking().last(); @@ -575,7 +587,7 @@ public class CpRelationManager implements RelationManager { results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false ); } else { - results = em.getTargetEntities(headEntity, srcRelationName, null, Level.REFS); + results = em.getTargetEntities( headEntity, srcRelationName, null, Level.REFS ); } if ( ( results != null ) && ( results.size() > 0 ) ) { @@ -617,50 +629,54 @@ public class CpRelationManager implements RelationManager { query = adjustQuery( query ); + final FilterPipeline<Id> filterPipeline = new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ).withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) ); - final ReadPipelineBuilder readPipelineBuilder = - pipelineBuilderFactory.createReadPipelineBuilder(applicationScope); - //set our fields applicable to both operations - readPipelineBuilder.withCursor(query.getCursor()); - readPipelineBuilder.withLimit( Optional.of(query.getLimit())); - - //TODO, this should be removed when the CP relation manager is removed - readPipelineBuilder.setStartId( cpHeadEntity.getId() ); + final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline; if ( query.isGraphSearch() ) { - readPipelineBuilder.getCollection( collectionName ); + entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphCollectionFilter( collectionName ) ) + .withFilter( filterFactory.entityLoadFilter() ); } else { final String entityType = collection.getType(); - readPipelineBuilder.getCollectionWithQuery( collectionName, entityType, query.getQl().get() ); + entityFilterPipeline = filterPipeline.withFilter( + filterFactory.elasticSearchCollectionFilter( query.getQl().get(), collectionName, entityType ) ) + .withFilter( filterFactory.candidateEntityFilter() ); } - final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute(); + final Observable<ResultsPage> resultsObservable = + entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() ) + .withCollector( collectorFactory.getResultsPageCollector() ).execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } + @Override - public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults ) throws Exception { + public Results searchCollectionConsistent( String collectionName, Query query, int expectedResults ) + throws Exception { Results results; long maxLength = entityManagerFig.pollForRecordsTimeout(); long sleepTime = entityManagerFig.sleep(); boolean found; long current = System.currentTimeMillis(), length = 0; do { - results = searchCollection(collectionName, query); + results = searchCollection( collectionName, query ); length = System.currentTimeMillis() - current; found = expectedResults == results.size(); - if(found){ + if ( found ) { break; } - Thread.sleep(sleepTime); - }while (!found && length <= maxLength); - if(logger.isInfoEnabled()){ - logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults)); + Thread.sleep( sleepTime ); + } + while ( !found && length <= maxLength ); + if ( logger.isInfoEnabled() ) { + logger.info( String + .format( "Consistent Search finished in %s, results=%s, expected=%s...dumping stack", length, + results.size(), expectedResults ) ); Thread.dumpStack(); } return results; @@ -836,7 +852,7 @@ public class CpRelationManager implements RelationManager { @Override - public Results getTargetEntities(String connectionType, String connectedEntityType, Level level) + public Results getTargetEntities( String connectionType, String connectedEntityType, Level level ) throws Exception { //until this is refactored properly, we will delegate to a search by query @@ -849,20 +865,19 @@ public class CpRelationManager implements RelationManager { query.setEntityType( connectedEntityType ); query.setResultsLevel( level ); - return searchTargetEntities(query); + return searchTargetEntities( query ); } @Override - public Results getSourceEntities(String connType, String fromEntityType, Level resultsLevel) - throws Exception { + public Results getSourceEntities( String connType, String fromEntityType, Level resultsLevel ) throws Exception { - return getSourceEntities(connType, fromEntityType, resultsLevel, -1); + return getSourceEntities( connType, fromEntityType, resultsLevel, -1 ); } @Override - public Results getSourceEntities(String connType, String fromEntityType, Level level, int count) + public Results getSourceEntities( String connType, String fromEntityType, Level level, int count ) throws Exception { // looking for edges to the head entity @@ -895,7 +910,7 @@ public class CpRelationManager implements RelationManager { @Override - public Results searchTargetEntities(Query query) throws Exception { + public Results searchTargetEntities( Query query ) throws Exception { Preconditions.checkNotNull( query, "query cannot be null" ); @@ -909,37 +924,40 @@ public class CpRelationManager implements RelationManager { query = adjustQuery( query ); final String entityType = query.getEntityType(); - //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector -> 1.0 results + //set startid -- graph | es query filter -- load entities filter (verifies exists) --> results page collector + // -> 1.0 results // startid -- graph edge load -- entity load (verify) from ids -> results page collector // startid -- eq query candiddate -- entity load (verify) from canddiates -> results page collector //startid -- graph edge load -- entity id verify --> filter to connection ref --> connection ref collector - //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref collector + //startid -- eq query candiddate -- candidate id verify --> filter to connection ref --> connection ref + // collector + - final ReadPipelineBuilder readPipelineBuilder = - pipelineBuilderFactory.createReadPipelineBuilder(applicationScope); - //readPipelineBuilder.startId().load().collect() + final FilterPipeline<Id> filterPipeline = + new FilterPipeline( applicationScope, query.getCursor(), query.getLimit() ) + .withFilter( filterFactory.getEntityIdFilter( cpHeadEntity.getId() ) ); - //set our fields applicable to both operations - readPipelineBuilder - .withCursor(query.getCursor()) - .withLimit(Optional.of(query.getLimit())) - //TODO, this should be removed when the CP relation manager is removed - .setStartId( cpHeadEntity.getId() ); + + final FilterPipeline<org.apache.usergrid.persistence.model.entity.Entity> entityFilterPipeline; if ( query.isGraphSearch() ) { - // if(query.getResultsLevel() == Level.ALL_PROPERTIES) - readPipelineBuilder.getConnection( connection ); - //else + entityFilterPipeline = filterPipeline.withFilter( filterFactory.readGraphConnectionFilter( connection ) ) + .withFilter( filterFactory.entityLoadFilter() ); } + else { - readPipelineBuilder.getConnectionWithQuery( connection, Optional.fromNullable( entityType ), - query.getQl().get() ); + + entityFilterPipeline = filterPipeline.withFilter( filterFactory + .elasticSearchConnectionFilter( query.getQl().get(), connection, Optional.fromNullable( entityType ) ) ) + .withFilter( filterFactory.candidateEntityFilter() ); } - final Observable<ResultsPage> resultsObservable = readPipelineBuilder.execute(); + final Observable<ResultsPage> resultsObservable = + entityFilterPipeline.withFilter( filterFactory.entityResumeFilter() ) + .withCollector( collectorFactory.getResultsPageCollector() ).execute(); return new ObservableQueryExecutor( resultsObservable ).next(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java deleted file mode 100644 index 26cf346..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline; - - -import java.util.List; - -import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; -import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor; -import org.apache.usergrid.corepersistence.pipeline.read.Collector; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * A pipeline that will allow us to build a traversal command for execution - * - * See http://martinfowler.com/articles/collection-pipeline/ for some examples - * - * TODO: Re work the cursor and limit phases. They need to be lazily evaluated, not added on build time - */ -public class Pipeline<R> { - - - private final ApplicationScope applicationScope; - private final List<PipelineOperation> idPipelineOperationList; - private final Collector<?, R> collector; - private final RequestCursor requestCursor; - - private final int limit; - - - private int idCount = 0; - - - /** - * Our first pass, where we implement our start point as an Id until we can use this to perform our entire - * traversal. Eventually as we untangle the existing Query service nightmare, the sourceId will be remove and - * should only be traversed from the root application - */ - public Pipeline( final ApplicationScope applicationScope, final List<PipelineOperation> pipelineOperations, - final Collector<?, R> collector, final Optional<String> cursor, final int limit ) { - - this.applicationScope = applicationScope; - this.idPipelineOperationList = pipelineOperations; - this.collector = collector; - this.limit = limit; - - this.requestCursor = new RequestCursor( cursor ); - } - - - /** - * Execute the pipline construction, returning an observable of results - * @return - */ - public Observable<R> execute(){ - - - Observable traverseObservable = Observable.just( new FilterResult<>( applicationScope.getApplication(), Optional.absent() )); - - //build our traversal commands - for ( PipelineOperation pipelineOperation : idPipelineOperationList ) { - setState( pipelineOperation ); - - //TODO, see if we can wrap this observable in our ObservableTimer so we can see how long each filter takes - - - traverseObservable = traverseObservable.compose( pipelineOperation ); - } - - - setState( collector ); - - final Observable<R> response = traverseObservable.compose( collector ); - - - //append the optional cursor into the response for the caller to use - return response; - } - - - - - /** - * Set the id of the state - */ - private void setState( final PipelineOperation pipelineOperation ) { - - - final PipelineContext context = new PipelineContext( applicationScope, requestCursor, - limit, idCount ); - - pipelineOperation.setContext( context ); - - //done for clarity - idCount++; - - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java deleted file mode 100644 index 9916bc1..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineBuilderFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline; - - -import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - - -/** - * Factory for creating pipeline builders - */ -public interface PipelineBuilderFactory { - - - /** - * Create a read pipeline builder - * @param applicationScope - * @return - */ - ReadPipelineBuilder createReadPipelineBuilder( final ApplicationScope applicationScope ); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java index 3018718..ef696bd 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineModule.java @@ -22,9 +22,6 @@ package org.apache.usergrid.corepersistence.pipeline; import org.apache.usergrid.corepersistence.pipeline.read.CollectorFactory; import org.apache.usergrid.corepersistence.pipeline.read.FilterFactory; -import org.apache.usergrid.corepersistence.pipeline.read.ReadFilterFactoryImpl; -import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilder; -import org.apache.usergrid.corepersistence.pipeline.read.ReadPipelineBuilderImpl; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; @@ -42,12 +39,6 @@ public class PipelineModule extends AbstractModule { // bind( FilterFactory.class ).to( ReadFilterFactoryImpl.class ); - //Use Guice to create the builder since we don't really need to do anything - //other than DI when creating the filters - install( new FactoryModuleBuilder().implement( ReadPipelineBuilder.class, ReadPipelineBuilderImpl.class ) - .build( PipelineBuilderFactory.class ) ); - - // install( new Factory) //Use Guice to create the builder since we don't really need to do anything http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java new file mode 100644 index 0000000..3929a97 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineOperations.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline; + + +public interface PipelineOperations { + + /** + * Add the pipeline operation to the set of operations + * @param po + */ + void add( PipelineOperation po ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java index 054a85a..ee01602 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/Filter.java @@ -28,4 +28,11 @@ import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; * an observable of FilterResults. Filters should never emit groups or objects that represent collections. Items should * always be emitted 1 at a time. It is the responsibility of the collector to aggregate results. */ -public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> {} +public interface Filter<T, R> extends PipelineOperation<T, FilterResult<R>> { + + /** + * Get the builder for the next phase + * @return + */ +// B getNextBuilder(); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java index a2f1605..d297c2a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterFactory.java @@ -20,8 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read; -import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityFilter; -import org.apache.usergrid.corepersistence.pipeline.read.collect.IdCursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.collect.EntityResumeFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateIdFilter; import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.ElasticSearchCollectionFilter; @@ -139,5 +138,5 @@ public interface FilterFactory { * Create a new instance of our entity filter * @return */ - EntityFilter entityFilter(); + EntityResumeFilter entityResumeFilter(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java new file mode 100644 index 0000000..f8bbdd8 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterPipeline.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read; + + +import org.apache.usergrid.corepersistence.pipeline.PipelineContext; +import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; +import org.apache.usergrid.corepersistence.pipeline.cursor.RequestCursor; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.util.ValidationUtils; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import rx.Observable; + + +/** + * Pipeline for applying our UG domain specific filters. + * + * Modeled after an observable, with typing to allow input of specific filters + * + * @param InputType the input type in the current pipeline state + */ +public class FilterPipeline<InputType> { + + + private int idCount = 0; + + private final ApplicationScope applicationScope; + + + private final RequestCursor requestCursor; + private int limit; + + //Generics hell, intentionally without a generic, we check at the filter level + private Observable currentObservable; + + + /** + * Create our filter pipeline + */ + public FilterPipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) { + + + ValidationUtils.validateApplicationScope( applicationScope ); + Preconditions.checkNotNull( cursor, "cursor optional is required" ); + Preconditions.checkArgument( limit > 0, "limit must be > 0" ); + + + this.applicationScope = applicationScope; + + //init our cursor to empty + this.requestCursor = new RequestCursor( cursor ); + + //set the default limit + this.limit = limit; + + //set our observable to start at the application + final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() ); + this.currentObservable = Observable.just( filter ); + } + + + public <OutputType> FilterPipeline<OutputType> withFilter( + final Filter<? super InputType, ? extends OutputType> filter ) { + + + setUp( filter ); + + return ( FilterPipeline<OutputType> ) this; + } + + + public <OutputType> FilterPipeline<OutputType> withCollector( + final Collector<? super InputType, ? extends OutputType> collector ) { + + + setUp( collector ); + + return ( FilterPipeline<OutputType> ) this; + } + + + private <OutputType> void setUp( + final PipelineOperation<? super InputType, ? extends OutputType> pipelineOperation ) { + setState( pipelineOperation ); + + currentObservable = currentObservable.compose( pipelineOperation ); + } + + + /** + * Return the observable of the filter pipeline + */ + public Observable<InputType> execute() { + return currentObservable; + } + + + /** + * Set the id of the state + */ + private void setState( final PipelineOperation pipelineOperation ) { + + + final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount ); + + pipelineOperation.setContext( context ); + + //done for clarity + idCount++; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java deleted file mode 100644 index d0e87b3..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilder.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * An instance of a pipeline builder for building commands on our read pipline - * - * Each invocation of the method will assemble the underlying pipe and updating it's state - * - * Results are added by invoking execute. - */ -public interface ReadPipelineBuilder { - - - /** - * Set the cursor - * @param cursor - */ - ReadPipelineBuilder withCursor(final Optional<String> cursor); - - /** - * Set the limit of our page sizes - * @param limit - * @return - */ - ReadPipelineBuilder withLimit(final Optional<Integer> limit); - - /** - * An operation to bridge 2.0-> 1.0. Should be removed when everyone uses the pipeline - * @param id - * @return - */ - ReadPipelineBuilder setStartId(final Id id); - - - /** - * Add a get entity to the pipeline - */ - ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId ); - - - /** - * Add get Collection from our previous source - */ - ReadPipelineBuilder getCollection( final String collectionName ); - - /** - * Get all entities with a query - */ - ReadPipelineBuilder getCollectionWithQuery( final String collectionName,final String entityType, final String query); - - /** - * Get an entity via the connection name and entity Id - */ - ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId ); - - /** - * Get all entities in a connection by the connection name - */ - ReadPipelineBuilder getConnection( final String connectionName ); - - /** - * Get all entities in a connection of the specified connection type - */ - ReadPipelineBuilder getConnection( final String connectionName, final String entityType ); - - /** - * Get all entities in a connection with a query and a target entity type - */ - ReadPipelineBuilder getConnectionWithQuery( final String connectionName, final Optional<String> entityType, - final String query ); - - - /** - * Load our entity results when our previous filter calls graph - * @return - */ - Observable<ResultsPage> execute(); -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java deleted file mode 100644 index 28446ad..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ReadPipelineBuilderImpl.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read; - - -import java.util.ArrayList; -import java.util.List; - -import org.apache.usergrid.corepersistence.pipeline.Pipeline; -import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.CandidateEntityFilter; -import org.apache.usergrid.corepersistence.pipeline.read.graph.EntityLoadFilter; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.ValidationUtils; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; - -import rx.Observable; - - -/** - * An implementation of our builder for piplines - */ -public class ReadPipelineBuilderImpl implements ReadPipelineBuilder { - - private static final int DEFAULT_LIMIT = 10; - - private final FilterFactory filterFactory; - - private final CollectorState collectorState; - - private final ApplicationScope applicationScope; - - private final CollectorFactory collectorFactory; - - - /** - * Our pointer to our collect filter. Set or cleared with each operation that's performed so the correct results are - * rendered - */ - private List<Filter> filters; - - - private Optional<String> cursor; - private int limit; - - - @Inject - public ReadPipelineBuilderImpl( final FilterFactory filterFactory, final CollectorFactory collectorFactory, - @Assisted final ApplicationScope applicationScope ) { - this.filterFactory = filterFactory; - - this.applicationScope = applicationScope; - this.collectorFactory = collectorFactory; - - //init our cursor to empty - this.cursor = Optional.absent(); - - //set the default limit - this.limit = DEFAULT_LIMIT; - - - this.collectorState = new CollectorState( ); - - this.filters = new ArrayList<>(); - } - - - @Override - public ReadPipelineBuilder withCursor( final Optional<String> cursor ) { - Preconditions.checkNotNull( cursor, "cursor must not be null" ); - this.cursor = cursor; - return this; - } - - - @Override - public ReadPipelineBuilder withLimit( final Optional<Integer> limit ) { - Preconditions.checkNotNull( limit, "limit must not be null" ); - this.limit = limit.or( DEFAULT_LIMIT ); - return this; - } - - - @Override - public ReadPipelineBuilder setStartId( final Id id ) { - ValidationUtils.verifyIdentity( id ); - - filters.add( filterFactory.getEntityIdFilter( id ) ); - - this.collectorState.clear(); - - - return this; - } - - - @Override - public ReadPipelineBuilder getEntityViaCollection( final String collectionName, final Id entityId ) { - Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); - ValidationUtils.verifyIdentity( entityId ); - - filters.add( filterFactory.readGraphCollectionByIdFilter( collectionName, entityId ) ); - - this.collectorState.setIdEntityLoaderFilter(); - - return this; - } - - - @Override - public ReadPipelineBuilder getCollection( final String collectionName ) { - Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); - - filters.add( filterFactory.readGraphCollectionFilter( collectionName ) ); - - this.collectorState.setIdEntityLoaderFilter(); - - return this; - } - - - @Override - public ReadPipelineBuilder getCollectionWithQuery( final String collectionName, final String entityType, final String query ) { - Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); - Preconditions.checkNotNull( query, "query must not be null" ); - - //TODO, this should really be 2 a TraverseFilter with an entityLoad collector - - filters.add( filterFactory.elasticSearchCollectionFilter( query, collectionName, entityType ) ); - - this.collectorState.setCandidateEntityFilter(); - - return this; - } - - - @Override - public ReadPipelineBuilder getEntityViaConnection( final String connectionName, final Id entityId ) { - Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); - ValidationUtils.verifyIdentity( entityId ); - - filters.add( filterFactory.readGraphConnectionByIdFilter( connectionName, entityId ) ); - collectorState.setIdEntityLoaderFilter(); - - return this; - } - - - @Override - public ReadPipelineBuilder getConnection( final String connectionName ) { - Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); - filters.add( filterFactory.readGraphConnectionFilter( connectionName ) ); - collectorState.setIdEntityLoaderFilter(); - - return this; - } - - - @Override - public ReadPipelineBuilder getConnection( final String connectionName, final String entityType ) { - Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); - Preconditions.checkNotNull( connectionName, "entityType must not be null" ); - - filters.add( filterFactory.readGraphConnectionByTypeFilter( connectionName, entityType ) ); - - collectorState.setIdEntityLoaderFilter(); - return this; - } - - - @Override - public ReadPipelineBuilder getConnectionWithQuery( final String connectionName, final Optional<String> entityType, - final String query ) { - - Preconditions.checkNotNull( connectionName, "connectionName must not be null" ); - Preconditions.checkNotNull( connectionName, "entityType must not be null" ); - Preconditions.checkNotNull( query, "query must not be null" ); - - filters.add( filterFactory.elasticSearchConnectionFilter( query, connectionName, entityType ) ); - collectorState.setCandidateEntityFilter(); - return this; - } - - - @Override - public Observable<ResultsPage> execute() { - - ValidationUtils.validateApplicationScope( applicationScope ); - - - //add our last filter that will generate entities - final Filter<?, Entity> entityLoadFilter = collectorState.getFinalFilter(); - - filters.add( entityLoadFilter ); - - //add the filter that skips the first result on resume - final Filter<Entity, Entity> cursorEntityFilter = filterFactory.entityFilter(); - - filters.add( cursorEntityFilter ); - - - //execute our collector - final Collector<?, ResultsPage> collector = collectorFactory.getResultsPageCollector(); - - Preconditions.checkNotNull( collector, - "You have not specified an operation that creates a collection filter. This is required for loading " - + "results" ); - - - Preconditions.checkNotNull( cursor, "A cursor should be initialized even if absent" ); - - Preconditions.checkArgument( limit > 0, "limit must be > than 0" ); - - - Pipeline pipeline = new Pipeline( applicationScope, filters, collector, cursor, limit ); - - - return pipeline.execute(); - } - - - /** - * A mutable state for our collectors. Rather than create a new instance each time, we create a singleton - * collector - */ - private final class CollectorState { - - - private EntityLoadFilter entityLoadCollector; - - private CandidateEntityFilter candidateEntityFilter; - - private Filter entityLoadFilter; - - - - private CollectorState( ){} - - - /** - * Set our final filter to be a load entity by Id filter - */ - public void setIdEntityLoaderFilter() { - if ( entityLoadCollector == null ) { - entityLoadCollector = filterFactory.entityLoadFilter(); - } - - - entityLoadFilter = entityLoadCollector; - } - - - /** - * Set our final filter to be a load entity by candidate filter - */ - public void setCandidateEntityFilter() { - if ( candidateEntityFilter == null ) { - candidateEntityFilter = filterFactory.candidateEntityFilter(); - } - - entityLoadFilter = candidateEntityFilter; - } - - - public void clear() { - entityLoadFilter = null; - } - - - public Filter<?, Entity> getFinalFilter() { - return entityLoadFilter; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java deleted file mode 100644 index daf2e7f..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityFilter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.pipeline.read.collect; - - -import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; -import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; -import org.apache.usergrid.corepersistence.pipeline.read.Filter; -import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if - * it matches the Id that was set - */ -public class EntityFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> { - - - @Override - public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) { - - //filter only the first id, then map into our path for our next pass - - - return filterResultObservable.skipWhile( filterResult -> { - - final Optional<Id> startFromCursor = getSeekValue(); - - return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() ); - } ).map( filterResult -> { - - - final Entity entity = filterResult.getValue(); - final Id entityId = entity.getId(); - - return createFilterResult( entity, entityId, filterResult.getPath() ); - } ); - } - - - @Override - protected CursorSerializer<Id> getCursorSerializer() { - return IdCursorSerializer.INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java new file mode 100644 index 0000000..2917b61 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/EntityResumeFilter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.pipeline.read.collect; + + +import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; +import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; +import org.apache.usergrid.corepersistence.pipeline.read.Filter; +import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.common.base.Optional; + +import rx.Observable; + + +/** + * A filter that is used when we can potentially serialize pages via cursor. This will filter the first result, only if + * it matches the Id that was set + */ +public class EntityResumeFilter extends AbstractPathFilter<Entity, Entity, Id> implements Filter<Entity, Entity> { + + + @Override + public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Entity>> filterResultObservable ) { + + //filter only the first id, then map into our path for our next pass + + + return filterResultObservable.skipWhile( filterResult -> { + + final Optional<Id> startFromCursor = getSeekValue(); + + return startFromCursor.isPresent() && startFromCursor.get().equals( filterResult.getValue().getId() ); + } ).map( filterResult -> { + + + final Entity entity = filterResult.getValue(); + final Id entityId = entity.getId(); + + return createFilterResult( entity, entityId, filterResult.getPath() ); + } ); + } + + + @Override + protected CursorSerializer<Id> getCursorSerializer() { + return IdCursorSerializer.INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3a1784f0/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java index 56e1c1c..0e87141 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter; -import org.apache.usergrid.corepersistence.pipeline.PipelineOperation; import org.apache.usergrid.corepersistence.pipeline.read.Filter; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; import org.apache.usergrid.persistence.collection.EntityCollectionManager; @@ -50,11 +49,10 @@ import rx.Observable; /** - * Responsible for verifying candidate result versions, then emitting the Ids of these versions - * Input is a batch of candidate results, output is a stream of validated Ids + * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of + * candidate results, output is a stream of validated Ids */ -public class CandidateIdFilter extends AbstractFilter<Candidate, Id> - implements Filter<Candidate, Id> { +public class CandidateIdFilter extends AbstractFilter<Candidate, Id> implements Filter<Candidate, Id> { private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final EntityIndexFactory entityIndexFactory; @@ -68,9 +66,8 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id> } - @Override - public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) { + public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) { /** @@ -87,32 +84,28 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id> final ApplicationEntityIndex applicationIndex = entityIndexFactory.createApplicationEntityIndex( applicationScope ); - final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( - candidateResults -> { - //flatten toa list of ids to load - final Observable<List<Id>> candidateIds = - Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() ) - .toList(); + final Observable<FilterResult<Id>> searchIdSetObservable = + filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> { + //flatten toa list of ids to load + final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map( + candidate -> candidate.getValue().getCandidateResult().getId() ).toList(); - //load the ids - final Observable<VersionSet> versionSetObservable = - candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); + //load the ids + final Observable<VersionSet> versionSetObservable = + candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) ); - //now we have a collection, validate our canidate set is correct. + //now we have a collection, validate our canidate set is correct. - return versionSetObservable.map( - entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) ) - .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( + return versionSetObservable.map( + entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, + candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) ); - } ); + } ); return searchIdSetObservable; } - - - /** * Map a new cp entity to an old entity. May be null if not present */ @@ -155,7 +148,6 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id> /** * Validate each candidate results vs the data loaded from cass - * @param filterCandidate */ private void validate( final FilterResult<Candidate> filterCandidate ) { @@ -191,11 +183,9 @@ public class CandidateIdFilter extends AbstractFilter<Candidate, Id> //they're the same add it - final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() ); + final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() ); results.add( result ); } } - - }